Skip to content

Commit 744568f

Browse files
committed
guide example statistics cleanup.
added passing subscriber methods as arguments to DefaultSubscriber class
1 parent 509f193 commit 744568f

File tree

8 files changed

+85
-105
lines changed

8 files changed

+85
-105
lines changed

Diff for: examples/tutorial/step6/chat_client.py

+3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ def on_next(self, payload: Payload, is_complete=False):
3434
if is_complete:
3535
self.done.set()
3636

37+
def on_error(self, exception: Exception):
38+
raise exception
39+
3740
def cancel(self):
3841
self.subscription.cancel()
3942

Diff for: examples/tutorial/step6/chat_server.py

+11-16
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class ChatUserSession:
105105

106106
def __init__(self):
107107
self._session: Optional[UserSessionData] = None
108+
self._requested_statistics = ServerStatisticsRequest()
108109

109110
def router_factory(self):
110111
router = RequestRouter(payload_mapper=decode_payload)
@@ -163,34 +164,28 @@ async def receive_statistics(statistics: ClientStatistics):
163164
@router.channel('statistics')
164165
async def send_statistics() -> Tuple[Optional[Publisher], Optional[Subscriber]]:
165166

166-
requested_statistics = ServerStatisticsRequest()
167-
168167
async def statistics_generator():
169168
while True:
170169
try:
171-
await asyncio.sleep(requested_statistics.period_seconds)
172-
next_message = new_statistics_data(requested_statistics)
170+
await asyncio.sleep(self._requested_statistics.period_seconds)
171+
next_message = new_statistics_data(self._requested_statistics)
173172

174173
yield dataclass_to_payload(next_message), False
175174
except Exception:
176175
logging.error('Statistics', exc_info=True)
177176

178-
class StatisticsSubscriber(DefaultSubscriber):
179-
180-
def on_next(self, payload: Payload, is_complete=False):
181-
request = decode_dataclass(payload.data, ServerStatisticsRequest)
182-
183-
logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')
177+
def on_next(payload: Payload, is_complete=False):
178+
request = decode_dataclass(payload.data, ServerStatisticsRequest)
184179

185-
if request.ids is not None:
186-
requested_statistics.ids = request.ids
180+
logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')
187181

188-
if request.period_seconds is not None:
189-
requested_statistics.period_seconds = request.period_seconds
182+
if request.ids is not None:
183+
self._requested_statistics.ids = request.ids
190184

191-
subscriber = StatisticsSubscriber()
185+
if request.period_seconds is not None:
186+
self._requested_statistics.period_seconds = request.period_seconds
192187

193-
return StreamFromAsyncGenerator(statistics_generator), subscriber
188+
return StreamFromAsyncGenerator(statistics_generator), DefaultSubscriber(on_next=on_next)
194189

195190
@router.response('message')
196191
async def send_message(message: Message) -> Awaitable[Payload]:

Diff for: examples/tutorial/step7/chat_client.py

+3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ def on_next(self, payload: Payload, is_complete=False):
3737
if is_complete:
3838
self.done.set()
3939

40+
def on_error(self, exception: Exception):
41+
raise exception
42+
4043
def cancel(self):
4144
self.subscription.cancel()
4245

Diff for: examples/tutorial/step7/chat_server.py

+29-48
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from rsocket.routing.routing_request_handler import RoutingRequestHandler
2222
from rsocket.rsocket_server import RSocketServer
2323
from rsocket.streams.empty_stream import EmptyStream
24+
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator
2425
from rsocket.streams.stream_from_generator import StreamFromGenerator
2526
from rsocket.transports.tcp import TransportTCP
2627

@@ -88,10 +89,22 @@ def find_username_by_session(session_id: SessionId) -> Optional[str]:
8889
return session.username
8990

9091

92+
def new_statistics_data(statistics_request: ServerStatisticsRequest):
93+
statistics_data = {}
94+
95+
if 'users' in statistics_request.ids:
96+
statistics_data['user_count'] = len(chat_data.user_session_by_id)
97+
98+
if 'channels' in statistics_request.ids:
99+
statistics_data['channel_count'] = len(chat_data.channel_messages)
100+
101+
return ServerStatistics(**statistics_data)
102+
91103
class ChatUserSession:
92104

93105
def __init__(self):
94106
self._session: Optional[UserSessionData] = None
107+
self._requested_statistics = ServerStatisticsRequest()
95108

96109
def router_factory(self):
97110
router = RequestRouter(payload_mapper=decode_payload)
@@ -150,60 +163,28 @@ async def receive_statistics(statistics: ClientStatistics):
150163
@router.channel('statistics')
151164
async def send_statistics() -> Tuple[Optional[Publisher], Optional[Subscriber]]:
152165

153-
class StatisticsChannel(DefaultPublisher, DefaultSubscriber, DefaultSubscription):
154-
155-
def __init__(self, session: UserSessionData):
156-
super().__init__()
157-
self._session = session
158-
self._requested_statistics = ServerStatisticsRequest()
159-
self._sender: Optional[Task] = None
160-
161-
def cancel(self):
162-
if self._sender is not None:
163-
logging.info('Canceling statistics sender task')
164-
self._sender.cancel()
165-
self._sender = None
166-
167-
def subscribe(self, subscriber: Subscriber):
168-
super().subscribe(subscriber)
169-
subscriber.on_subscribe(self)
170-
self._sender = asyncio.create_task(self._statistics_sender())
171-
172-
async def _statistics_sender(self):
173-
while True:
174-
try:
175-
await asyncio.sleep(self._requested_statistics.period_seconds)
176-
next_message = self.new_statistics_data()
177-
178-
self._subscriber.on_next(dataclass_to_payload(next_message))
179-
except Exception:
180-
logging.error('Statistics', exc_info=True)
181-
182-
def new_statistics_data(self):
183-
statistics_data = {}
184-
185-
if 'users' in self._requested_statistics.ids:
186-
statistics_data['user_count'] = len(chat_data.user_session_by_id)
187-
188-
if 'channels' in self._requested_statistics.ids:
189-
statistics_data['channel_count'] = len(chat_data.channel_messages)
190-
191-
return ServerStatistics(**statistics_data)
166+
async def statistics_generator():
167+
while True:
168+
try:
169+
await asyncio.sleep(self._requested_statistics.period_seconds)
170+
next_message = new_statistics_data(self._requested_statistics)
192171

193-
def on_next(self, payload: Payload, is_complete=False):
194-
request = decode_dataclass(payload.data, ServerStatisticsRequest)
172+
yield dataclass_to_payload(next_message), False
173+
except Exception:
174+
logging.error('Statistics', exc_info=True)
195175

196-
logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')
176+
def on_next(payload: Payload, is_complete=False):
177+
request = decode_dataclass(payload.data, ServerStatisticsRequest)
197178

198-
if request.ids is not None:
199-
self._requested_statistics.ids = request.ids
179+
logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')
200180

201-
if request.period_seconds is not None:
202-
self._requested_statistics.period_seconds = request.period_seconds
181+
if request.ids is not None:
182+
self._requested_statistics.ids = request.ids
203183

204-
response = StatisticsChannel(self._session)
184+
if request.period_seconds is not None:
185+
self._requested_statistics.period_seconds = request.period_seconds
205186

206-
return response, response
187+
return StreamFromAsyncGenerator(statistics_generator), DefaultSubscriber(on_next=on_next)
207188

208189
@router.response('message')
209190
async def send_message(message: Message) -> Awaitable[Payload]:

Diff for: examples/tutorial/step8/chat_client.py

+3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ def on_next(self, payload: Payload, is_complete=False):
3737
if is_complete:
3838
self.done.set()
3939

40+
def on_error(self, exception: Exception):
41+
raise exception
42+
4043
def cancel(self):
4144
self.subscription.cancel()
4245

Diff for: examples/tutorial/step8/chat_server.py

+18-37
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from rsocket.routing.routing_request_handler import RoutingRequestHandler
2424
from rsocket.rsocket_server import RSocketServer
2525
from rsocket.streams.empty_stream import EmptyStream
26+
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator
2627
from rsocket.streams.stream_from_generator import StreamFromGenerator
2728
from rsocket.transports.aiohttp_websocket import TransportAioHttpWebsocket
2829

@@ -106,6 +107,7 @@ class ChatUserSession:
106107

107108
def __init__(self):
108109
self._session: Optional[UserSessionData] = None
110+
self._requested_statistics = ServerStatisticsRequest()
109111

110112
def router_factory(self):
111113
router = RequestRouter(payload_mapper=decode_payload)
@@ -164,49 +166,28 @@ async def receive_statistics(statistics: ClientStatistics):
164166
@router.channel('statistics')
165167
async def send_statistics() -> Tuple[Optional[Publisher], Optional[Subscriber]]:
166168

167-
class StatisticsChannel(DefaultPublisher, DefaultSubscriber, DefaultSubscription):
169+
async def statistics_generator():
170+
while True:
171+
try:
172+
await asyncio.sleep(self._requested_statistics.period_seconds)
173+
next_message = new_statistics_data(self._requested_statistics)
168174

169-
def __init__(self, session: UserSessionData):
170-
super().__init__()
171-
self._session = session
172-
self._requested_statistics = ServerStatisticsRequest()
173-
self._sender: Optional[Task] = None
174-
175-
def cancel(self):
176-
if self._sender is not None:
177-
logging.info('Canceling statistics sender task')
178-
self._sender.cancel()
179-
self._sender = None
180-
181-
def subscribe(self, subscriber: Subscriber):
182-
super().subscribe(subscriber)
183-
subscriber.on_subscribe(self)
184-
self._sender = asyncio.create_task(self._statistics_sender())
185-
186-
async def _statistics_sender(self):
187-
while True:
188-
try:
189-
await asyncio.sleep(self._requested_statistics.period_seconds)
190-
next_message = new_statistics_data(self._requested_statistics)
191-
192-
self._subscriber.on_next(dataclass_to_payload(next_message))
193-
except Exception:
194-
logging.error('Statistics', exc_info=True)
195-
196-
def on_next(self, payload: Payload, is_complete=False):
197-
request = decode_dataclass(payload.data, ServerStatisticsRequest)
175+
yield dataclass_to_payload(next_message), False
176+
except Exception:
177+
logging.error('Statistics', exc_info=True)
198178

199-
logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')
179+
def on_next(payload: Payload, is_complete=False):
180+
request = decode_dataclass(payload.data, ServerStatisticsRequest)
200181

201-
if request.ids is not None:
202-
self._requested_statistics.ids = request.ids
182+
logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')
203183

204-
if request.period_seconds is not None:
205-
self._requested_statistics.period_seconds = request.period_seconds
184+
if request.ids is not None:
185+
self._requested_statistics.ids = request.ids
206186

207-
response = StatisticsChannel(self._session)
187+
if request.period_seconds is not None:
188+
self._requested_statistics.period_seconds = request.period_seconds
208189

209-
return response, response
190+
return StreamFromAsyncGenerator(statistics_generator), DefaultSubscriber(on_next=on_next)
210191

211192
@router.response('message')
212193
async def send_message(message: Message) -> Awaitable[Payload]:

Diff for: examples/tutorial/test_tutorials.py

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
'step5',
1818
'step6',
1919
'step7',
20+
'step8',
2021
'reactivex']
2122
2223
)

Diff for: reactivestreams/subscriber.py

+17-4
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,30 @@ def on_complete(self):
2727

2828

2929
class DefaultSubscriber(Subscriber):
30-
def __init__(self):
30+
def __init__(self, on_next=None,
31+
on_error=None,
32+
on_complete=None,
33+
on_subscribe=None):
34+
self._on_subscribe = on_subscribe
35+
self._on_complete = on_complete
36+
self._on_error = on_error
37+
self._on_next = on_next
3138
self.subscription: Optional[Subscription] = None
3239

3340
def on_next(self, value, is_complete=False):
34-
pass
41+
if self._on_next is not None:
42+
self._on_next(value, is_complete)
3543

3644
def on_error(self, exception: Exception):
37-
pass
45+
if self._on_error is not None:
46+
self._on_error(exception)
3847

3948
def on_subscribe(self, subscription: Subscription):
4049
self.subscription = subscription
4150

51+
if self._on_subscribe is not None:
52+
self._on_subscribe(subscription)
53+
4254
def on_complete(self):
43-
pass
55+
if self._on_complete is not None:
56+
self._on_complete()

0 commit comments

Comments
 (0)