Commit 0204f54

Merge pull request #268 from allenai/favyen/20251219-fix-vessel-pipeline
Fix Sentinel-2 vessel detection pipeline and run it for UPF.
2 parents 9dab2a5 + f841916 commit 0204f54

File tree

5 files changed (+310, -3 lines)
one_off_projects/2025_12_africa_vessels/README.md

Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
This is for the UPF request to get vessel detections in Africa.

First, run the script that gets the 5x5 degree AOIs that are not on land:

```
python one_off_projects/2025_12_africa_vessels/get_aois.py
```

Then we can get scene IDs for each AOI, e.g.:

```
python one_off_projects/2025_12_africa_vessels/get_scene_ids.py \
    --cache_path /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/cache/sentinel2/ \
    --geojson /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/aoi_0_0_5_5.geojson \
    --out_fname /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/aoi_0_0_5_5.json \
    --geom_fname /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_geojsons/aoi_0_0_5_5.geojson
```

The `--out_fname` file has a simple list of scene IDs compatible with
`rslp.main sentinel2_vessels write_entries`, while `--geom_fname` has the detailed
scene geometry that could be useful for UPF.
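
For reference, the scene ID file is written with `json.dump(list(scene_ids), f)` (see get_scene_ids.py below), so it is a flat JSON array of scene ID strings; a quick way to inspect it:

```python
import json

# Load the flat JSON array of Sentinel-2 scene IDs written by get_scene_ids.py.
with open(
    "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/aoi_0_0_5_5.json"
) as f:
    scene_ids = json.load(f)
print(f"{len(scene_ids)} scenes, e.g. {scene_ids[:3]}")
```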

Here is a batch version:

```python
import multiprocessing
import os
import subprocess

import tqdm


def process(aoi_name: str) -> None:
    """Run get_scene_ids.py for a single AOI."""
    subprocess.call([
        "python",
        "one_off_projects/2025_12_africa_vessels/get_scene_ids.py",
        "--cache_path=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/cache/sentinel2/",
        f"--geojson=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/{aoi_name}.geojson",
        f"--out_fname=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/{aoi_name}.json",
        f"--geom_fname=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_geojsons/{aoi_name}.geojson",
    ])


aoi_names = [fname.split(".")[0] for fname in os.listdir("/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/")]
p = multiprocessing.Pool(4)
outputs = p.imap_unordered(process, aoi_names)
for _ in tqdm.tqdm(outputs, total=len(aoi_names)):
    pass
p.close()
```

Write the jobs to the queue:

```
python -m rslp.main sentinel2_vessels write_entries \
    --queue_name favyen/sentinel2-vessels-predict \
    --json_fname /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/aoi_0_0_5_5.json \
    --json_out_dir /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/json_outputs/ \
    --geojson_out_dir /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/geojson_outputs/ \
    --crop_out_dir /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/crop_outputs/
```

Here is a batch version:

```python
import os
import subprocess

for fname in os.listdir("/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/"):
    label = fname.split(".")[0]
    subprocess.call([
        "python",
        "-m",
        "rslp.main",
        "sentinel2_vessels",
        "write_entries",
        "--queue_name",
        "favyen/sentinel2-vessels-predict",
        "--json_fname",
        f"/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/{label}.json",
        "--json_out_dir",
        "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/json_outputs/",
        "--geojson_out_dir",
        "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/geojson_outputs/",
        "--crop_out_dir",
        "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/crop_outputs/",
    ])
```

And launch the worker jobs:

```
python -m rslp.main common launch \
    --image_name favyen/rslpomp20251212c \
    --queue_name favyen/sentinel2-vessels-predict \
    --num_workers 100 \
    --gpus 1 \
    --shared_memory 256GiB \
    --cluster=[ai2/jupiter,ai2/neptune,ai2/saturn] \
    --weka_mounts+='{"bucket_name": "dfive-default", "mount_path": "/weka/dfive-default"}'
```
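
Once the workers drain the queue, results land in the output directories passed to `write_entries` above. A quick progress check (assuming, hypothetically, that outputs appear as one file per scene):

```python
import os

# Hypothetical sanity check: count output files as workers finish.
out_dir = "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/geojson_outputs/"
print(len(os.listdir(out_dir)), "GeoJSON outputs so far")
```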

one_off_projects/2025_12_africa_vessels/get_aois.py

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
"""Get 5x5 degree tiles that are not on land."""

import json
from pathlib import Path

from global_land_mask import globe

AOI_DIR = "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/"
GRID_SIZE = 5

if __name__ == "__main__":
    # Bounding box around Africa: (min_lon, min_lat, max_lon, max_lat).
    box = (-30, -40, 70, 35)

    for lon in range(box[0], box[2], GRID_SIZE):
        for lat in range(box[1], box[3], GRID_SIZE):
            coordinates = [
                (lon, lat),
                (lon, lat + GRID_SIZE),
                (lon + GRID_SIZE, lat + GRID_SIZE),
                (lon + GRID_SIZE, lat),
                (lon, lat),
            ]
            # Make sure at least one corner is in the ocean; otherwise we skip this tile.
            at_least_one_water = False
            for coord in coordinates:
                if globe.is_land(coord[1], coord[0]):
                    continue
                at_least_one_water = True

            print(lon, lat, at_least_one_water)

            if not at_least_one_water:
                continue

            fname = Path(AOI_DIR) / f"aoi_{lon}_{lat}_{lon+GRID_SIZE}_{lat+GRID_SIZE}.geojson"
            with fname.open("w") as f:
                feat = {
                    "type": "Feature",
                    "properties": {},
                    "geometry": {
                        "type": "Polygon",
                        "coordinates": [coordinates],
                    },
                }
                json.dump({
                    "type": "FeatureCollection",
                    "properties": {},
                    "features": [feat],
                }, f)
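
A note on the land-mask call above: `global_land_mask`'s `globe.is_land` takes `(lat, lon)`, which is why the script passes `coord[1]` before `coord[0]`. A quick check of the convention:

```python
from global_land_mask import globe

print(globe.is_land(0.0, 20.0))   # equatorial point inland in Africa -> True
print(globe.is_land(0.0, -20.0))  # equatorial point in the Atlantic -> False
```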

one_off_projects/2025_12_africa_vessels/get_scene_ids.py

Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
"""Get Sentinel-2 scene IDs that we should run the vessel detection model on."""

import argparse
import json
import multiprocessing
from datetime import datetime, UTC

import shapely
import tqdm
from rslearn.config import QueryConfig, SpaceMode
from rslearn.const import WGS84_PROJECTION
from rslearn.data_sources.gcp_public_data import Sentinel2, Sentinel2Item
from rslearn.utils.geometry import STGeometry
from rslearn.utils.vector_format import GeojsonVectorFormat, GeojsonCoordinateMode
from rslearn.utils.feature import Feature
from rslearn.utils.mp import star_imap_unordered
from upath import UPath


def split_aoi(aoi: STGeometry, size: int = 1) -> list[STGeometry]:
    """Split up a big AOI into smaller geometries.

    Args:
        aoi: the AOI to split up.
        size: the size for sub-tiles to create within the bounds of the AOI.

    Returns:
        list of sub-tiles.
    """
    # We assume the tile has integer lon/lat coordinates and is 5x5 degrees.
    bounds = tuple(int(v) for v in aoi.shp.bounds)
    assert (bounds[2] - bounds[0]) == 5
    assert (bounds[3] - bounds[1]) == 5

    # Only size=1 really makes sense here since 5 has no factors besides 1 and 5.
    num_x_tiles = (bounds[2] - bounds[0]) // size
    num_y_tiles = (bounds[3] - bounds[1]) // size

    geoms: list[STGeometry] = []
    for col in range(num_x_tiles):
        for row in range(num_y_tiles):
            x_start = bounds[0] + col
            y_start = bounds[1] + row
            geom = STGeometry(
                WGS84_PROJECTION,
                shapely.box(x_start, y_start, x_start + size, y_start + size),
                aoi.time_range,
            )
            geoms.append(geom)

    return geoms


def get_items(geom: STGeometry, cache_path: UPath) -> list[Sentinel2Item]:
    """Get the items matching the given geometry."""
    query_config = QueryConfig(space_mode=SpaceMode.INTERSECTS, max_matches=100000)
    sentinel2 = Sentinel2(
        index_cache_dir=cache_path, use_rtree_index=False, use_bigquery=True
    )
    item_groups = sentinel2.get_items([geom], query_config)[0]
    items = []
    for group in item_groups:
        if len(group) != 1:
            raise ValueError(
                "expected each item group to have one item with INTERSECTS space mode"
            )
        items.append(group[0])
    return items


if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")

    parser = argparse.ArgumentParser(
        description="Get Sentinel-2 scene IDs",
    )
    parser.add_argument(
        "--cache_path",
        type=str,
        help="Directory to cache the Sentinel-2 index",
        required=True,
    )
    parser.add_argument(
        "--geojson",
        type=str,
        help="GeoJSON filename containing the area of interest",
        required=True,
    )
    parser.add_argument(
        "--out_fname",
        type=str,
        help="Filename to write scene IDs",
        required=True,
    )
    parser.add_argument(
        "--geom_fname",
        type=str,
        help="Filename to write scene geometries",
        default=None,
    )
    args = parser.parse_args()

    vector_format = GeojsonVectorFormat(coordinate_mode=GeojsonCoordinateMode.WGS84)
    features = vector_format.decode_from_file(UPath(args.geojson))
    assert len(features) == 1
    feat = features[0]

    geom = STGeometry(
        feat.geometry.projection,
        feat.geometry.shp,
        (
            datetime(2016, 1, 1, tzinfo=UTC),
            datetime(2025, 1, 1, tzinfo=UTC),
        ),
    )

    # Split up the AOI.
    geoms = split_aoi(geom)
    print(f"Got {len(geoms)} sub-tiles")

    # Process the AOIs in parallel.
    scene_ids = set()
    # Reuse the features variable; it now collects one Feature per unique scene.
    features: list[Feature] = []
    p = multiprocessing.Pool(64)
    outputs = star_imap_unordered(p, get_items, [dict(
        geom=geom,
        cache_path=UPath(args.cache_path),
    ) for geom in geoms])
    for item_list in tqdm.tqdm(outputs, total=len(geoms)):
        for item in item_list:
            if item.name in scene_ids:
                continue
            scene_ids.add(item.name)
            feat = Feature(item.geometry, {
                "scene_id": item.name,
            })
            features.append(feat)

    print(f"Got {len(scene_ids)} scene IDs after de-duplication")

    with open(args.out_fname, "w") as f:
        json.dump(list(scene_ids), f)

    if args.geom_fname:
        vector_format.encode_to_file(UPath(args.geom_fname), features)
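
As a quick illustration of `split_aoi` (a minimal sketch reusing the imports above; the AOI bounds are hypothetical):

```python
# A 5x5 degree AOI with integer bounds splits into 25 one-degree sub-tiles
# that inherit the AOI's time range (None here).
aoi = STGeometry(WGS84_PROJECTION, shapely.box(0, -5, 5, 0), None)
tiles = split_aoi(aoi)
assert len(tiles) == 25
```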

rslp/common/worker.py

Lines changed: 7 additions & 1 deletion
@@ -23,6 +23,7 @@
 from rslp.utils.beaker import (
     DEFAULT_BUDGET,
     DEFAULT_WORKSPACE,
+    WekaMount,
     create_gcp_credentials_mount,
     get_base_env_vars,
 )
@@ -131,6 +132,7 @@ def launch_workers(
     gpus: int = 0,
     shared_memory: str | None = None,
     priority: BeakerJobPriority = BeakerJobPriority.low,
+    weka_mounts: list[WekaMount] = [],
 ) -> None:
     """Start workers for the prediction jobs.
@@ -142,11 +144,15 @@
         gpus: number of GPUs to request per worker.
         shared_memory: shared memory string like "256GiB".
         priority: priority to assign the Beaker jobs.
+        weka_mounts: list of weka mounts for Beaker job.
     """
     with Beaker.from_env(default_workspace=DEFAULT_WORKSPACE) as beaker:
         for _ in tqdm.tqdm(range(num_workers)):
             env_vars = get_base_env_vars(use_weka_prefix=False)

+            datasets = [create_gcp_credentials_mount()]
+            datasets += [weka_mount.to_data_mount() for weka_mount in weka_mounts]
+
             spec = BeakerExperimentSpec.new(
                 budget=DEFAULT_BUDGET,
                 description="worker",
@@ -163,7 +169,7 @@
                 cluster=cluster,
             ),
             preemptible=True,
-            datasets=[create_gcp_credentials_mount()],
+            datasets=datasets,
             env_vars=env_vars,
             resources=BeakerTaskResources(
                 gpu_count=gpus, shared_memory=shared_memory
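
The new `weka_mounts` parameter is what the `--weka_mounts+=` flag in the launch command above populates. A minimal sketch of the equivalent object (assuming `WekaMount` accepts the same `bucket_name` and `mount_path` fields that appear in the CLI JSON; that constructor signature is an assumption):

```python
from rslp.utils.beaker import WekaMount

# Assumed constructor fields, mirroring the JSON passed via --weka_mounts+=.
mount = WekaMount(bucket_name="dfive-default", mount_path="/weka/dfive-default")
# launch_workers converts each mount into a Beaker dataset mount:
dataset = mount.to_data_mount()
```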

rslp/sentinel2_vessels/predict_pipeline.py

Lines changed: 24 additions & 2 deletions
@@ -49,6 +49,15 @@
 SENTINEL2_RESOLUTION = 10
 CROP_WINDOW_SIZE = 128

+# Use a lower number of data loader workers for prediction since each worker will read
+# a big scene (unlike the small windows used during training).
+NUM_DATA_LOADER_WORKERS = 4
+
+# We make sure the windows we create for Sentinel-2 scenes are multiples of this amount
+# because we store some bands at 1/4 of the input resolution, so the window size needs
+# to be a multiple of 4.
+WINDOW_MIN_MULTIPLE = 4
+
 # Distance threshold for near marine infrastructure filter in km.
 # 0.05 km = 50 m
 INFRA_DISTANCE_THRESHOLD = 0.05
@@ -265,12 +274,21 @@ def get_vessel_detections(
     windows: list[Window] = []
     group = "detector_predict"
     for scene_idx, scene_data in enumerate(scene_datas):
+        # Pad the bounds so they are a multiple of WINDOW_MIN_MULTIPLE.
+        padded_bounds = (
+            (scene_data.bounds[0] // WINDOW_MIN_MULTIPLE) * WINDOW_MIN_MULTIPLE,
+            (scene_data.bounds[1] // WINDOW_MIN_MULTIPLE) * WINDOW_MIN_MULTIPLE,
+            ((scene_data.bounds[2] + WINDOW_MIN_MULTIPLE - 1) // WINDOW_MIN_MULTIPLE)
+            * WINDOW_MIN_MULTIPLE,
+            ((scene_data.bounds[3] + WINDOW_MIN_MULTIPLE - 1) // WINDOW_MIN_MULTIPLE)
+            * WINDOW_MIN_MULTIPLE,
+        )
         window = Window(
             storage=dataset.storage,
             group=group,
             name=str(scene_idx),
             projection=scene_data.projection,
-            bounds=scene_data.bounds,
+            bounds=padded_bounds,
             time_range=scene_data.time_range,
         )
         window.save()
@@ -305,7 +323,11 @@ def get_vessel_detections(

     # Run object detector.
     with time_operation(TimerOperations.RunModelPredict):
-        run_model_predict(DETECT_MODEL_CONFIG, ds_path)
+        run_model_predict(
+            DETECT_MODEL_CONFIG,
+            ds_path,
+            extra_args=["--data.init_args.num_workers", str(NUM_DATA_LOADER_WORKERS)],
+        )

     # Read the detections.
     detections: list[VesselDetection] = []
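
For intuition, the padding above floors the min coordinates and ceils the max coordinates to the nearest multiple of `WINDOW_MIN_MULTIPLE`; Python's floor division makes this correct for negative coordinates too. A standalone sketch with made-up bounds:

```python
WINDOW_MIN_MULTIPLE = 4
bounds = (3, -7, 10, 9)  # hypothetical (x_min, y_min, x_max, y_max)
padded = (
    (bounds[0] // WINDOW_MIN_MULTIPLE) * WINDOW_MIN_MULTIPLE,
    (bounds[1] // WINDOW_MIN_MULTIPLE) * WINDOW_MIN_MULTIPLE,
    ((bounds[2] + WINDOW_MIN_MULTIPLE - 1) // WINDOW_MIN_MULTIPLE) * WINDOW_MIN_MULTIPLE,
    ((bounds[3] + WINDOW_MIN_MULTIPLE - 1) // WINDOW_MIN_MULTIPLE) * WINDOW_MIN_MULTIPLE,
)
print(padded)  # (0, -8, 12, 12): mins floored, maxes ceiled to multiples of 4
```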
