Skip to content

Commit 0dfc710

Browse files
committed
extended reactivex channel test
1 parent 7002977 commit 0dfc710

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

tests/test_reactivex/test_reactivex_server_side.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class Handler(BaseReactivexHandler):
1919

2020
def __init__(self, server_done: Optional[asyncio.Event] = None):
2121
self._server_done = server_done
22+
self.received_payloads = []
2223

2324
async def request_stream(self, payload: Payload) -> Observable:
2425
return reactivex.from_iterable((Payload(ensure_bytes('Feed Item: {}'.format(index))) for index in range(3)))
@@ -29,6 +30,7 @@ async def request_channel(self, payload: Payload) -> ReactivexChannel:
2930

3031
def observer(value: Payload):
3132
logging.info(f'Received by test server: {value.data}')
33+
self.received_payloads.append(value.data)
3234

3335
observer(payload)
3436

@@ -64,9 +66,12 @@ async def test_serve_reactivex_channel(pipe: Tuple[RSocketServer, RSocketClient]
6466
server, client = pipe
6567

6668
server_done_event = asyncio.Event()
69+
handler = None
6770

6871
def handler_factory():
69-
return Handler(server_done_event)
72+
nonlocal handler
73+
handler = Handler(server_done_event)
74+
return handler
7075

7176
server.set_handler_using_factory(reactivex_handler_factory(handler_factory))
7277

@@ -86,6 +91,12 @@ def handler_factory():
8691

8792
await server_done_event.wait()
8893

94+
assert len(handler.received_payloads) == 4
95+
assert handler.received_payloads[0] == b'request text'
96+
assert handler.received_payloads[1] == b'Client item: 0'
97+
assert handler.received_payloads[2] == b'Client item: 1'
98+
assert handler.received_payloads[3] == b'Client item: 2'
99+
89100

90101
async def test_serve_reactivex_response(pipe: Tuple[RSocketServer, RSocketClient]):
91102
server, client = pipe

0 commit comments

Comments
 (0)