-
Notifications
You must be signed in to change notification settings - Fork 249
Description
UniversalQueue
has a race condition in the _put_notify()
and _put()
methods. Since a future object is cancelled without holding the lock, a future.cancel()
call[1][2][3][4] can occur after future.cancelled()
but before future.set_result()
. As a result, concurrent.futures.InvalidStateError
will be raised.
Since this is a multi-threaded issue, it cannot be reliably reproduced. Instead, below is synthetic code that demonstrates that behavior.
from concurrent.futures import Future
fut = Future()
if not fut.cancelled():
fut.cancel() # await _future_wait(fut) | await asyncio.wrap_future(fut)
fut.set_result(42) # raises concurrent.futures.InvalidStateError
The quickest solution is to suppress concurrent.futures.InvalidStateError
, but this requires returning the item back to the queue in the _put()
method. A more correct solution is to use future.set_running_or_notify_cancel()
.
Note that UniversalEvent
has the same problem in the _unblock_waiters()
method[1][2][3].