7
7
from models import Trade , Symbol , Exchange , Kline
8
8
9
9
10
- one_day = 60 * 60 * 24 * 1000
10
+ ONE_DAY = 60 * 60 * 24 * 1000 # 一天毫秒数
11
11
LIMIT = 500
12
12
13
13
14
14
async def main ():
15
15
begin_timestamp = 1575129600000 # 开始时间, 12-1 00:00:00.000
16
- # end_timestamp = 1578585600000 # 结束时间 01-10 00:00:00.000
17
- end_timestamp = begin_timestamp + one_day * 2
18
- for begin_dt in range (begin_timestamp , end_timestamp , one_day ):
16
+ end_timestamp = 1578585600000 # 结束时间 01-10 00:00:00.000
17
+ # end_timestamp = begin_timestamp + ONE_DAY * 2
18
+ for begin_dt in range (begin_timestamp , end_timestamp , ONE_DAY ):
19
19
await day_loop (begin_dt )
20
- # 计算
21
- # 存储
22
20
23
21
24
22
async def day_loop (begin_dt ):
@@ -34,23 +32,27 @@ async def day_loop(begin_dt):
34
32
# 查询trade数据
35
33
trade = Trade (exchange_name , focus_symbol )
36
34
kline = Kline (exchange_name , focus_symbol )
37
- await insert (trade , kline , begin_dt )
35
+
36
+ # 计算
37
+ klines = await calculate (trade , kline , begin_dt )
38
+
39
+ # 存储
40
+ await insert (kline , begin_dt , klines )
38
41
break
39
42
40
43
41
- async def insert (trade , kline , begin_timestamp ):
44
+ async def calculate (trade , kline , begin_timestamp ):
42
45
# 查询上一天最后一笔trade数据的trade price
43
46
prev_close_price = await query_prev_close_price (begin_timestamp , trade )
44
47
prev_kline = await query_prev_kline (kline , begin_timestamp )
45
- print (prev_kline , "prev_kline" )
46
48
47
49
klines = []
48
50
kline_document = {}
49
51
update_flag = True # 是否使用update_one
50
52
51
53
# 按照 kline 时间段划分
52
54
# for begin_dt in range(begin_timestamp, begin_timestamp + kline.interval * 5, kline.interval):
53
- for begin_dt in range (begin_timestamp , begin_timestamp + one_day , kline .interval ):
55
+ for begin_dt in range (begin_timestamp , begin_timestamp + ONE_DAY , kline .interval ):
54
56
end_dt = begin_dt + kline .interval
55
57
kline_document = {
56
58
"begin_dt" : begin_dt ,
@@ -112,14 +114,11 @@ async def insert(trade, kline, begin_timestamp):
112
114
open_trades = []
113
115
close_trades = []
114
116
tradeprices = []
115
- sum_amount_check = 0 # TODO: 之后删除
116
117
117
118
duration = kline .interval * 0.2
118
119
119
120
for t_document in trades :
120
121
tradeprices .append (t_document ["tradeprice" ])
121
- # TODO: amount_check = sum(trade price * volume)
122
- sum_amount_check += t_document ["tradeprice" ] * t_document ["volume" ]
123
122
124
123
if t_document ["direction" ] == "buy" :
125
124
buy_trades .append (t_document )
@@ -135,7 +134,6 @@ async def insert(trade, kline, begin_timestamp):
135
134
low = min (tradeprices ) if tradeprices else 0.0
136
135
kline_document ["high" ] = high
137
136
kline_document ["low" ] = low
138
- kline_document ["amount_check" ] = sum_amount_check
139
137
140
138
kline_document ["book_count" ] = len (trades )
141
139
kline_document ["buy_book_count" ] = len (buy_trades )
@@ -184,18 +182,13 @@ async def insert(trade, kline, begin_timestamp):
184
182
kline_document ["sectional_sell_book_count" ] = prev_kline .get ("sectional_sell_book_count" , 0.0 ) + \
185
183
kline_document ["sell_book_count" ]
186
184
187
- # TODO: check open_avg_check = sum(trade price * volume) / sum(volume)
188
185
if open_trades :
189
186
results = handle_documents (open_trades )
190
187
kline_document ["open_avg" ] = results ["avg_price" ]
191
- # TODO: 删除check
192
- kline_document ["open_avg_check" ] = get_avg_check (open_trades )
193
188
194
189
if close_trades :
195
190
results = handle_documents (close_trades )
196
191
kline_document ["close_avg" ] = results ["avg_price" ]
197
- # TODO: 删除check
198
- kline_document ["close_avg_check" ] = get_avg_check (close_trades )
199
192
200
193
kline_document ["lag_ret" ] = math .log (kline_document ["close_avg" ] / prev_kline ["close_avg" ]) \
201
194
if prev_kline ["close_avg" ] and kline_document ["close_avg" ] else None
@@ -226,14 +219,18 @@ async def insert(trade, kline, begin_timestamp):
226
219
227
220
# 加上最后一个没有prev_price 的kline
228
221
klines .append (InsertOne (kline_document ))
222
+ return klines
223
+
224
+
225
+ async def insert (kline , begin_timestamp , klines ):
229
226
for offset in range (0 , len (klines ), LIMIT ):
230
227
result = await kline .insert_many (klines [offset : offset + LIMIT ])
231
228
print (result .bulk_api_result , begin_timestamp , kline .collection_name )
232
229
233
230
234
231
async def query_prev_close_price (begin_timestamp , trade ):
235
232
prev_trade_cursor = trade .collection .find (
236
- {"tradedt" : {"$gte" : begin_timestamp - one_day , "$lt" : begin_timestamp }}).sort ("tradedt" , - 1 )
233
+ {"tradedt" : {"$gte" : begin_timestamp - ONE_DAY , "$lt" : begin_timestamp }}).sort ("tradedt" , - 1 )
237
234
prev_trades = [t_document for t_document in await prev_trade_cursor .to_list (length = 1 )]
238
235
prev_close_price = prev_trades [0 ].get ("tradeprice" , 0.0 ) if prev_trades else 0.0
239
236
@@ -250,19 +247,6 @@ async def query_prev_kline(kline, begin_timestamp):
250
247
}
251
248
252
249
253
- async def calculate ():
254
- pass
255
-
256
-
257
- def get_avg_check (documents ):
258
- divisor = 0
259
- denominator = 0
260
- for document in documents :
261
- divisor += document ["tradeprice" ] * document ["volume" ]
262
- denominator += document ["volume" ]
263
- return divisor / denominator
264
-
265
-
266
250
def handle_documents (documents ):
267
251
sum_amount = 0
268
252
sum_volume = 0
@@ -277,14 +261,6 @@ def handle_documents(documents):
277
261
}
278
262
279
263
280
- async def get_df ():
281
- kline = Kline (exchange_name = "binance" , symbol_name = "btcusdt" )
282
- begin = 1575129600000 + 60 * 60 * 8 * 1000
283
- end = 1575129600000 + 60 * 60 * 10 * 1000
284
- df = await kline .get_df_from_table ({"begin_dt" : {"$gte" : begin , "$lt" : end }})
285
- print (df )
286
-
287
-
288
264
if __name__ == '__main__' :
289
265
# TODO: 根据传入参数决定生成的kline种类
290
266
loop = asyncio .get_event_loop ()
0 commit comments