Skip to content

Commit e57583f

Browse files
added async support for callback function global and stream specific
1 parent 29631d2 commit e57583f

File tree

3 files changed

+35
-8
lines changed

3 files changed

+35
-8
lines changed

CHANGELOG.md

+5-3
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this p
1111

1212
## 2.1.4.dev (development stage/unreleased/unstable)
1313
### Added
14-
- Parameter `process_stream_data_async` to `manager.py`
15-
- `await self.manager.process_stream_data_async()` to `socket.py`. This means it is possible to provide a asyncio
16-
function.
14+
- Parameter `process_stream_data_async` in `manager.py` to `BinanceWebSocketApiManager()` as global setting parameter
15+
and to `create_stream()` as stream specific parameter.
16+
- `await self.manager.process_stream_data_async()` and
17+
`await self.manager.specific_process_stream_data_async[stream_id]()` to `socket.py`. This means it is possible to
18+
provide a global and a stream specific asyncio function.
1719
### Changed
1820
- Parameter `process_stream_data` in `manager.py` from `False` to `None`
1921
- Parameter `process_stream_signals` in `manager.py` from `False` to `None`

unicorn_binance_websocket_api/manager.py

+25-4
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ def __init__(self,
281281
colorama.init()
282282
logger.info(f"Using `websockets_{websockets.__version__}`")
283283
self.specific_process_stream_data = {}
284-
284+
self.specific_process_stream_data_async = {}
285285
if process_stream_data is None:
286286
# no special method to process stream data provided, so we use add_to_stream_buffer:
287287
self.process_stream_data = self.add_to_stream_buffer
@@ -470,7 +470,8 @@ def _add_stream_to_stream_list(self,
470470
close_timeout=None,
471471
stream_buffer_maxlen=None,
472472
api=False,
473-
process_stream_data=None):
473+
process_stream_data=None,
474+
process_stream_data_async=None):
474475
"""
475476
Create a list entry for new streams
476477
@@ -537,12 +538,21 @@ def _add_stream_to_stream_list(self,
537538
get stored in the stream_buffer! `How to read from stream_buffer!
538539
<https://unicorn-binance-websocket-api.docs.lucit.tech/README.html#and-4-more-lines-to-print-the-receives>`_
539540
:type process_stream_data: function
541+
:param process_stream_data_async: Provide a asynchronous function/method to process the received webstream data.
542+
The function will be called instead of
543+
`add_to_stream_buffer() <unicorn_binance_websocket_api.html#unicorn_binance_websocket_api.manager.BinanceWebSocketApiManager.add_to_stream_buffer>`_
544+
like `process_stream_data(stream_data, stream_buffer_name)` where
545+
`stream_data` cointains the raw_stream_data. If not provided, the raw stream_data will
546+
get stored in the stream_buffer! `How to read from stream_buffer!
547+
<https://unicorn-binance-websocket-api.docs.lucit.tech/README.html#and-4-more-lines-to-print-the-receives>`_
548+
:type process_stream_data_async: function
540549
"""
541550
output = output or self.output_default
542551
close_timeout = close_timeout or self.close_timeout_default
543552
ping_interval = ping_interval or self.ping_interval_default
544553
ping_timeout = ping_timeout or self.ping_timeout_default
545554
self.specific_process_stream_data[stream_id] = process_stream_data
555+
self.specific_process_stream_data_async[stream_id] = process_stream_data_async
546556
self.stream_threading_lock[stream_id] = {'full_lock': threading.Lock(),
547557
'receives_statistic_last_second_lock': threading.Lock()}
548558
self.stream_list[stream_id] = {'exchange': self.exchange,
@@ -1370,7 +1380,8 @@ def create_stream(self,
13701380
close_timeout=None,
13711381
stream_buffer_maxlen=None,
13721382
api=False,
1373-
process_stream_data=None):
1383+
process_stream_data=None,
1384+
process_stream_data_async=None):
13741385
"""
13751386
Create a websocket stream
13761387
@@ -1478,6 +1489,15 @@ def create_stream(self,
14781489
object instantiation! `How to read from stream_buffer!
14791490
<https://unicorn-binance-websocket-api.docs.lucit.tech/README.html?highlight=pop_stream_data_from_stream_buffer#and-4-more-lines-to-print-the-receives>`_
14801491
:type process_stream_data: function
1492+
:param process_stream_data_async: Provide an asynchronous function/method to process the received webstream data (callback). The
1493+
function will be called instead of
1494+
`add_to_stream_buffer() <unicorn_binance_websocket_api.html#unicorn_binance_websocket_api.manager.BinanceWebSocketApiManager.add_to_stream_buffer>`_
1495+
like `process_stream_data(stream_data)` where
1496+
`stream_data` cointains the raw_stream_data. If not provided, the raw stream_data will
1497+
get stored in the stream_buffer or provided to the global callback function provided during
1498+
object instantiation! `How to read from stream_buffer!
1499+
<https://unicorn-binance-websocket-api.docs.lucit.tech/README.html?highlight=pop_stream_data_from_stream_buffer#and-4-more-lines-to-print-the-receives>`_
1500+
:type process_stream_data_async: function
14811501
:return: stream_id or 'False'
14821502
"""
14831503
# handle Websocket API streams: https://developers.binance.com/docs/binance-trading-api/websocket_api
@@ -1545,7 +1565,8 @@ def create_stream(self,
15451565
close_timeout=close_timeout,
15461566
stream_buffer_maxlen=stream_buffer_maxlen,
15471567
api=api,
1548-
process_stream_data=process_stream_data)
1568+
process_stream_data=process_stream_data,
1569+
process_stream_data_async=process_stream_data_async)
15491570
try:
15501571
loop = asyncio.new_event_loop()
15511572
except OSError as error_msg:

unicorn_binance_websocket_api/sockets.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ async def start_socket(self):
133133
received_stream_data = self.unicorn_fy.binance_com_futures_websocket(received_stream_data_json)
134134
elif self.exchange == "binance.com-futures-testnet":
135135
received_stream_data = self.unicorn_fy.binance_com_futures_websocket(received_stream_data_json)
136-
elif self.exchange == "binance.com-coin-futures" or self.exchange == "binance.com-coin_futures":
136+
elif self.exchange == "binance.com-coin-futures" \
137+
or self.exchange == "binance.com-coin_futures":
137138
received_stream_data = self.unicorn_fy.binance_com_coin_futures_websocket(received_stream_data_json)
138139
elif self.exchange == "binance.je":
139140
received_stream_data = self.unicorn_fy.binance_je_websocket(received_stream_data_json)
@@ -190,6 +191,9 @@ async def start_socket(self):
190191
elif self.manager.specific_process_stream_data[self.stream_id] is not None:
191192
# if create_stream() got a callback function -> use it
192193
self.manager.specific_process_stream_data[self.stream_id](received_stream_data)
194+
elif self.manager.specific_process_stream_data_async[self.stream_id] is not None:
195+
# if create_stream() got an asynchronous callback function -> use it
196+
await self.manager.specific_process_stream_data_async[self.stream_id](received_stream_data)
193197
else:
194198
# Use the default process_stream_data function provided to/by the manager class
195199
if self.manager.process_stream_data_async is None:

0 commit comments

Comments
 (0)