Skip to content

Commit bb40002

Browse files
Merge pull request #17 from vikramaditya91/bugfix/get-history-when-some-coins-may-be-null
Adjust the number of requests per minute to avoid hanging
2 parents dc1096d + 141cb0b commit bb40002

File tree

8 files changed

+125
-47
lines changed

8 files changed

+125
-47
lines changed

README.rst

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ Welcome to crypto-history
66
.. image:: https://img.shields.io/pypi/v/crypto-history.svg
77
:target: https://pypi.python.org/pypi/crypto-history
88

9-
.. image:: https://img.shields.io/pypi/l/crypto-history.svg
10-
:target: https://pypi.python.org/pypi/crypto-history
11-
129
.. image:: https://img.shields.io/pypi/wheel/crypto-history.svg
1310
:target: https://pypi.python.org/pypi/crypto-history
1411

@@ -27,9 +24,12 @@ Welcome to crypto-history
2724
:target: https://coveralls.io/github/vikramaditya91/crypto_history?branch=master
2825
:alt: crypto-history coveralls coverage
2926

27+
.. image:: https://travis-ci.org/vikramaditya91/crypto_history.svg?branch=master
28+
:target: https://travis-ci.org/vikramaditya91/crypto_history
29+
3030

3131
This is a wrapper on binance and other exchange APIs to aggregate historical information
32-
in structured tabular formats (such as xarray.DataArray)
32+
in structured tabular formats (such as xarray.DataArray and SQLite).
3333

3434
Source code
3535
https://github.com/vikramaditya91/crypto_history
@@ -50,6 +50,9 @@ Features
5050
- Obtains the history of each/all tickers in the xarray.DataArray format
5151
- Easily extendable to other exchanges and other data formats
5252
- It does not require an API key from Binance
53+
- Null values can be dropped timestamp-wise and/or coin-wise
54+
- Can export data as SQLite databases and xarray.DataArray objects
55+
- Chunks of time can be aggregated into a single data object
5356

5457
Quick Start
5558
-----------

changelog.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
Changelog
22
=========
33

4+
1.2b9 (2020-Nov-11)
5+
-------------------
6+
* bugfix/write SQL even with incomplete data
47

58
1.2b7 (2020-Nov-09)
69
------------------

crypto_history/emit_data/save_to_disk.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22
import pathlib
3+
import logging
34
from sqlalchemy.orm import sessionmaker
45
import xarray as xr
56
import pandas as pd
@@ -9,6 +10,8 @@
910
from crypto_history.utilities.general_utilities import register_factory
1011
from crypto_history.utilities.general_utilities import check_for_write_access
1112

13+
logger = logging.getLogger(__package__)
14+
1215

1316
class AbstractDiskWriteCreator(ABC):
1417
"""Abstract disk-writer creator"""
@@ -123,16 +126,15 @@ def get_sql_table_name(dataarray: xr.DataArray,
123126
str, a string with the table of the SQL table
124127
125128
"""
126-
non_nan_values = dataarray.loc[
127-
:, reference_asset, :, "weight"
128-
].to_pandas().dropna()
129-
unique_values = pd.unique(
130-
non_nan_values.values.flatten()
131-
).tolist()
129+
weights = dataarray.loc[
130+
:, reference_asset, :, "weight"
131+
].values.flatten()
132+
unique_values = \
133+
set(filter(lambda x: isinstance(x, str), weights))
132134
assert len(unique_values) == 1, \
133135
f"More than 1 type of weights found. {unique_values}"
134136
return f"COIN_HISTORY_{ohlcv_field}_" \
135-
f"{reference_asset}_{unique_values[0]}"
137+
f"{reference_asset}_{unique_values.pop()}"
136138

137139
def yield_db_name_from_dataset(self,
138140
dataarray: xr.DataArray,
@@ -154,6 +156,12 @@ def yield_db_name_from_dataset(self,
154156
df = self.get_df_from_da(dataarray,
155157
reference_asset,
156158
ohlcv_field)
159+
if df.isnull().values.all():
160+
logger.warning(f"All the values in the df of {ohlcv_field}"
161+
f" for reference_asset {reference_asset} "
162+
f"are null")
163+
continue
164+
157165
table_name = self.get_sql_table_name(dataarray,
158166
reference_asset,
159167
ohlcv_field

crypto_history/stock_market/request.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ async def _request_with_retries(
6969
response from the request
7070
7171
"""
72-
self._log_request(method_name, *args, **kwargs)
7372
try:
7473
return await self._request(method_name, *args, **kwargs)
7574
except asyncio.exceptions.TimeoutError:
@@ -114,6 +113,7 @@ async def _request(self, method_name, *args, **kwargs):
114113
115114
"""
116115
await self.request_queue.hold_if_exceeded()
116+
self._log_request(method_name, *args, **kwargs)
117117
return await getattr(self._client, method_name)(*args, **kwargs)
118118

119119

@@ -128,7 +128,7 @@ def __init__(self):
128128
super().__init__()
129129
self._client = AsyncClient(api_key="", api_secret="")
130130
self.request_queue = TokenBucket(
131-
request_limit={timedelta(minutes=1): 1000}
131+
request_limit={timedelta(minutes=1): 400}
132132
)
133133

134134
async def _request_with_retries(
@@ -154,12 +154,18 @@ async def _request_with_retries(
154154
except exceptions.BinanceAPIException as e:
155155
# Error code corresponds to TOO_MANY_REQUESTS in Binance
156156
if e.code == -1003:
157+
wait_seconds = 5
157158
logger.warning(
158-
f"Request could not responds as TOO_MANY_REQUESTS. "
159-
f"SYNCHRONOUSLY pausing everything for 30 seconds. "
159+
f"Request could not respond as TOO_MANY_REQUESTS. "
160+
f"SYNCHRONOUSLY pausing everything for {wait_seconds} seconds. "
160161
f"Reason {e}"
161162
)
162-
time.sleep(30)
163+
time.sleep(wait_seconds)
164+
logger.warning(
165+
"Updating the Binance Session as it often gets stuck "
166+
"without it"
167+
)
168+
163169
return await self._retry(
164170
method_name, retry_strategy_state, *args, **kwargs
165171
)

crypto_history/utilities/general_utilities.py

Lines changed: 74 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,80 @@ def init_logger(level=logging.INFO):
1616
logging.basicConfig(level=level, format=log_format)
1717

1818

19+
async def gather_with_concurrency(num_concurrent_tasks: int = None,
20+
*tasks):
21+
"""
22+
Limit the number of tasks that are gather concurrently
23+
Args:
24+
num_concurrent_tasks(int): Number of concurrent tasks
25+
*tasks: Tasks to be gathered
26+
27+
Returns: result of tasks
28+
29+
"""
30+
semaphore = asyncio.Semaphore(num_concurrent_tasks)
31+
32+
async def sem_task(task):
33+
"""
34+
If the num_concurrent_tasks is None, it does not \
35+
throttle and returns the task. If it is not None, it \
36+
throttles, the number of concurrent tasks
37+
Args:
38+
task: task which might have to be throttled
39+
40+
Returns: result of task
41+
42+
"""
43+
if num_concurrent_tasks is None:
44+
return await task
45+
46+
async with semaphore:
47+
return await task
48+
49+
all_tasks = (sem_task(task) for task in tasks)
50+
return await asyncio.gather(*all_tasks)
51+
52+
1953
async def gather_dict(tasks: dict):
54+
"""
55+
Gathers the tasks in a dict format (instead of typical List)
56+
Args:
57+
tasks(dict): Dictionary of tasks to be gathered.
58+
key represents the identifier, the value is the task
59+
60+
Returns: Result of the tasks in a dict format where key \
61+
is not modified from the tasks
62+
63+
"""
2064
async def mark(key, coroutine):
2165
return key, await coroutine
2266

67+
tasks_to_send = (mark(key, coroutine)
68+
for key, coroutine in tasks.items())
69+
2370
return {
2471
key: result
25-
for key, result in await asyncio.gather(
26-
*(mark(key, coroutine) for key, coroutine in tasks.items())
27-
)
72+
for key, result in await gather_with_concurrency(500,
73+
*tasks_to_send
74+
)
2875
}
2976

3077

31-
class TokenBucket:
78+
class Borg:
79+
"""Borg instance"""
80+
_shared_state = {}
81+
82+
def __init__(self):
83+
self.__dict__ = self._shared_state
84+
85+
86+
class TokenBucket(Borg):
3287
"""Controls the number of requests that can be made to the API.
3388
All times are written in micro-seconds for a good level of accuracy"""
3489

35-
def __init__(self, request_limit: Dict, pause_seconds: float = 1):
90+
def __init__(self,
91+
request_limit: Dict,
92+
pause_seconds: float = 1):
3693
"""
3794
Initializes the TokenBucket algorithm to throttle the flow of requests
3895
@@ -42,16 +99,18 @@ def __init__(self, request_limit: Dict, pause_seconds: float = 1):
4299
number of maximum requests allowed
43100
pause_seconds: amount of seconds to pause and test again
44101
"""
45-
self.queue = asyncio.Queue()
46-
(
47-
self.bucket_list,
48-
self.delta_t_list,
49-
self.max_requests_list,
50-
) = self._initialize_buckets(request_limit)
51-
self.last_check = datetime.now()
52-
self.pause_seconds = pause_seconds
53-
self.results = []
54-
self._counter = 0
102+
super().__init__()
103+
if not self._shared_state:
104+
self.queue = asyncio.Queue()
105+
(
106+
self.bucket_list,
107+
self.delta_t_list,
108+
self.max_requests_list,
109+
) = self._initialize_buckets(request_limit)
110+
self.last_check = datetime.now()
111+
self.pause_seconds = pause_seconds
112+
self.results = []
113+
self._counter = 0
55114

56115
@staticmethod
57116
def _initialize_buckets(requests_dict: Dict):

crypto_history/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.2.b7"
1+
__version__ = "1.2.b9"

examples/coin_history_to_sqlite.db.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,28 @@ async def main():
1010

1111
exchange_factory = class_builders.get("market").get("binance")()
1212

13+
async with exchange_factory.create_data_homogenizer() \
14+
as binance_homogenizer:
15+
base_assets = await binance_homogenizer.get_all_base_assets()
16+
1317
desired_fields = ["open_ts", "open", "close"]
14-
time_aggregated_data_container = data_container_access.TimeAggregatedDataContainer(
18+
candle_type = "1m"
19+
time_aggregated_data_container = data_container_access.TimeAggregatedDataContainer.create_instance(
1520
exchange_factory,
16-
base_assets=["NANO", "AMB", "XRP"],
21+
base_assets=base_assets,
1722
reference_assets=["BTC"],
1823
ohlcv_fields=desired_fields,
19-
time_range_dict={("25 Jan 2018", "27 Feb 2018"): "1d",
20-
("26 Aug 2020", "now"): "1d"}
24+
time_range_dict={("25 Jan 2015", "9 Nov 2020"): candle_type}
2125
)
2226
xdataarray_of_coins = await time_aggregated_data_container.get_time_aggregated_data_container()
2327
type_converter = data_container_post.TypeConvertedData(exchange_factory)
2428
type_converted_dataarray = type_converter.set_type_on_dataarray(xdataarray_of_coins)
2529

26-
incomplete_data_handle = data_container_post.HandleIncompleteData()
27-
entire_na_removed_dataarray = incomplete_data_handle.\
28-
drop_xarray_coins_with_entire_na(type_converted_dataarray)
29-
strict_na_dropped_dataarray = incomplete_data_handle.\
30-
nullify_incomplete_data_from_dataarray(entire_na_removed_dataarray)
31-
32-
with tempfile.NamedTemporaryFile(mode="w", suffix=".db") as temp_file:
33-
sql_writer = class_builders.get("write_to_disk").get("sqlite")()
34-
save_to_disk.write_coin_history_to_file(strict_na_dropped_dataarray,
30+
sql_writer = class_builders.get("write_to_disk").get("sqlite")()
31+
with tempfile.NamedTemporaryFile(suffix=".db") as temp_db:
32+
save_to_disk.write_coin_history_to_file(type_converted_dataarray,
3533
sql_writer,
36-
temp_file.name,
34+
temp_db.name,
3735
["open", "close"])
3836

3937

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ def get_version(rel_path):
4646
packages=["crypto_history",
4747
"crypto_history.data_container",
4848
"crypto_history.stock_market",
49-
"crypto_history.utilities"],
49+
"crypto_history.utilities",
50+
"crypto_history.emit_data"],
5051
url="https://github.com/vikramaditya91/crypto_history",
5152
author_email="[email protected]",
5253
python_requires=">=3.8.0",

0 commit comments

Comments
 (0)