Skip to content

Commit c476410

Browse files
authored
gh-110206: Fix multiprocessing test_notify_all (#130933)
The test could deadlock trying join on the worker processes due to a combination of behaviors: * The use of `assertReachesEventually` did not ensure that workers actually woken.release() because the SyncManager's Semaphore does not implement get_value. * This mean that the test could finish and the variable "sleeping" would got out of scope and be collected. This unregisters the proxy leading to failures in the worker or possibly the manager. * The subsequent call to `p.join()` during cleanUp therefore never finished. This takes two approaches to fix this: 1) Use woken.acquire() to ensure that the workers actually finish calling woken.release() 2) At the end of the test, wait until the workers are finished, while `cond`, `sleeping`, and `woken` are still valid.
1 parent 89df62c commit c476410

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

Lib/test/_test_multiprocessing.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -1694,18 +1694,19 @@ def test_notify_all(self):
16941694
woken = self.Semaphore(0)
16951695

16961696
# start some threads/processes which will timeout
1697+
workers = []
16971698
for i in range(3):
16981699
p = self.Process(target=self.f,
16991700
args=(cond, sleeping, woken, TIMEOUT1))
17001701
p.daemon = True
17011702
p.start()
1702-
self.addCleanup(p.join)
1703+
workers.append(p)
17031704

17041705
t = threading.Thread(target=self.f,
17051706
args=(cond, sleeping, woken, TIMEOUT1))
17061707
t.daemon = True
17071708
t.start()
1708-
self.addCleanup(t.join)
1709+
workers.append(t)
17091710

17101711
# wait for them all to sleep
17111712
for i in range(6):
@@ -1724,12 +1725,12 @@ def test_notify_all(self):
17241725
p = self.Process(target=self.f, args=(cond, sleeping, woken))
17251726
p.daemon = True
17261727
p.start()
1727-
self.addCleanup(p.join)
1728+
workers.append(p)
17281729

17291730
t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
17301731
t.daemon = True
17311732
t.start()
1732-
self.addCleanup(t.join)
1733+
workers.append(t)
17331734

17341735
# wait for them to all sleep
17351736
for i in range(6):
@@ -1745,11 +1746,17 @@ def test_notify_all(self):
17451746
cond.release()
17461747

17471748
# check they have all woken
1748-
self.assertReachesEventually(lambda: get_value(woken), 6)
1749+
for i in range(6):
1750+
woken.acquire()
1751+
self.assertReturnsIfImplemented(0, get_value, woken)
17491752

17501753
# check state is not mucked up
17511754
self.check_invariant(cond)
17521755

1756+
for w in workers:
1757+
# NOTE: join_process and join_thread are the same
1758+
threading_helper.join_thread(w)
1759+
17531760
def test_notify_n(self):
17541761
cond = self.Condition()
17551762
sleeping = self.Semaphore(0)

0 commit comments

Comments
 (0)