-
Notifications
You must be signed in to change notification settings - Fork 556
Description
Quick reproduction
val v = Vector.fill(400_000)(0)
v.parTraverseN(10) { i =>
IO.sleep(10.seconds) >> IO.raiseError(new RuntimeException)
}.void
This will most likely cause 100% CPU consumption after the initial wait 10 seconds and soon after that starvation warnings.
Explanation
parTraverseN
is defined as follows:
MiniSemaphore[F](n).flatMap { sem => ta.parTraverse { a => sem.withPermit(f(a)) } }
Meaning it creates a MiniSemaphore and then starts a proper parTraverse. parTraverse
launches a fiber for each element of the 400k Vector (which is ok) with sem.withPermit(f(a))
.
Inside the implementation of MiniSemaphore
and withPermit
we see that: 1) it has a Ref
-based state with a Queue
inside; 2) it has a cleanup routine that is executed when the fiber is cancelled. The implementation of Queue
is two linked lists. Unpleasant, but not the worst.
The implementation of Ref
(class SyncRef
) implements modify like this:
def modify[B](f: A => (A, B)): F[B] = {
@tailrec
def spin: B = {
val c = ar.get
val (u, b) = f(c)
if (!ar.compareAndSet(c, u)) spin
else b
}
F.delay(spin)
}
This is a CAS-loop: it gets the current value, calculate the next value using the provided function, sets the variable to the calcualted value if the variable hasn’t been changed since then. If the variable has been changed, the compareAndSet
fails and the loop continues until morale improves.
cleanup
is called when the fiber is cancelled. Coincidentally, whenever one of fibers fails with an error, the rest of them get cancelled by parTraverse
.
After that we have 399999 fibers trying to call cleanup on a single ref using all available CPUs. The cleanup function is trying to filter out a single element out of the queue, given linked lists, this could be quite bad.
Potential fix
In my opinion, the best fix would be to somehow limit the number of fiber that exist concurrently to N. This means we have to forgo the "fairness" property that is stated in the documentation, but not provided in practice as discussed in #4262. Limiting the number of fibers would also be good on the memory consumption, since they are not free, and in our particular case 400k fibers easily take additional ~500 MiB of RAM.
If you'd prefer not to do that, replacing Queue
in MiniSemaphore
with a different data structure would work. I have reports that Set
works better, maybe Vector
will, too. This doesn't solve the contention problem, of course, only makes it not so hard on the CPUs.