Skip to content

Commit c01ca11

Browse files
committed
Avoid use of asyncio.get_event_loop (3.14)
Python 3.14 will remove "asyncio.get_event_loop" https://docs.python.org/dev/whatsnew/3.14.html#id6 Replace "ayscio.get_event_loop
1 parent 28dd52c commit c01ca11

File tree

10 files changed

+45
-38
lines changed

10 files changed

+45
-38
lines changed

reactivex/observable/observable.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,11 @@ def __await__(self) -> Generator[Any, None, _T_out]:
259259
"""
260260
from ..operators._tofuture import to_future_
261261

262-
loop = asyncio.get_event_loop()
262+
try:
263+
loop = asyncio.get_running_loop()
264+
except RuntimeError:
265+
loop = asyncio.new_event_loop()
266+
263267
future: asyncio.Future[_T_out] = self.pipe(
264268
to_future_(scheduler=AsyncIOScheduler(loop=loop))
265269
)

reactivex/operators/_tofuture.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ def to_future_(
1212
future_ctor: Optional[Callable[[], "Future[_T]"]] = None,
1313
scheduler: Optional[abc.SchedulerBase] = None,
1414
) -> Callable[[Observable[_T]], "Future[_T]"]:
15-
future_ctor_: Callable[[], "Future[_T]"] = (
16-
future_ctor or asyncio.get_event_loop().create_future
17-
)
18-
future: "Future[_T]" = future_ctor_()
1915

2016
def to_future(source: Observable[_T]) -> "Future[_T]":
2117
"""Converts an existing observable sequence to a Future.
@@ -33,9 +29,18 @@ def to_future(source: Observable[_T]) -> "Future[_T]":
3329
Returns:
3430
A future with the last value from the observable sequence.
3531
"""
32+
if future_ctor is not None:
33+
future_ctor_ = future_ctor
34+
else:
35+
try:
36+
future_ctor_ = asyncio.get_running_loop().create_future # Python 3.14+
37+
except RuntimeError:
38+
future_ctor_ = lambda: asyncio.Future() # If no running loop
39+
40+
future = future_ctor_()
3641

3742
has_value = False
38-
last_value = cast(_T, None)
43+
last_value: Optional[_T] = None
3944

4045
def on_next(value: _T):
4146
nonlocal last_value

reactivex/scheduler/eventloop/asyncioscheduler.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
2626
2727
Args:
2828
loop: Instance of asyncio event loop to use; typically, you would
29-
get this by asyncio.get_event_loop()
29+
get this by asyncio.get_running_loop()
3030
"""
3131
super().__init__()
3232
self._loop: asyncio.AbstractEventLoop = loop

reactivex/scheduler/eventloop/asynciothreadsafescheduler.py

+5-7
Original file line numberDiff line numberDiff line change
@@ -145,13 +145,11 @@ def _on_self_loop_or_not_running(self) -> bool:
145145
"""
146146
if not self._loop.is_running():
147147
return True
148-
current_loop = None
148+
149149
try:
150-
# In python 3.7 there asyncio.get_running_loop() is prefered.
151-
current_loop = asyncio.get_event_loop()
150+
current_loop = asyncio.get_running_loop()
152151
except RuntimeError:
153-
# If there is no loop in current thread at all, and it is not main
154-
# thread, we get error like:
155-
# RuntimeError: There is no current event loop in thread 'Thread-1'
156-
pass
152+
# If no running event loop is found, assume we're in a different thread
153+
return True
154+
157155
return self._loop == current_loop

tests/test_observable/test_flatmap_async.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
class TestFlatMapAsync(unittest.TestCase):
1010
def test_flat_map_async(self):
1111
actual_next = None
12-
loop = asyncio.get_event_loop()
12+
loop = asyncio.new_event_loop()
1313
scheduler = AsyncIOScheduler(loop=loop)
1414

1515
def mapper(i: int):

tests/test_observable/test_fromfuture.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
class TestFromFuture(unittest.TestCase):
99
def test_future_success(self):
10-
loop = asyncio.get_event_loop()
10+
loop = asyncio.new_event_loop()
1111
success = [False, True, False]
1212

1313
async def go():
@@ -31,7 +31,7 @@ def on_completed():
3131
assert all(success)
3232

3333
def test_future_failure(self):
34-
loop = asyncio.get_event_loop()
34+
loop = asyncio.new_event_loop()
3535
success = [True, False, True]
3636

3737
async def go():
@@ -57,7 +57,7 @@ def on_completed():
5757
assert all(success)
5858

5959
def test_future_cancel(self):
60-
loop = asyncio.get_event_loop()
60+
loop = asyncio.new_event_loop()
6161
success = [True, False, True]
6262

6363
async def go():
@@ -80,7 +80,7 @@ def on_completed():
8080
assert all(success)
8181

8282
def test_future_dispose(self):
83-
loop = asyncio.get_event_loop()
83+
loop = asyncio.new_event_loop()
8484
success = [True, True, True]
8585

8686
async def go():

tests/test_observable/test_start.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
class TestStart(unittest.TestCase):
1818
def test_start_async(self):
19-
loop = asyncio.get_event_loop()
19+
loop = asyncio.new_event_loop()
2020
success = [False]
2121

2222
async def go():
@@ -36,7 +36,7 @@ def on_next(x):
3636
assert all(success)
3737

3838
def test_start_async_error(self):
39-
loop = asyncio.get_event_loop()
39+
loop = asyncio.new_event_loop()
4040
success = [False]
4141

4242
async def go():

tests/test_observable/test_tofuture.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
class TestToFuture(unittest.TestCase):
2020
def test_await_success(self):
21-
loop = asyncio.get_event_loop()
21+
loop = asyncio.new_event_loop()
2222
result = None
2323

2424
async def go():
@@ -30,7 +30,7 @@ async def go():
3030
assert result == 42
3131

3232
def test_await_success_on_sequence(self):
33-
loop = asyncio.get_event_loop()
33+
loop = asyncio.new_event_loop()
3434
result = None
3535

3636
async def go():
@@ -42,7 +42,7 @@ async def go():
4242
assert result == 42
4343

4444
def test_await_error(self):
45-
loop = asyncio.get_event_loop()
45+
loop = asyncio.new_event_loop()
4646
error = Exception("error")
4747
result = None
4848

@@ -58,7 +58,7 @@ async def go():
5858
assert result == error
5959

6060
def test_await_empty_observable(self):
61-
loop = asyncio.get_event_loop()
61+
loop = asyncio.new_event_loop()
6262
result = None
6363

6464
async def go():
@@ -71,7 +71,7 @@ async def go():
7171
)
7272

7373
def test_await_with_delay(self):
74-
loop = asyncio.get_event_loop()
74+
loop = asyncio.new_event_loop()
7575
result = None
7676

7777
async def go():
@@ -83,7 +83,7 @@ async def go():
8383
assert result == 42
8484

8585
def test_cancel(self):
86-
loop = asyncio.get_event_loop()
86+
loop = asyncio.new_event_loop()
8787

8888
async def go():
8989
source = reactivex.return_value(42)
@@ -96,7 +96,7 @@ async def go():
9696
self.assertRaises(asyncio.CancelledError, loop.run_until_complete, go())
9797

9898
def test_dispose_on_cancel(self):
99-
loop = asyncio.get_event_loop()
99+
loop = asyncio.new_event_loop()
100100
sub = Subject()
101101

102102
async def using_sub():

tests/test_scheduler/test_eventloop/test_asyncioscheduler.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,22 @@
1313
class TestAsyncIOScheduler(unittest.TestCase):
1414
@pytest.mark.skipif(CI, reason="Flaky test in GitHub Actions")
1515
def test_asyncio_schedule_now(self):
16-
loop = asyncio.get_event_loop()
16+
loop = asyncio.new_event_loop()
1717
scheduler = AsyncIOScheduler(loop)
1818
diff = scheduler.now - datetime.fromtimestamp(loop.time(), tz=timezone.utc)
1919
assert abs(diff) < timedelta(milliseconds=2) # NOTE: may take 1 ms in CI
2020

2121
@pytest.mark.skipif(CI, reason="Test is flaky in GitHub Actions")
2222
def test_asyncio_schedule_now_units(self):
23-
loop = asyncio.get_event_loop()
23+
loop = asyncio.new_event_loop()
2424
scheduler = AsyncIOScheduler(loop)
2525
diff = scheduler.now
2626
yield from asyncio.sleep(0.1)
2727
diff = scheduler.now - diff
2828
assert timedelta(milliseconds=80) < diff < timedelta(milliseconds=180)
2929

3030
def test_asyncio_schedule_action(self):
31-
loop = asyncio.get_event_loop()
31+
loop = asyncio.new_event_loop()
3232

3333
async def go():
3434
scheduler = AsyncIOScheduler(loop)
@@ -46,7 +46,7 @@ def action(scheduler, state):
4646
loop.run_until_complete(go())
4747

4848
def test_asyncio_schedule_action_due(self):
49-
loop = asyncio.get_event_loop()
49+
loop = asyncio.new_event_loop()
5050

5151
async def go():
5252
scheduler = AsyncIOScheduler(loop)
@@ -67,7 +67,7 @@ def action(scheduler, state):
6767
loop.run_until_complete(go())
6868

6969
def test_asyncio_schedule_action_cancel(self):
70-
loop = asyncio.get_event_loop()
70+
loop = asyncio.new_event_loop()
7171

7272
async def go():
7373
ran = False

tests/test_scheduler/test_eventloop/test_asynciothreadsafescheduler.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,22 @@
1414
class TestAsyncIOThreadSafeScheduler(unittest.TestCase):
1515
@pytest.mark.skipif(CI, reason="Flaky test in GitHub Actions")
1616
def test_asyncio_threadsafe_schedule_now(self):
17-
loop = asyncio.get_event_loop()
17+
loop = asyncio.new_event_loop()
1818
scheduler = AsyncIOThreadSafeScheduler(loop)
1919
diff = scheduler.now - datetime.fromtimestamp(loop.time(), tz=timezone.utc)
2020
assert abs(diff) < timedelta(milliseconds=2)
2121

2222
@pytest.mark.skipif(CI, reason="Flaky test in GitHub Actions")
2323
def test_asyncio_threadsafe_schedule_now_units(self):
24-
loop = asyncio.get_event_loop()
24+
loop = asyncio.new_event_loop()
2525
scheduler = AsyncIOThreadSafeScheduler(loop)
2626
diff = scheduler.now
2727
yield from asyncio.sleep(0.1)
2828
diff = scheduler.now - diff
2929
assert timedelta(milliseconds=80) < diff < timedelta(milliseconds=180)
3030

3131
def test_asyncio_threadsafe_schedule_action(self):
32-
loop = asyncio.get_event_loop()
32+
loop = asyncio.new_event_loop()
3333

3434
async def go():
3535
scheduler = AsyncIOThreadSafeScheduler(loop)
@@ -50,7 +50,7 @@ def schedule():
5050
loop.run_until_complete(go())
5151

5252
def test_asyncio_threadsafe_schedule_action_due(self):
53-
loop = asyncio.get_event_loop()
53+
loop = asyncio.new_event_loop()
5454

5555
async def go():
5656
scheduler = AsyncIOThreadSafeScheduler(loop)
@@ -74,7 +74,7 @@ def schedule():
7474
loop.run_until_complete(go())
7575

7676
def test_asyncio_threadsafe_schedule_action_cancel(self):
77-
loop = asyncio.get_event_loop()
77+
loop = asyncio.new_event_loop()
7878

7979
async def go():
8080
ran = False

0 commit comments

Comments
 (0)