diff --git a/aio_pika/abc.py b/aio_pika/abc.py index 47fb8967..21303ca3 100644 --- a/aio_pika/abc.py +++ b/aio_pika/abc.py @@ -81,11 +81,6 @@ class DeclarationResult: class AbstractTransaction: state: TransactionState - @property - @abstractmethod - def channel(self) -> "AbstractChannel": - raise NotImplementedError - @abstractmethod async def select( self, timeout: TimeoutType = None, @@ -514,9 +509,8 @@ def is_closed(self) -> bool: def close(self, exc: Optional[ExceptionType] = None) -> Awaitable[None]: raise NotImplementedError - @property @abstractmethod - def channel(self) -> aiormq.abc.AbstractChannel: + async def get_underlay_channel(self) -> aiormq.abc.AbstractChannel: raise NotImplementedError @property diff --git a/aio_pika/channel.py b/aio_pika/channel.py index 77bcd3c6..7c024ff0 100644 --- a/aio_pika/channel.py +++ b/aio_pika/channel.py @@ -1,4 +1,5 @@ import asyncio +import warnings from abc import ABC from types import TracebackType from typing import Any, AsyncContextManager, Generator, Optional, Type, Union @@ -78,7 +79,7 @@ def __init__( # That's means user closed channel instance explicitly self._closed: bool = False - self._channel = None + self._channel: Optional[UnderlayChannel] = None self._channel_number = channel_number self.close_callbacks = CallbackCollection(self) @@ -119,8 +120,7 @@ async def close( self._closed = True await self._channel.close() - @property - def channel(self) -> aiormq.abc.AbstractChannel: + async def get_underlay_channel(self) -> aiormq.abc.AbstractChannel: if not self.is_initialized or not self._channel: raise aiormq.exceptions.ChannelInvalidStateError( @@ -134,13 +134,20 @@ def channel(self) -> aiormq.abc.AbstractChannel: return self._channel.channel + @property + def channel(self) -> Optional[aiormq.abc.AbstractChannel]: + warnings.warn("This property is deprecated, do not use this anymore.") + if self._channel is None: + raise aiormq.exceptions.ChannelInvalidStateError + return self._channel.channel + @property def number(self) -> Optional[int]: - return ( - self.channel.number - if self.is_initialized - else self._channel_number - ) + if self._channel is None: + return self._channel_number + + underlay_channel: UnderlayChannel = self._channel + return underlay_channel.channel.number def __str__(self) -> str: return "{}".format(self.number or "Not initialized channel") @@ -192,7 +199,8 @@ async def _on_close(self, closing: asyncio.Future) -> None: self._channel.channel.on_return_callbacks.discard(self._on_return) async def _on_initialized(self) -> None: - self.channel.on_return_callbacks.add(self._on_return) + channel = await self.get_underlay_channel() + channel.on_return_callbacks.add(self._on_return) def _on_return(self, message: aiormq.abc.DeliveredMessage) -> None: self.return_callbacks(IncomingMessage(message, no_ack=True)) @@ -240,8 +248,10 @@ async def declare_exchange( if auto_delete and durable is None: durable = False + channel = await self.get_underlay_channel() + exchange = self.EXCHANGE_CLASS( - channel=self.channel, + channel=channel, name=name, type=type, durable=durable, @@ -281,7 +291,7 @@ async def get_exchange( return await self.declare_exchange(name=name, passive=True) else: return self.EXCHANGE_CLASS( - channel=self.channel, + channel=await self.get_underlay_channel(), name=name, durable=False, auto_delete=False, @@ -321,7 +331,7 @@ async def declare_queue( """ queue: AbstractQueue = self.QUEUE_CLASS( - channel=self.channel, + channel=await self.get_underlay_channel(), name=name, durable=durable, exclusive=exclusive, @@ -358,7 +368,7 @@ async def get_queue( return await self.declare_queue(name=name, passive=True) else: return self.QUEUE_CLASS( - channel=self.channel, + channel=await self.get_underlay_channel(), name=name, durable=False, exclusive=False, @@ -379,7 +389,9 @@ async def set_qos( warn('Use "global_" instead of "all_channels"', DeprecationWarning) global_ = all_channels - return await self.channel.basic_qos( + channel = await self.get_underlay_channel() + + return await channel.basic_qos( prefetch_count=prefetch_count, prefetch_size=prefetch_size, global_=global_, @@ -394,7 +406,8 @@ async def queue_delete( if_empty: bool = False, nowait: bool = False, ) -> aiormq.spec.Queue.DeleteOk: - return await self.channel.queue_delete( + channel = await self.get_underlay_channel() + return await channel.queue_delete( queue=queue_name, if_unused=if_unused, if_empty=if_empty, @@ -409,7 +422,8 @@ async def exchange_delete( if_unused: bool = False, nowait: bool = False, ) -> aiormq.spec.Exchange.DeleteOk: - return await self.channel.exchange_delete( + channel = await self.get_underlay_channel() + return await channel.exchange_delete( exchange=exchange_name, if_unused=if_unused, nowait=nowait, @@ -426,7 +440,8 @@ def transaction(self) -> Transaction: return Transaction(self) async def flow(self, active: bool = True) -> aiormq.spec.Channel.FlowOk: - return await self.channel.flow(active=active) + channel = await self.get_underlay_channel() + return await channel.flow(active=active) __all__ = ("Channel",) diff --git a/aio_pika/message.py b/aio_pika/message.py index 6e4807a9..b670db3f 100644 --- a/aio_pika/message.py +++ b/aio_pika/message.py @@ -482,7 +482,7 @@ async def ack(self, multiple: bool = False) -> None: async def reject(self, requeue: bool = False) -> None: """ When `requeue=True` the message will be returned to queue. - Otherwise message will be dropped. + Otherwise, message will be dropped. .. note:: This method looks like a blocking-method, but actually it just diff --git a/aio_pika/robust_channel.py b/aio_pika/robust_channel.py index a357143e..de388938 100644 --- a/aio_pika/robust_channel.py +++ b/aio_pika/robust_channel.py @@ -1,8 +1,9 @@ import asyncio from collections import defaultdict from itertools import chain -from typing import Any, DefaultDict, Dict, Optional, Set, Type, Union +from typing import Any, DefaultDict, Dict, MutableSet, Optional, Type, Union from warnings import warn +from weakref import WeakSet import aiormq @@ -28,8 +29,8 @@ class RobustChannel(Channel, AbstractRobustChannel): # type: ignore QUEUE_CLASS: Type[Queue] = RobustQueue EXCHANGE_CLASS: Type[Exchange] = RobustExchange - _exchanges: DefaultDict[str, Set[AbstractRobustExchange]] - _queues: DefaultDict[str, Set[RobustQueue]] + _exchanges: DefaultDict[str, MutableSet[AbstractRobustExchange]] + _queues: DefaultDict[str, MutableSet[RobustQueue]] default_exchange: RobustExchange def __init__( @@ -57,8 +58,8 @@ def __init__( on_return_raises=on_return_raises, ) - self._exchanges = defaultdict(set) - self._queues = defaultdict(set) + self._exchanges = defaultdict(WeakSet) + self._queues = defaultdict(WeakSet) self._prefetch_count: int = 0 self._prefetch_size: int = 0 self._global_qos: bool = False @@ -73,6 +74,10 @@ async def __close_callback(self, *_: Any) -> None: await self.reopen() + async def get_underlay_channel(self) -> aiormq.abc.AbstractChannel: + await self._connection.ready() + return await super().get_underlay_channel() + async def restore(self, connection: aiormq.abc.AbstractConnection) -> None: async with self.__restore_lock: self._connection = connection diff --git a/aio_pika/transaction.py b/aio_pika/transaction.py index a324275b..661a8ce1 100644 --- a/aio_pika/transaction.py +++ b/aio_pika/transaction.py @@ -33,7 +33,8 @@ def channel(self) -> AbstractChannel: async def select( self, timeout: TimeoutType = None, ) -> aiormq.spec.Tx.SelectOk: - result = await self.channel.channel.tx_select(timeout=timeout) + channel = await self.channel.get_underlay_channel() + result = await channel.tx_select(timeout=timeout) self.state = TransactionState.STARTED return result @@ -41,14 +42,16 @@ async def select( async def rollback( self, timeout: TimeoutType = None, ) -> commands.Tx.RollbackOk: - result = await self.channel.channel.tx_rollback(timeout=timeout) + channel = await self.channel.get_underlay_channel() + result = await channel.tx_rollback(timeout=timeout) self.state = TransactionState.ROLLED_BACK return result async def commit( self, timeout: TimeoutType = None, ) -> commands.Tx.CommitOk: - result = await self.channel.channel.tx_commit(timeout=timeout) + channel = await self.channel.get_underlay_channel() + result = await channel.tx_commit(timeout=timeout) self.state = TransactionState.COMMITED return result diff --git a/poetry.lock b/poetry.lock index cd66f8cd..ad32d844 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,30 +1,47 @@ -# This file is automatically @generated by Poetry and should not be changed by hand. +# This file is automatically @generated by Poetry 1.4.0 and should not be changed by hand. [[package]] name = "aiomisc" -version = "16.3.15" +version = "17.0.6" description = "aiomisc - miscellaneous utils for asyncio" category = "dev" optional = false python-versions = ">=3.7,<4.0" files = [ - {file = "aiomisc-16.3.15-py3-none-any.whl", hash = "sha256:427dda9fc2e8d67d2e54331e18b86f257ad2412ef745157a87f1fd898cf7aa1e"}, - {file = "aiomisc-16.3.15.tar.gz", hash = "sha256:9470e3b0af1db357eda2f6362006c9c24de39e7e05a03a3a7ff36efcc48086a3"}, + {file = "aiomisc-17.0.6-py3-none-any.whl", hash = "sha256:a224f612f86bcd9cc125ebc10088d81d189b71117e6988a8fecca5f475c0cde0"}, + {file = "aiomisc-17.0.6.tar.gz", hash = "sha256:aa207a751263467bea9b06d9532ee857d53a9e8ac233ba594b146d907094dc98"}, ] [package.dependencies] colorlog = ">=6.0,<7.0" logging-journald = {version = "*", markers = "sys_platform == \"linux\""} +typing_extensions = {version = "*", markers = "python_version < \"3.8\""} [package.extras] -aiohttp = ["aiohttp"] +aiohttp = ["aiohttp (>3)"] asgi = ["aiohttp-asgi (>=0.4.3,<0.5.0)"] carbon = ["aiocarbon (>=0.15,<0.16)"] cron = ["croniter (>=1.3.8,<2.0.0)"] -raven = ["aiohttp", "raven"] +raven = ["aiohttp (>3)", "raven"] rich = ["rich"] uvloop = ["uvloop (>=0.14,<1)"] +[[package]] +name = "aiomisc-pytest" +version = "1.1.1" +description = "pytest integration for aiomisc" +category = "dev" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "aiomisc_pytest-1.1.1-py3-none-any.whl", hash = "sha256:c07251f79c936c85c7589429f43c728cf1a34b80c0089b268f2cfa6186e77020"}, + {file = "aiomisc_pytest-1.1.1.tar.gz", hash = "sha256:2c378c41b078c0576027de6bf7fbc537a7e69285d23eaf4d45738d5d0de56dd3"}, +] + +[package.dependencies] +aiomisc = ">=17" +pytest = ">=7.2.1,<8.0.0" + [[package]] name = "aiormq" version = "6.7.4" @@ -1518,4 +1535,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "^3.7" -content-hash = "656f9645c186c7666256e450039342c3b84d4f53f283ae5025cc1c16631b0483" +content-hash = "933c4ac34f8ad5466b3aefe2b32ca123f10fc5c1dc42414dccf96afd3915b776" diff --git a/pyproject.toml b/pyproject.toml index 1ea545c9..060c5c93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,6 @@ typing_extensions = [{ version = '*', python = "< 3.8" }] setuptools = [{ version = '*', python = "< 3.8" }] [tool.poetry.group.dev.dependencies] -aiomisc = "^16.2" collective-checkdocs = "^0.2" coverage = "^6.5.0" coveralls = "^3.3.1" @@ -59,6 +58,7 @@ sphinx = "^5.3.0" sphinx-autobuild = "^2021.3.14" timeout-decorator = "^0.5.0" types-setuptools = "^65.6.0.2" +aiomisc-pytest = "^1.1.1" [tool.poetry.group.uvloop.dependencies] uvloop = "^0.17.0" diff --git a/tests/conftest.py b/tests/conftest.py index efff05d9..0ea1dcfb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,7 +16,7 @@ @pytest.fixture -async def add_cleanup(loop): +async def add_cleanup(event_loop): entities = [] def payload(func, *args, **kwargs): @@ -34,12 +34,12 @@ def payload(func, *args, **kwargs): @pytest.fixture -async def create_task(loop): +async def create_task(event_loop): tasks = [] def payload(coroutine): nonlocal tasks - task = loop.create_task(coroutine) + task = event_loop.create_task(coroutine) tasks.append(task) return task @@ -91,8 +91,8 @@ def connection_fabric(request): @pytest.fixture -def create_connection(connection_fabric, loop, amqp_url): - return partial(connection_fabric, amqp_url, loop=loop) +def create_connection(connection_fabric, event_loop, amqp_url): + return partial(connection_fabric, amqp_url, loop=event_loop) @pytest.fixture diff --git a/tests/test_amqp.py b/tests/test_amqp.py index f82a3084..cf66f43f 100644 --- a/tests/test_amqp.py +++ b/tests/test_amqp.py @@ -48,7 +48,9 @@ def declare_exchange_(declare_exchange): class TestCaseAmqp(TestCaseAmqpBase): - async def test_properties(self, loop, connection: aio_pika.Connection): + async def test_properties( + self, event_loop, connection: aio_pika.Connection + ): assert not connection.is_closed async def test_channel_close(self, connection: aio_pika.Connection): @@ -704,7 +706,7 @@ async def test_reject_twice( async def test_consuming( self, - loop, + event_loop, channel: aio_pika.Channel, declare_exchange: Callable, declare_queue: Callable, @@ -721,7 +723,7 @@ async def test_consuming( body = bytes(shortuuid.uuid(), "utf-8") - f = loop.create_future() + f = event_loop.create_future() async def handle(message): await message.ack() @@ -741,7 +743,7 @@ async def handle(message): async def test_consuming_not_coroutine( self, - loop, + event_loop, channel: aio_pika.Channel, declare_exchange: Callable, declare_queue: Callable, @@ -759,7 +761,7 @@ async def test_consuming_not_coroutine( body = bytes(shortuuid.uuid(), "utf-8") - f = loop.create_future() + f = event_loop.create_future() async def handle(message): await message.ack() @@ -883,7 +885,7 @@ async def test_exchange_delete( async def test_dlx( self, - loop, + event_loop, channel: aio_pika.Channel, declare_exchange: Callable, declare_queue: Callable, @@ -893,7 +895,7 @@ async def test_dlx( routing_key = "%s_routing_key" % suffix dlx_routing_key = "%s_dlx_routing_key" % suffix - f = loop.create_future() + f = event_loop.create_future() async def dlx_handle(message): await message.ack() @@ -948,7 +950,8 @@ async def dlx_handle(message): await f async def test_expiration( - self, channel: aio_pika.Channel, loop, declare_exchange, declare_queue, + self, channel: aio_pika.Channel, event_loop, + declare_exchange, declare_queue, ): dlx_queue = await declare_queue( @@ -982,7 +985,7 @@ async def test_expiration( queue.name, ) - f = loop.create_future() + f = event_loop.create_future() await dlx_queue.consume(f.set_result, no_ack=True) @@ -1207,7 +1210,9 @@ async def test_transaction_simple_async_rollback( async with channel.transaction(): raise ValueError - async def test_async_for_queue(self, loop, connection, declare_queue): + async def test_async_for_queue( + self, event_loop, connection, declare_queue + ): channel2 = await self.create_channel(connection) queue = await declare_queue( @@ -1226,7 +1231,7 @@ async def publisher(): Message(body=str(i).encode()), routing_key=queue.name, ) - loop.create_task(publisher()) + event_loop.create_task(publisher()) count = 0 data = list() @@ -1242,7 +1247,7 @@ async def publisher(): assert data == list(map(lambda x: str(x).encode(), range(messages))) async def test_async_for_queue_context( - self, loop, connection, declare_queue, + self, event_loop, connection, declare_queue, ): channel2 = await self.create_channel(connection) @@ -1262,7 +1267,7 @@ async def publisher(): Message(body=str(i).encode()), routing_key=queue.name, ) - loop.create_task(publisher()) + event_loop.create_task(publisher()) count = 0 data = list() @@ -1279,7 +1284,8 @@ async def publisher(): assert data == list(map(lambda x: str(x).encode(), range(messages))) async def test_async_with_connection( - self, create_connection: Callable, connection, loop, declare_queue, + self, create_connection: Callable, + connection, event_loop, declare_queue, ): async with await create_connection() as connection: @@ -1301,7 +1307,7 @@ async def publisher(): Message(body=str(i).encode()), routing_key=queue.name, ) - loop.create_task(publisher()) + event_loop.create_task(publisher()) count = 0 data = list() @@ -1370,9 +1376,9 @@ async def test_channel_locked_resource( await q2.consume(print, exclusive=True) async def test_queue_iterator_close_was_called_twice( - self, create_connection: Callable, loop, declare_queue, + self, create_connection: Callable, event_loop, declare_queue, ): - future = loop.create_future() + future = event_loop.create_future() event = asyncio.Event() queue_name = get_random_name() @@ -1403,10 +1409,10 @@ async def task_inner(): future.set_exception(e) raise - task = loop.create_task(task_inner()) + task = event_loop.create_task(task_inner()) await event.wait() - loop.call_soon(task.cancel) + event_loop.call_soon(task.cancel) with pytest.raises(asyncio.CancelledError): await task @@ -1417,7 +1423,7 @@ async def task_inner(): async def test_queue_iterator_close_with_noack( self, create_connection: Callable, - loop, + event_loop, add_cleanup: Callable, declare_queue, ): @@ -1457,7 +1463,7 @@ async def task_inner(): Message(body), routing_key=queue_name, ) - task = loop.create_task(task_inner()) + task = event_loop.create_task(task_inner()) await task @@ -1563,7 +1569,7 @@ async def run(): await channel.set_qos(10) async def test_heartbeat_disabling( - self, loop, amqp_url: URL, connection_fabric, + self, event_loop, amqp_url: URL, connection_fabric, ): url = amqp_url.update_query(heartbeat=0) connection: AbstractConnection = await connection_fabric(url) @@ -1612,10 +1618,12 @@ async def test_connection_close( finally: await exchange.delete() - async def test_basic_return(self, connection: aio_pika.Connection, loop): + async def test_basic_return( + self, connection: aio_pika.Connection, event_loop + ): channel = await self.create_channel(connection) - f = loop.create_future() + f = event_loop.create_future() def handler(channel, message: ReturnedMessage): f.set_result(message) @@ -1634,7 +1642,7 @@ def handler(channel, message: ReturnedMessage): assert returned.body == body # handler with exception - f = loop.create_future() + f = event_loop.create_future() await channel.close() diff --git a/tests/test_amqp_robust.py b/tests/test_amqp_robust.py index 92b4d4df..51ad0c62 100644 --- a/tests/test_amqp_robust.py +++ b/tests/test_amqp_robust.py @@ -18,8 +18,8 @@ def connection_fabric(): @pytest.fixture -def create_connection(connection_fabric, loop, amqp_url): - return partial(connection_fabric, amqp_url, loop=loop) +def create_connection(connection_fabric, event_loop, amqp_url): + return partial(connection_fabric, amqp_url, loop=event_loop) class TestCaseNoRobust(TestCaseAmqp): diff --git a/tests/test_amqp_robust_proxy.py b/tests/test_amqp_robust_proxy.py index 836134d8..45b15b77 100644 --- a/tests/test_amqp_robust_proxy.py +++ b/tests/test_amqp_robust_proxy.py @@ -9,7 +9,7 @@ import aiormq.exceptions import pytest import shortuuid -from aiomisc_pytest.pytest_plugin import TCPProxy # type: ignore +from aiomisc_pytest import TCPProxy # type: ignore from yarl import URL import aio_pika @@ -60,20 +60,20 @@ def connection_fabric(): @pytest.fixture -def create_direct_connection(loop, amqp_direct_url): +def create_direct_connection(event_loop, amqp_direct_url): return partial( aio_pika.connect, amqp_direct_url.update_query( name=amqp_direct_url.query["name"] + "::direct", heartbeat=30, ), - loop=loop, + loop=event_loop, ) @pytest.fixture -def create_connection(connection_fabric, loop, amqp_url): - return partial(connection_fabric, amqp_url, loop=loop) +def create_connection(connection_fabric, event_loop, amqp_url): + return partial(connection_fabric, amqp_url, loop=event_loop) @pytest.fixture @@ -148,7 +148,7 @@ def reconnect_callback(conn): @aiomisc.timeout(30) async def test_robust_reconnect( create_connection, direct_connection, - proxy: TCPProxy, loop, add_cleanup: Callable, + proxy: TCPProxy, event_loop, add_cleanup: Callable, ): read_conn = await create_connection() # type: aio_pika.RobustConnection @@ -188,7 +188,7 @@ async def reader(queue_name): ) async with queue.iterator() as q: - loop.call_soon(consumer_event.set) + event_loop.call_soon(consumer_event.set) async for message in q: shared.append(message) @@ -197,7 +197,7 @@ async def reader(queue_name): logging.info("Exit reader task") try: - reader_task = loop.create_task(reader(queue.name)) + reader_task = event_loop.create_task(reader(queue.name)) await consumer_event.wait() logging.info("Disconnect all clients") @@ -260,7 +260,7 @@ async def test_channel_locked_resource2(connection: aio_pika.RobustConnection): async def test_channel_close_when_exclusive_queue( - create_connection, create_direct_connection, proxy: TCPProxy, loop, + create_connection, create_direct_connection, proxy: TCPProxy, event_loop, ): logging.info("Creating connections") direct_conn, proxy_conn = await asyncio.gather( @@ -299,7 +299,7 @@ async def close_after(delay, closer): await closer() logging.info("Closed") - await loop.create_task(close_after(5, direct_conn.close)) + await event_loop.create_task(close_after(5, direct_conn.close)) # reconnect fired await reconnect_event.wait() @@ -334,13 +334,13 @@ async def test_context_process_abrupt_channel_close( incoming_message = await queue.get(timeout=5) # close aiormq channel to emulate abrupt connection/channel close - await channel.channel.close() + underlay_channel = await channel.get_underlay_channel() + await underlay_channel.close() + with pytest.raises(aiormq.exceptions.ChannelInvalidStateError): async with incoming_message.process(): # emulate some activity on closed channel - await channel.channel.basic_publish( - b"dummy", exchange="", routing_key="non_existent", - ) + await channel.get_underlay_channel() # emulate connection/channel restoration of connect_robust await channel.reopen() @@ -429,14 +429,15 @@ async def reader(queue: aio_pika.Queue): @aiomisc.timeout(10) async def test_channel_restore( - connection_fabric, loop, amqp_url, proxy: TCPProxy, add_cleanup: Callable, + connection_fabric, event_loop, amqp_url, proxy: TCPProxy, + add_cleanup: Callable, ): heartbeat = 10 amqp_url = amqp_url.update_query(heartbeat=heartbeat) on_reopen = asyncio.Event() - conn = await connection_fabric(amqp_url, loop=loop) + conn = await connection_fabric(amqp_url, loop=event_loop) assert isinstance(conn, aio_pika.RobustConnection) async with conn: @@ -461,11 +462,12 @@ async def test_channel_restore( @aiomisc.timeout(20) async def test_channel_reconnect( - connection_fabric, loop, amqp_url, proxy: TCPProxy, add_cleanup: Callable, + connection_fabric, event_loop, amqp_url, + proxy: TCPProxy, add_cleanup: Callable, ): on_reconnect = asyncio.Event() - conn = await connection_fabric(amqp_url, loop=loop) + conn = await connection_fabric(amqp_url, loop=event_loop) assert isinstance(conn, aio_pika.RobustConnection) conn.reconnect_callbacks.add(lambda *_: on_reconnect.set(), weak=False) @@ -524,15 +526,17 @@ async def test_channel_reconnect_after_5kb( amqp_url, amqp_direct_url, connection_fabric, - loop: asyncio.AbstractEventLoop, + event_loop: asyncio.AbstractEventLoop, proxy: TCPProxy, add_cleanup: Callable, ): connection = await aio_pika.connect_robust( amqp_url.update_query(reconnect_interval=reconnect_timeout), - loop=loop, + loop=event_loop, + ) + direct_connection = await aio_pika.connect( + amqp_direct_url, loop=event_loop ) - direct_connection = await aio_pika.connect(amqp_direct_url, loop=loop) on_reconnect = asyncio.Event() connection.reconnect_callbacks.add( @@ -618,22 +622,22 @@ async def test_channel_reconnect_stairway( amqp_url: URL, amqp_direct_url: URL, connection_fabric, - loop: asyncio.AbstractEventLoop, + event_loop: asyncio.AbstractEventLoop, proxy: TCPProxy, add_cleanup: Callable, ): - loop.set_debug(True) + event_loop.set_debug(True) connection = await aio_pika.connect_robust( amqp_url.update_query( reconnect_interval=f"{reconnect_timeout:.2f}", name="proxy", ), - loop=loop, + loop=event_loop, ) direct_connection = await aio_pika.connect( - amqp_direct_url.update_query("name=direct"), loop=loop, + amqp_direct_url.update_query("name=direct"), loop=event_loop, ) on_reconnect = asyncio.Event() diff --git a/tests/test_amqps.py b/tests/test_amqps.py index c4575553..4ccd403f 100644 --- a/tests/test_amqps.py +++ b/tests/test_amqps.py @@ -15,7 +15,7 @@ def connection_fabric(request): @pytest.fixture -def create_connection(connection_fabric, loop, amqp_url): +def create_connection(connection_fabric, event_loop, amqp_url): ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.VerifyMode.CERT_NONE @@ -23,7 +23,7 @@ def create_connection(connection_fabric, loop, amqp_url): return partial( connection_fabric, amqp_url.with_scheme("amqps").with_port(5671), - loop=loop, + loop=event_loop, ssl_context=ssl_context, ) diff --git a/tests/test_pool.py b/tests/test_pool.py index fe1d908a..071cb7c0 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -7,7 +7,7 @@ @pytest.mark.parametrize("max_size", [50, 10, 5, 1]) -async def test_simple(max_size, loop): +async def test_simple(max_size, event_loop): counter = 0 async def create_instance(): @@ -16,7 +16,7 @@ async def create_instance(): counter += 1 return counter - pool: Pool = Pool(create_instance, max_size=max_size, loop=loop) + pool: Pool = Pool(create_instance, max_size=max_size, loop=event_loop) async def getter(): nonlocal counter, pool @@ -55,7 +55,7 @@ def max_size(self, request): return request.param @pytest.fixture - def pool(self, max_size, instances, loop): + def pool(self, max_size, instances, event_loop): async def create_instance(): nonlocal instances @@ -63,11 +63,11 @@ async def create_instance(): instances.add(obj) return obj - return Pool(create_instance, max_size=max_size, loop=loop) + return Pool(create_instance, max_size=max_size, loop=event_loop) class TestInstance(TestInstanceBase): - async def test_close(self, pool, instances, loop, max_size): + async def test_close(self, pool, instances, event_loop, max_size): async def getter(): async with pool.acquire(): await asyncio.sleep(0.05) @@ -115,7 +115,7 @@ async def getter(): class TestCaseNoMaxSize(TestInstance): - async def test_simple(self, pool, loop): + async def test_simple(self, pool, event_loop): call_count = 200 counter = 0 diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 98f2bf7d..7c4b2156 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -137,7 +137,9 @@ async def test_send_unknown_message( await rpc.close() - async def test_close_cancelling(self, channel: aio_pika.Channel, loop): + async def test_close_cancelling( + self, channel: aio_pika.Channel, event_loop + ): rpc = await RPC.create(channel, auto_delete=True) async def sleeper(): @@ -150,7 +152,7 @@ async def sleeper(): tasks = set() for _ in range(10): - tasks.add(loop.create_task(rpc.call(method_name))) + tasks.add(event_loop.create_task(rpc.call(method_name))) await rpc.close() diff --git a/tests/test_tools.py b/tests/test_tools.py index 5190ea09..320b8005 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -91,8 +91,10 @@ def test_callback_call(self, collection): async def test_blank_awaitable_callback(self, collection): await collection() - async def test_awaitable_callback(self, loop, collection, instance): - future = loop.create_future() + async def test_awaitable_callback( + self, event_loop, collection, instance + ): + future = event_loop.create_future() shared = [] @@ -101,7 +103,7 @@ async def coro(arg): shared.append(arg) def task_maker(arg): - return loop.create_task(coro(arg)) + return event_loop.create_task(coro(arg)) collection.add(future.set_result) collection.add(coro) @@ -112,8 +114,8 @@ def task_maker(arg): assert shared == [instance, instance] assert await future == instance - async def test_collection_create_tasks(self, loop, collection, instance): - future = loop.create_future() + async def test_collection_create_tasks(self, event_loop, collection, instance): + future = event_loop.create_future() async def coro(arg): await asyncio.sleep(0.5)