-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
202 lines (179 loc) · 8.8 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# 1. input peak coord -> map area to search
# 2. download from database if needed
# 3. search the x,y grid of peak (within a radius of 1km) -> print out max value found
import itertools
import math
import numpy as np
from math import cos, pi, sqrt
from plotting import plot_multi_elevation
from dataset_utils import collect_heightmap
from kml_utils import parse_coord_input
from tqdm import tqdm
# from discovered peak, search area within radius
SEARCH_RADIUS_DEGREE_LIMIT = 0.30 # degree, ~ 30km
SEARCH_RADIUS_GRIDS_LIMIT = int(SEARCH_RADIUS_DEGREE_LIMIT * 3600) - 1 # 1 grid = 1/3600 degree
def calc_ngrids_above(collected: np.ndarray, peak_elevation, threshold):
"""
:return: None if collected area doesn't fit satisfying area;
0 if supplied threshold is below key col
Number of grids around peak higher than threshold. Each grid is one sqruare arc sec.
"""
data_radius = collected.shape[0] // 2
peak_x = data_radius
peak_y = data_radius
cut_edges = set() # (small_x,y, larger_x,y): have ends >= & < threshold, respectively
calc_area_grids = set() # (bottom-left corner x,y): has at least one corner >= threshold. x/y range: [0,2N)
discovered = {(peak_x, peak_y)} # those >= threshold
discovery_queue = [(peak_x, peak_y)] # those >= threshold
while discovery_queue:
node_x, node_y = discovery_queue.pop()
# if collected[node_x, node_y] > peak_elevation:
# print(f"Supplied threshold {threshold} is below peak col.")
# return None
for gx, gy in itertools.product([node_x, node_x - 1], [node_y, node_y - 1]):
if 0 <= gx < 2 * data_radius and \
0 <= gy < 2 * data_radius:
calc_area_grids.add((gx, gy))
for offset_x, offset_y in [(1, 0), (-1, 0), (0, 1), (0, -1)]:
neighbor_x = node_x + offset_x
neighbor_y = node_y + offset_y
if neighbor_x < 0 or neighbor_x >= 2 * data_radius + 1 or \
neighbor_y < 0 or neighbor_y >= 2 * data_radius + 1:
# print("Search area out of collected range!")
return 0
if collected[neighbor_x, neighbor_y] >= threshold:
if (neighbor_x, neighbor_y) not in discovered:
discovery_queue.append((neighbor_x, neighbor_y))
discovered.add((neighbor_x, neighbor_y))
else:
n1 = (node_x, node_y)
n2 = (neighbor_x, neighbor_y)
cut_edges.add((min(n1, n2), max(n1, n2)))
total_area = 0
face_color = np.zeros([2 * data_radius, 2 * data_radius, 3])
face_color[:, :, :] = [0, 1, 0]
for x, y in calc_area_grids:
local_edges = {((x, y), (x, y + 1)),
((x, y), (x + 1, y)),
((x, y + 1), (x + 1, y + 1)),
((x + 1, y), (x + 1, y + 1))}
local_cut_edges = local_edges & cut_edges
if len(local_cut_edges) == 0:
assert collected[x, y] >= threshold
assert collected[x + 1, y] >= threshold
assert collected[x, y + 1] >= threshold
assert collected[x + 1, y + 1] >= threshold
total_area += 1
face_color[x, y] = [1, 0, 0]
elif len(local_cut_edges) == 2:
(n1, n2), (n3, n4) = list(local_cut_edges)
n_set = {n1, n2, n3, n4}
if len(n_set) == 4:
# trapezoid
portion_12 = (max(collected[*n1], collected[*n2]) - threshold) / abs(collected[*n1] - collected[*n2])
portion_34 = (max(collected[*n3], collected[*n4]) - threshold) / abs(collected[*n3] - collected[*n4])
local_area = (portion_12 + portion_34) / 2
assert 0 <= local_area <= 1
total_area += local_area
elif len(n_set) == 3:
nc = {n1, n2} & {n3, n4} # find right angle vertex
na = {n1, n2} - nc
nb = {n3, n4} - nc
nc, na, nb = [list(x)[0] for x in (nc, na, nb)]
portion_ca = (max(collected[*nc], collected[*na]) - threshold) / abs(collected[*nc] - collected[*na])
portion_cb = (max(collected[*nc], collected[*nb]) - threshold) / abs(collected[*nc] - collected[*nb])
if collected[nc] >= threshold:
assert collected[na] < threshold
assert collected[nb] < threshold
# triangle
local_area = portion_ca * portion_cb / 2
assert 0 <= local_area <= 1 / 2
else:
assert collected[na] >= threshold
assert collected[nb] >= threshold
# rect - triangle
local_area = 1 - (1 - portion_ca) * (1 - portion_cb) / 2
assert 1 / 2 <= local_area <= 1
total_area += local_area
else:
assert False
face_color[x, y] = [0, 0, 1]
elif len(local_cut_edges) == 4:
# print(" Info: 4 cut edge, rare case. Noise in data.")
total_area += 1 / 2
face_color[x, y] = [1, 0, 1]
else:
assert False, f"grid-of-interest at {(x, y)} can only border even cut edges, but got {len(local_cut_edges)}"
plot_data = (collected, face_color, f"ArcSec^2 above {threshold}: {total_area:.1f}")
return total_area, plot_data
def analysis_peak(rough_lat, rough_lon, true_elevation=None,
angle_threshold=0.194, discover_radius_degree=0.02):
# from supplied lat, lon, discover peak:= highest point within discover_radius
discover_radius_grids = int(discover_radius_degree * 3600) - 1
discover_area = collect_heightmap(rough_lat, rough_lon, discover_radius_grids)
peak_elevation = np.max(discover_area)
if true_elevation is not None and \
(peak_elevation > true_elevation * 1.01 or peak_elevation < true_elevation * 0.95):
print(f"Error: Collected peak elevation ({peak_elevation}) is far from truth value ({true_elevation})")
return peak_elevation, []
xs, ys = np.where(discover_area == peak_elevation)
peak_x = xs[0]
peak_y = ys[0]
if not discover_radius_grids * 0.1 < peak_x < discover_radius_grids * 1.9 or \
not discover_radius_grids * 0.1 < peak_y < discover_radius_grids * 1.9:
print("Warning: peak found at the border of discover area. Please double check lat/lon.")
return peak_elevation, []
offset_row = discover_radius_grids - peak_x
offset_col = discover_radius_grids - peak_y
lat = rough_lat + offset_row / 3600
lon = rough_lon - offset_col / 3600
print(true_elevation, ":Use peak elevation:", peak_elevation, "at", lat, lon)
search_radius = 150 # start from 0.04 arc second
height_below = 300
search_step = 200
collected = collect_heightmap(lat, lon, search_radius)
plots = []
results = []
while height_below < peak_elevation and search_radius < SEARCH_RADIUS_GRIDS_LIMIT:
threshold = peak_elevation - height_below
result = calc_ngrids_above(collected, peak_elevation, threshold)
if result is None:
break
if result == 0:
search_radius += 100
collected = collect_heightmap(lat, lon, search_radius)
# print(" Increase search radius to", search_radius)
continue
total_area, plot_data = result
plots.append(plot_data)
meters_per_grid = 30.92 * cos(lat * pi / 180)
total_area *= pow(meters_per_grid, 2)
slope = height_below / sqrt(total_area / pi)
angle = math.atan(slope) / (math.pi / 2)
if height_below >= 1000 and angle < angle_threshold:
break
results.append((height_below, angle))
height_below += search_step
# print("Generating plots...")
# plot_multi_elevation(plots, show=True)
return peak_elevation, results
if __name__ == '__main__':
# single-peak analysis for Mount Everest
# analysis_peak(27.99, 86.92, true_elevation=8849)
# Analyze all peaks in the dataset
places = parse_coord_input("data\\Combined.csv")
results = []
for name, elevation, coords in tqdm(places):
lat, lon = coords
observed_elevation, peak_result = analysis_peak(lat, lon, elevation)
if peak_result:
eth = max([r[0] * r[1] for r in peak_result])
else:
eth = 0
slope_line = ','.join(str(sine) for drop, sine in peak_result)
results.append(f"{name},{eth},{lat},{lon},{elevation},{observed_elevation},{slope_line}\n")
with open('output.csv', 'w') as f:
drop_range = list(range(300, 3500, 200))
drop_headers = ','.join(str(d) for d in drop_range)
f.write(f'Peak,ETH,Latitude,Longitude,TrueElevation,ObservedElevation,{drop_headers}\n')
f.writelines(results)