virus_utils.py
import csv
import json
import os
import re
import time
from collections import defaultdict
from datetime import datetime
from io import StringIO
from typing import Optional

import requests
from dateutil.parser import parse as parsedate

import io_utils
from models import Country, Countries, TimeSeriesItem

timeseries_url = 'https://pomber.github.io/covid19/timeseries.json'

def num(s) -> int:
    """Parse an int from a CSV field, falling back to 0 on malformed values."""
    try:
        return int(s)
    except (ValueError, TypeError):
        return 0


TIMEOUT_SEC = 3 * 60 * 60  # cache lifetime: 3 hours in seconds

# In-memory per-user country preference; resets when the process restarts.
pref_country_persist = {}


def read_pref_country(user_id: int) -> Country:
    country = pref_country_persist.get(user_id)
    if country is None:
        return Countries.US  # default value
    return Countries[country]


def write_pref_country(user_id: int, country: Countries):
    pref_country_persist[user_id] = country.displayValue

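# Usage sketch (hypothetical user id; the store above is process-local, so
# preferences are lost on restart):
#   write_pref_country(42, Countries.US)
#   read_pref_country(42)  # -> the stored country, Countries.US by default
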
def is_remote_file_changed(since_timestamp: int) -> bool:
    r = requests.head(timeseries_url)
    if r.status_code == requests.codes.ok:
        url_time = r.headers['last-modified']
        # Last-Modified is in GMT; parsedate returns an aware datetime, so
        # .timestamp() yields the correct epoch value regardless of local zone
        # (time.mktime(url_date.timetuple()) would misread it as local time).
        url_date = parsedate(url_time)
        url_date_sec_epoch = int(url_date.timestamp())
        return url_date_sec_epoch > since_timestamp
    return True  # assume changed if the HEAD request fails


def get_formatted_datetime_change_data() -> str:
    """Format the stored last-change timestamp for display (UTC)."""
    datetime_stamp = io_utils.read_pref_date()
    return time.strftime('%b %d %Y %H:%M:%S %Z', time.gmtime(datetime_stamp))

def should_update_data() -> bool:
    if not os.path.exists(io_utils.get_timeseries_data_path()):  # no cached data yet
        return True
    sec_now = int(time.time())
    last_modified_sec = int(os.path.getmtime(io_utils.get_timeseries_data_path()))
    if sec_now - last_modified_sec < TIMEOUT_SEC:
        return False  # the cached file is still fresh
    # cache has expired locally; check whether the remote file actually changed
    datetime_stamp = io_utils.read_pref_date()
    return is_remote_file_changed(datetime_stamp)

def fetch_pomper_stat() -> Optional[dict]:
    """Return the pomber/covid19 timeseries as a dict, from cache when fresh."""
    if not should_update_data():
        # return the cached data if it is still present
        if os.path.exists(io_utils.get_timeseries_data_path()):
            with open(io_utils.get_timeseries_data_path()) as json_file:
                return json.load(json_file)
    req = requests.get(timeseries_url)
    if req.status_code == requests.codes.ok:
        # remember the remote modification time for later freshness checks
        url_date = parsedate(req.headers['last-modified'])
        io_utils.write_pref_date(int(url_date.timestamp()))
        json_data = req.json()
        io_utils.write_timeseries_data(json_data)  # refresh the local cache
        return json_data
    return None

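# Usage sketch. The pomber.github.io feed maps country names to lists of day
# records, e.g. {"US": [{"date": "2020-1-22", "confirmed": 1, "deaths": 0,
# "recovered": 0}, ...]}; the field names are the feed's, not this module's:
#   stats = fetch_pomper_stat()
#   if stats is not None:
#       us_days = stats.get('US', [])
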
def fetch_timeseries_report_deaths() -> Optional[list]:
    return fetch_timeseries_report('time_series_covid19_deaths_global.csv')


def fetch_timeseries_report_recovered() -> Optional[list]:
    return fetch_timeseries_report('time_series_covid19_recovered_global.csv')


def fetch_timeseries_report_confirmed() -> Optional[list]:
    return fetch_timeseries_report('time_series_covid19_confirmed_global.csv')

def fetch_timeseries_report(file_name: str) -> Optional[list]:
    url = f'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data' \
          f'/csse_covid_19_time_series/{file_name}'
    req = requests.get(url)
    if req.status_code == requests.codes.ok:
        data_stats = []
        csv_reader = csv.DictReader(StringIO(req.text))
        line_count = 0
        for row in csv_reader:
            if line_count == 0:
                print(f'Column names are {", ".join(row)}')
            total = 0
            dates_stat = defaultdict(int)
            for key, value in row.items():
                # date columns are named 'm/d/yy' (e.g. 1/22/20); the other
                # columns hold metadata such as Country/Region and coordinates
                match = re.search(r'\d{1,2}/\d{1,2}/\d{2}', key)
                if match is not None:
                    month, day, year = (num(part) for part in match.group().split('/'))
                    dt = datetime(year=2000 + year, month=month, day=day)
                    dates_stat[dt.isoformat()] = num(value)
                    total += num(value)
            country = row["Country/Region"]
            state = row["Province/State"]
            data_stats.append(TimeSeriesItem(state, country, dates_stat, total))
            line_count += 1
        print(f'Processed {line_count} lines.')
        return data_stats
    return None

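# Usage sketch (attribute names on TimeSeriesItem are assumed from its
# constructor arguments in models.py):
#   confirmed = fetch_timeseries_report_confirmed()
#   if confirmed:
#       worst = max(confirmed, key=lambda item: item.total)
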
def reformat_large_tick_values(tick_val, pos):
    """
    Turn large tick values (in the billions, millions and thousands) such as
    4500 into 4.5K, while 4000 becomes 4K (no zero after the decimal).
    """
    if tick_val >= 1000000000:
        new_tick_format = '{:}B'.format(round(tick_val / 1000000000, 1))
    elif tick_val >= 1000000:
        new_tick_format = '{:}M'.format(round(tick_val / 1000000, 1))
    elif tick_val >= 1000:
        new_tick_format = '{:}K'.format(round(tick_val / 1000, 1))
    else:
        # values below 1000 keep at most one decimal place
        new_tick_format = round(tick_val, 1)
    new_tick_format = str(new_tick_format)
    # keep 4.5M as is, but turn 4.0M into 4M: the zero after the decimal adds nothing
    index_of_decimal = new_tick_format.find(".")
    if index_of_decimal != -1:
        value_after_decimal = new_tick_format[index_of_decimal + 1]
        if value_after_decimal == "0":
            new_tick_format = new_tick_format[:index_of_decimal] + new_tick_format[index_of_decimal + 2:]
    return new_tick_format
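

# Usage sketch (assumption: this is meant as a matplotlib tick formatter; the
# (tick_val, pos) signature matches matplotlib.ticker.FuncFormatter, though
# this module does not import matplotlib itself):
#   from matplotlib.ticker import FuncFormatter
#   ax.yaxis.set_major_formatter(FuncFormatter(reformat_large_tick_values))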