Skip to content

Commit d14710b

Browse files
committed
fix graphql send cancel on break from subscription loop
1 parent 37b479f commit d14710b

File tree

2 files changed

+55
-2
lines changed

2 files changed

+55
-2
lines changed

rsocket/graphql/rsocket_transport.py

+18-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from reactivestreams.subscriber import DefaultSubscriber
1010
from rsocket.extensions.helpers import composite, route
1111
from rsocket.frame_helpers import str_to_bytes
12+
from rsocket.logger import logger
1213
from rsocket.payload import Payload
1314
from rsocket.rsocket_client import RSocketClient
1415

@@ -104,6 +105,14 @@ def on_complete(self):
104105
def on_error(self, exception: Exception):
105106
self._received_queue.put_nowait(exception)
106107

108+
def cancel(self):
109+
if self.subscription is not None:
110+
self.subscription.cancel()
111+
112+
def request(self, n: int):
113+
if self.subscription is not None:
114+
self.subscription.request(n)
115+
107116
rsocket_payload = self._create_rsocket_payload(document, variable_values, operation_name)
108117

109118
received_queue = Queue()
@@ -120,4 +129,12 @@ def on_error(self, exception: Exception):
120129
if response is complete_object:
121130
break
122131

123-
yield self._response_to_execution_result(response)
132+
execution_result = self._response_to_execution_result(response)
133+
134+
try:
135+
yield execution_result
136+
except GeneratorExit:
137+
logger().debug('Generator exited')
138+
subscriber.cancel()
139+
return
140+

tests/test_integrations/test_graphql.py

+37-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
13
from gql import Client, gql
24

35
from rsocket.extensions.mimetypes import WellKnownMimeTypes
@@ -75,4 +77,38 @@ def handler_factory():
7577
gql("""{__schema { types { name } } }"""),
7678
get_execution_result=True)
7779

78-
assert response.data == expected_schema
80+
assert response.data == expected_schema
81+
82+
83+
async def test_graphql_break_loop(lazy_pipe, graphql_schema):
84+
def handler_factory():
85+
return RoutingRequestHandler(graphql_handler(graphql_schema, 'graphql'))
86+
87+
async with lazy_pipe(
88+
client_arguments={'metadata_encoding': WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA},
89+
server_arguments={'handler_factory': handler_factory}) as (server, client):
90+
graphql = Client(
91+
schema=graphql_schema,
92+
transport=RSocketTransport(client),
93+
)
94+
95+
responses = []
96+
i = 0
97+
async for response in graphql.subscribe_async(
98+
document=gql("""
99+
subscription {
100+
greetings {message}
101+
}
102+
"""),
103+
get_execution_result=True):
104+
responses.append(response.data)
105+
i += 1
106+
if i > 4:
107+
break
108+
109+
assert len(responses) == 5
110+
assert responses[0] == {'greetings': {'message': 'Hello world 0'}}
111+
112+
await asyncio.sleep(1)
113+
114+
# assert responses[9] == {'greetings': {'message': 'Hello world 9'}}

0 commit comments

Comments
 (0)