-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathweather.py
289 lines (223 loc) · 8.79 KB
/
weather.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# weather: get the weather on the CLI
#
# Copyright (C) ja4e, Eason Qin, 2024.
#
# This source code form is licensed under the MIT/expat license. This program is provided
# on an AS IS BASIS with NO implications or expression of warranty. Please visit the
# OSI website for a full text of the license.
#
import asyncio
import python_weather
import time
import json
import pycountry
import os
from rapidfuzz import process, fuzz
user_entries = {}
CACHE_FILE = "weather_cache.json"
CACHE_TIMEOUT = 2000
country_dict = {
country.name.upper(): country.alpha_2 for country in pycountry.countries
}
def load_cache():
try:
with open(CACHE_FILE, "r") as f:
data = json.load(f)
# Remove expired cache entries
data["matches"] = {
loc: match
for loc, match in data.get("matches", {}).items()
if time.time() - data["timestamp"] < CACHE_TIMEOUT
}
return data
except (FileNotFoundError, json.JSONDecodeError):
return {"timestamp": time.time(), "matches": {}}
def save_cache(data):
data["timestamp"] = time.time()
with open(CACHE_FILE, "w") as f:
json.dump(data, f, indent=4)
def clear_cache():
if os.path.exists(CACHE_FILE):
os.remove(CACHE_FILE)
print("Cache cleared.")
else:
print("Cache file does not exist.")
def get_country_flag(country_code):
try:
flag = "".join([chr(0x1F1E6 + ord(c) - ord("A")) for c in country_code.upper()])
return flag
except Exception as e:
print(f"Error retrieving flag for {country_code}: {e}")
return ":question:"
def get_country_code(location_name):
location_name = location_name.upper()
matches = process.extract(
location_name, country_dict.keys(), scorer=fuzz.ratio, limit=15
)
exact_match = [match for match in matches if match[0] == location_name]
if exact_match:
matches = exact_match
else:
# Sort matches based on score (highest first)
matches = sorted(matches, key=lambda x: x[1], reverse=True)
cache = load_cache()
cache["matches"][location_name] = matches
save_cache(cache)
return matches
def exit_program():
print("Exiting the program.")
exit()
USER_ENTRIES_FILE = "user_entries.json"
def load_user_entries():
try:
with open(USER_ENTRIES_FILE, "r") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def save_user_entries():
with open(USER_ENTRIES_FILE, "w") as f:
json.dump(user_entries, f, indent=4)
def clear_user_entries():
global user_entries
user_entries.clear()
print("User entries have been cleared.")
def get_location_choice(matches, location_name):
while True:
try:
choice = int(input(f"Please select the correct option (1-{len(matches) + 6}): "))
if 1 <= choice <= len(matches):
# The user has selected a valid match
selected_location = matches[choice - 1][0]
return selected_location
choice -= len(matches)
match choice:
case 1:
manual_entry = input("Please enter the correct location manually: ").upper()
country = input(f"Enter the country for {manual_entry}: ").upper()
if input(f"Do you want to save {manual_entry} as {country}? (y/n): ").lower() == "y":
user_entries[manual_entry] = country
return manual_entry
case 2:
manual_entry = input("Please enter the correct location manually: ").upper()
return manual_entry
case 3:
clear_cache()
return prompt_for_location(location_name)
case 4:
clear_user_entries()
return prompt_for_location(location_name)
case 5:
clear_cache()
clear_user_entries()
return prompt_for_location(location_name)
case 6:
exit_program()
case _:
print("Invalid choice. Please try again.")
except ValueError:
print("Invalid input. Please enter a number.")
def prompt_for_location(location_name):
cache = load_cache()
matches = cache.get("matches", {}).get(location_name)
if not matches:
matches = get_country_code(location_name)
if not matches:
print("No matches found.")
return location_name
if len(matches) == 1:
return matches[0][0]
print("\nAmbiguous location. Here are some possible matches:")
for i, item in enumerate(matches):
match, score, _ = item # This crap broke this update btw there you go
print(f"{i + 1}. {match} (Score: {score:.2f})")
print(f"{len(matches) + 1}. Manual Correction")
print(f"{len(matches) + 2}. Full Manual (not saved in dictionary)")
print(f"{len(matches) + 3}. Clear Cache")
print(f"{len(matches) + 4}. Clear User Entries")
print(f"{len(matches) + 5}. Clear All Cache")
print(f"{len(matches) + 6}. Exit")
try:
return get_location_choice(matches, location_name)
except ValueError:
print("Invalid input. Please enter a number.")
return prompt_for_location(location_name) # Recursively call if invalid input
def fahrenheit_to_celsius(fahrenheit):
return (fahrenheit - 32) / 1.8
def print_daily_forecast(daily):
high_temp_celsius = fahrenheit_to_celsius(daily.highest_temperature)
low_temp_celsius = fahrenheit_to_celsius(daily.lowest_temperature)
print(f" Date: {daily.date}")
print(
f" Highest Temperature: {daily.highest_temperature}°F ({high_temp_celsius:.1f}°C)"
)
print(
f" Lowest Temperature: {daily.lowest_temperature}°F ({low_temp_celsius:.1f}°C)"
)
print(f" Sunrise: {daily.sunrise}, Sunset: {daily.sunset}")
print(f" Moon Phase: {daily.moon_phase.emoji} ({daily.moon_phase.name})")
print(f" Snowfall: {daily.snowfall} inches")
print(" Hourly Forecast:")
for hourly in daily.hourly_forecasts:
time_formatted = hourly.time.strftime("%H:%M")
hourly_temp_celsius = fahrenheit_to_celsius(hourly.temperature)
print(
f" → {time_formatted}: {hourly.temperature}°F ({hourly_temp_celsius:.1f}°C), {hourly.description}"
)
print(
f" Wind Speed: {hourly.wind_speed} mph, {hourly.wind_direction.emoji} ({hourly.wind_direction.name})"
)
print(
f" Humidity: {hourly.humidity}%, UV Index: {hourly.ultraviolet.index} ({hourly.ultraviolet.name})"
)
print("-" * 40)
async def print_overview(location_name, weather):
current_temp_fahrenheit = weather.temperature
current_temp_celsius = fahrenheit_to_celsius(current_temp_fahrenheit)
country_code = country_dict.get(location_name.upper())
country_flag = (
get_country_flag(country_code) if country_code else "No flag available"
)
print(f"\n🌡️ Current Weather in {location_name}: {country_flag}")
print(f" Temperature: {current_temp_fahrenheit}°F ({current_temp_celsius:.1f}°C)")
print(f" Humidity: {weather.humidity}%")
print(
f" Wind Speed: {weather.wind_speed} mph, {weather.wind_direction.emoji} ({weather.wind_direction.name})"
)
print(f" UV Index: {weather.ultraviolet.index} ({weather.ultraviolet.name})\n")
async def get_weather_forecast(location_name):
async with python_weather.Client(unit=python_weather.IMPERIAL) as client:
print("📅 Daily Forecast:")
weather = await client.get(location_name)
await print_overview(location_name, weather)
daily_forecasts = list(weather.daily_forecasts)
for daily in daily_forecasts[:3]: # Get the first 3 days of forecasts
print_daily_forecast(daily)
def input_request():
location = input("Please enter the location for which you want the weather forecast: ").upper()
while True:
if __name__ == "__main__":
user_entries.update(load_user_entries())
print("clear cache input: 1")
print("clear user entries input: 2")
print("clear all cache input: 3")
print("Exit: 4")
location = input(
"Please enter the location for which you want the weather forecast: "
).upper()
if location== "1":
clear_cache()
elif location== "2":
clear_user_entries()
elif location== "3":
clear_cache()
clear_user_entries()
elif location== "4":
exit_program()
else:
location = prompt_for_location(location)
if os.name == "nt":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(get_weather_forecast(location))
save_user_entries()
else:
exit_program()