-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathswath_geometry.py
298 lines (230 loc) · 10.5 KB
/
swath_geometry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
""" The module contains functions designed to calculate and retrieve the
extents and resolution of data in the projected Coordinate Reference System
(CRS).
"""
import functools
from typing import List, Tuple
import numpy as np
from netCDF4 import Variable
from pyproj import Proj
def get_projected_resolution(
projection: Proj, longitudes: np.ma.MaskedArray, latitudes: np.ma.MaskedArray
) -> Tuple[float]:
"""Find the resolution of the target grid in the projected coordinates, x
and y. First the perimeter points are found. These are then projected
to the target CRS. Gauss' Area formula is then applied to find the area
of the swath in the target CRS. This is assumed to be equally shared
between input pixels. The pixels are also assumed to be square.
"""
coordinates_mask = get_valid_coordinates_mask(longitudes, latitudes)
x_values, y_values = get_projected_coordinates(
coordinates_mask, projection, longitudes, latitudes
)
if len(longitudes.shape) == 1:
absolute_resolution = get_one_dimensional_resolution(x_values, y_values)
else:
ordered_x, ordered_y = sort_perimeter_points(x_values, y_values)
projected_area = get_polygon_area(ordered_x, ordered_y)
absolute_resolution = get_absolute_resolution(
projected_area, coordinates_mask.count() # pylint: disable=E1101
)
return absolute_resolution
def get_extents_from_perimeter(
projection: Proj, longitudes: np.ma.MaskedArray, latitudes: np.ma.MaskedArray
) -> Tuple[float]:
"""Find the swath extents in the target CRS. First the perimeter points of
unfilled valid pixels are found. These are then projected to the target
CRS. Finally the minimum and maximum values in the projected x and y
coordinates are returned.
"""
coordinates_mask = get_valid_coordinates_mask(longitudes, latitudes)
x_values, y_values = get_projected_coordinates(
coordinates_mask, projection, longitudes, latitudes
)
return (np.min(x_values), np.max(x_values), np.min(y_values), np.max(y_values))
def get_projected_coordinates(
coordinates_mask: np.ma.core.MaskedArray,
projection: Proj,
longitudes: np.ma.MaskedArray,
latitudes: np.ma.MaskedArray,
) -> Tuple[np.ndarray]:
"""Get the required coordinate points projected in the target Coordinate
Reference System (CRS).
"""
if len(longitudes.shape) == 1:
coordinates = get_all_coordinates(longitudes, latitudes, coordinates_mask)
else:
coordinates = get_perimeter_coordinates(longitudes, latitudes, coordinates_mask)
return reproject_coordinates(coordinates, projection)
def reproject_coordinates(points: List[Tuple], projection: Proj) -> Tuple[np.ndarray]:
"""Reproject a list of input perimeter points, in longitude and latitude
tuples, to the target CRS.
Returns:
x: numpy.ndarray of projected x coordinates.
y: numpy.ndarray of projected y coordinates.
"""
perimeter_longitudes, perimeter_latitudes = zip(*points)
return projection(perimeter_longitudes, perimeter_latitudes)
def get_one_dimensional_resolution(
x_values: List[float], y_values: List[float]
) -> float:
"""Find the projected distance between each pair of consecutive points
and return the median average as the resolution.
"""
return np.median(
[
euclidean_distance(
x_values[ind + 1], x_values[ind], y_values[ind + 1], y_values[ind]
)
for ind in range(len(x_values) - 2)
]
)
def euclidean_distance(x_one: float, x_two: float, y_one: float, y_two: float) -> float:
"""Find the Euclidean distance between two points, in projected coordinate
space.
"""
return np.sqrt((x_one - x_two) ** 2.0 + (y_one - y_two) ** 2.0)
def get_polygon_area(x_values: np.ndarray, y_values: np.ndarray) -> float:
"""Use the Gauss' Area Formula (a.k.a. Shoelace Formula) to calculate the
area of the input swath from its perimeter points. These points must
be sorted so consecutive points along the perimeter are consecutive in
the input lists.
"""
return 0.5 * np.abs(
np.dot(x_values, np.roll(y_values, 1)) - np.dot(y_values, np.roll(x_values, 1))
)
def get_absolute_resolution(polygon_area: float, n_pixels: int) -> float:
"""Find the absolute value of the resolution of the target CRS. This
assumes that all pixels are equal in area, and that they are square.
"""
return np.sqrt(np.divide(polygon_area, n_pixels))
def get_valid_coordinates_mask(
longitudes: np.ma.MaskedArray, latitudes: np.ma.MaskedArray
) -> np.ma.core.MaskedArray:
"""Get a `numpy` N-d array containing boolean values (0 or 1) indicating
whether the elements of both longitude and latitude are valid at that
location. Validity of these elements means that an element must not be
a fill value, or contain a NaN. Note, a value of 1 means that the pixel
contains valid data.
When a `netCDF4.Variable` is loaded, the data will automatically be
read as a `numpy.ma.core.MaskedArray`. Values matching the `_FillValue`
as stored in the variable metadata will be masked.
"""
valid_longitudes = np.logical_and(
np.isfinite(longitudes), np.logical_not(longitudes[:].mask)
)
valid_latitudes = np.logical_and(
np.isfinite(latitudes), np.logical_not(latitudes[:].mask)
)
condition = np.logical_and(valid_longitudes, valid_latitudes)
return np.ma.masked_where(np.logical_not(condition), np.ones(longitudes.shape))
def get_perimeter_coordinates(
longitudes: np.ndarray, latitudes: np.ndarray, mask: np.ma.core.MaskedArray
) -> List[Tuple[float]]:
"""Get the coordinates for all pixels in the input grid with non-fill,
non-NaN values for both longitude and latitude. Note, these points will
be in a random order, due to the use of the Python Set class.
"""
row_points = {
point
for row_index, row in enumerate(mask)
if row.any()
for point in get_slice_edges(row.nonzero()[0], row_index)
}
column_points = {
point
for column_index, column in enumerate(mask.T)
if column.any()
for point in get_slice_edges(column.nonzero()[0], column_index, is_row=False)
}
unordered_points = row_points.union(column_points)
if swath_crosses_international_date_line(longitudes):
# The International Date Line is between two pixel columns.
if np.median(longitudes) < 0:
# Most pixels are in the Western Hemisphere.
longitudes[longitudes > 0] -= 360.0
else:
# Most pixels are in the Eastern Hemisphere.
longitudes[longitudes < 0] += 360.0
return [
(longitudes[point[0], point[1]], latitudes[point[0], point[1]])
for point in unordered_points
]
def get_all_coordinates(
longitudes: np.ndarray, latitudes: np.ndarray, mask: np.ma.core.MaskedArray
) -> List[Tuple[float]]:
"""Return coordinates of all valid pixels in (longitude, latitude) tuples.
These points will have non-fill values for both the longitude and
latitude, and are expected to be from a 1-D variable.
"""
return [
(longitude, latitudes[array_index])
for array_index, longitude in enumerate(longitudes)
if mask[array_index]
]
def get_slice_edges(
slice_valid_indices: np.ndarray, slice_index: int, is_row: bool = True
) -> List[Tuple[int]]:
"""Given a list of indices for all valid data in a single row or column of
an array, return the 2-dimensional indices of the first and last pixels
with valid data. This function relies of the `nonzero` method of a
`numpy` masked array returning array indices in ascending order.
"""
if is_row:
slice_edges = [
(slice_index, slice_valid_indices[0]),
(slice_index, slice_valid_indices[-1]),
]
else:
slice_edges = [
(slice_valid_indices[0], slice_index),
(slice_valid_indices[-1], slice_index),
]
return slice_edges
def sort_perimeter_points(
unordered_x: np.ndarray, unordered_y: np.ndarray
) -> Tuple[np.ndarray]:
"""Take arrays of x and y projected coordinates, combine into coordinate
pairs, then order them clockwise starting from the point nearest to a
reference vector originating at the polygon centroid. Finally, return
ordered arrays of the x and y coordinates separated once more.
"""
unordered_points = np.array(list(zip(unordered_x, unordered_y)))
polygon_centroid = np.mean(unordered_points, axis=0)
sort_key = functools.partial(clockwise_point_sort, polygon_centroid)
ordered_points = sorted(unordered_points, key=sort_key)
return zip(*ordered_points)
def clockwise_point_sort(origin: List[float], point: List[float]) -> Tuple[float]:
"""A key function to be used with the internal Python sorted function.
This function should return a tuple of the clockwise angle and length
between the point and a reference vector, which is a vertical unit
vector. The origin argument should be the within the polygon for which
perimeter points are being sorted, to ensure correct ordering. For
simplicity, it is assumed this origin is the centroid of the polygon.
See:
- https://stackoverflow.com/a/41856340
- https://stackoverflow.com/a/35134034
"""
reference_vector = np.array([0, 1])
vector = np.subtract(np.array(point), np.array(origin))
vector_length = np.linalg.norm(vector)
if vector_length == 0:
# The closest point is identical
vector_angle = -np.pi
else:
normalised_vector = np.divide(vector, vector_length)
dot_product = np.dot(normalised_vector, reference_vector)
determinant = np.linalg.det([normalised_vector, reference_vector])
vector_angle = np.math.atan2(determinant, dot_product)
return vector_angle, vector_length
def swath_crosses_international_date_line(longitudes: np.ndarray) -> bool:
"""Check if swath begins west of the International Date Line and ends to
the east of it. In this case there should be a discontinuity between
either two adjacent longitude columns or rows.
"""
longitudes_difference_row = np.diff(longitudes, n=1, axis=0)
dim_differences = [np.max(np.abs(longitudes_difference_row))]
if len(longitudes.shape) > 1:
longitudes_difference_column = np.diff(longitudes, n=1, axis=1)
dim_differences.append(np.max(np.abs(longitudes_difference_column)))
return np.max(dim_differences) > 90.0