
Commit 02528a4

Add program for indexing PDS/ODC data from S3
1 parent d911484 commit 02528a4

File tree

1 file changed: +293 -0 lines changed


scripts/index_from_s3_bucket.py

# coding: utf-8
import re
import uuid
import logging
from multiprocessing import Process, current_process, Manager, cpu_count

import boto3
import click
import datacube
from osgeo import osr
from ruamel.yaml import YAML
from datacube.index.hl import Doc2Dataset
from datacube.utils import changes

GUARDIAN = "GUARDIAN_QUEUE_EMPTY"
AWS_PDS_TXT_SUFFIX = "MTL.txt"

# Matches the "KEY = value" pairs that make up USGS MTL metadata files.
MTL_PAIRS_RE = re.compile(r'(\w+)\s=\s(.*)')

bands_ls8 = [('1', 'coastal_aerosol'),
             ('2', 'blue'),
             ('3', 'green'),
             ('4', 'red'),
             ('5', 'nir'),
             ('6', 'swir1'),
             ('7', 'swir2'),
             ('8', 'panchromatic'),
             ('9', 'cirrus'),
             ('10', 'lwir1'),
             ('11', 'lwir2'),
             ('QUALITY', 'quality')]

bands_ls7 = [('1', 'blue'),
             ('2', 'green'),
             ('3', 'red'),
             ('4', 'nir'),
             ('5', 'swir1'),
             ('7', 'swir2'),
             ('QUALITY', 'quality')]

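# The two helpers below parse the USGS MTL text format: nested blocks of
# "KEY = value" pairs delimited by GROUP / END_GROUP markers, for example:
#
#   GROUP = PRODUCT_METADATA
#     SPACECRAFT_ID = "LANDSAT_8"
#   END_GROUP = PRODUCT_METADATA
#
# _parse_group consumes lines from a shared iterator and recurses on GROUP,
# so nested groups become nested dicts of parsed values.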
def _parse_value(s):
    s = s.strip('"')
    for parser in [int, float]:
        try:
            return parser(s)
        except ValueError:
            pass
    return s

def _parse_group(lines):
    tree = {}
    for line in lines:
        match = MTL_PAIRS_RE.findall(line)
        if match:
            key, value = match[0]
            if key == 'GROUP':
                tree[value] = _parse_group(lines)
            elif key == 'END_GROUP':
                break
            else:
                tree[key] = _parse_value(value)
    return tree

def get_geo_ref_points(info):
    return {
        'ul': {'x': info['CORNER_UL_PROJECTION_X_PRODUCT'], 'y': info['CORNER_UL_PROJECTION_Y_PRODUCT']},
        'ur': {'x': info['CORNER_UR_PROJECTION_X_PRODUCT'], 'y': info['CORNER_UR_PROJECTION_Y_PRODUCT']},
        'll': {'x': info['CORNER_LL_PROJECTION_X_PRODUCT'], 'y': info['CORNER_LL_PROJECTION_Y_PRODUCT']},
        'lr': {'x': info['CORNER_LR_PROJECTION_X_PRODUCT'], 'y': info['CORNER_LR_PROJECTION_Y_PRODUCT']},
    }

def get_coords(geo_ref_points, spatial_ref):
    # Transform the projected corner points into lon/lat on the underlying
    # geographic CRS.
    t = osr.CoordinateTransformation(spatial_ref, spatial_ref.CloneGeogCS())

    def transform(p):
        lon, lat, z = t.TransformPoint(p['x'], p['y'])
        return {'lon': lon, 'lat': lat}

    return {key: transform(p) for key, p in geo_ref_points.items()}

def satellite_ref(sat):
    """
    Load the band names for referencing LANDSAT_8, LANDSAT_7 or LANDSAT_5 bands.
    """
    if sat == 'LANDSAT_8':
        sat_img = bands_ls8
    elif sat in ('LANDSAT_7', 'LANDSAT_5'):
        sat_img = bands_ls7
    else:
        raise ValueError('Satellite data not supported')
    return sat_img

def format_obj_key(obj_key):
    # Keep only the "directory" part of the object key.
    obj_key = '/'.join(obj_key.split("/")[:-1])
    return obj_key


def get_s3_url(bucket_name, obj_key):
    return 's3://{bucket_name}/{obj_key}'.format(
        bucket_name=bucket_name, obj_key=obj_key)

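# Band paths in an MTL file are relative to the scene prefix; absolutify_paths
# rewrites them as absolute s3:// URLs so the indexed bands resolve correctly.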
def absolutify_paths(doc, bucket_name, obj_key):
    obj_key = format_obj_key(obj_key)
    for band in doc['image']['bands'].values():
        band['path'] = get_s3_url(bucket_name, obj_key + '/' + band['path'])
    return doc

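# make_metadata_doc assembles an ODC metadata document (the classic "eo"
# layout) from the parsed MTL contents: a deterministic id, platform and
# instrument, temporal extent, corner coordinates, projection details and
# per-band file paths, ready for Doc2Dataset to resolve against a matching
# product definition.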
def make_metadata_doc(mtl_data, bucket_name, object_key):
    mtl_product_info = mtl_data['PRODUCT_METADATA']
    mtl_metadata_info = mtl_data['METADATA_FILE_INFO']
    satellite = mtl_product_info['SPACECRAFT_ID']
    instrument = mtl_product_info['SENSOR_ID']
    acquisition_date = mtl_product_info['DATE_ACQUIRED']
    scene_center_time = mtl_product_info['SCENE_CENTER_TIME']
    level = mtl_product_info['DATA_TYPE']
    product_type = 'L1TP'
    sensing_time = acquisition_date + ' ' + scene_center_time
    # EPSG:326xx is WGS 84 / UTM zone xx N; this assumes northern-hemisphere scenes.
    cs_code = 32600 + mtl_data['PROJECTION_PARAMETERS']['UTM_ZONE']
    label = mtl_metadata_info['LANDSAT_SCENE_ID']
    spatial_ref = osr.SpatialReference()
    spatial_ref.ImportFromEPSG(cs_code)
    geo_ref_points = get_geo_ref_points(mtl_product_info)
    coordinates = get_coords(geo_ref_points, spatial_ref)
    bands = satellite_ref(satellite)
    doc = {
        # Deterministic dataset id derived from the S3 URL, so re-indexing
        # the same object yields the same id.
        'id': str(uuid.uuid5(uuid.NAMESPACE_URL, get_s3_url(bucket_name, object_key))),
        'processing_level': level,
        'product_type': product_type,
        'creation_dt': str(acquisition_date),
        'label': label,
        'platform': {'code': satellite},
        'instrument': {'name': instrument},
        'extent': {
            'from_dt': sensing_time,
            'to_dt': sensing_time,
            'center_dt': sensing_time,
            'coord': coordinates,
        },
        'format': {'name': 'GeoTiff'},
        'grid_spatial': {
            'projection': {
                'geo_ref_points': geo_ref_points,
                'spatial_reference': 'EPSG:%s' % cs_code,
            }
        },
        'image': {
            'bands': {
                band[1]: {
                    'path': mtl_product_info['FILE_NAME_BAND_' + band[0]],
                    'layer': 1,
                } for band in bands
            }
        },
        'lineage': {'source_datasets': {}},
    }
    doc = absolutify_paths(doc, bucket_name, object_key)
    return doc

def archive_dataset(doc, uri, index, sources_policy):
    def get_ids(dataset):
        ds = index.datasets.get(dataset.id, include_sources=True)
        for source in ds.sources.values():
            yield source.id
        yield dataset.id

    resolver = Doc2Dataset(index)
    dataset, err = resolver(doc, uri)
    index.datasets.archive(get_ids(dataset))
    logging.info("Archiving %s and all of its sources", dataset.id)

def add_dataset(doc, uri, index, sources_policy):
    logging.info("Indexing %s", uri)
    resolver = Doc2Dataset(index)
    dataset, err = resolver(doc, uri)
    if err is not None:
        logging.error("%s", err)
    else:
        try:
            index.datasets.add(dataset, sources_policy=sources_policy)  # Sources policy to be checked in Sentinel-2 dataset types
        except changes.DocumentMismatchError:
            index.datasets.update(dataset, {tuple(): changes.allow_any})
        except Exception as e:
            logging.error("Unhandled exception %s", e)

    return uri

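# Each worker owns its own Datacube index connection and boto3 S3 resource,
# since neither is safe to share across processes. Workers pull object keys
# from the queue until they see the GUARDIAN sentinel.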
def worker(config, bucket_name, prefix, suffix, func, unsafe, sources_policy, queue):
    dc = datacube.Datacube(config=config)
    index = dc.index
    s3 = boto3.resource("s3")
    safety = 'safe' if not unsafe else 'unsafe'

    while True:
        key = queue.get(timeout=60)
        if key == GUARDIAN:
            break
        logging.info("Processing %s %s", key, current_process())
        obj = s3.Object(bucket_name, key).get(ResponseCacheControl='no-cache')
        raw = obj['Body'].read()
        if suffix == AWS_PDS_TXT_SUFFIX:
            # Parse the MTL text document into a metadata doc
            raw_string = raw.decode('utf8')
            txt_doc = _parse_group(iter(raw_string.split("\n")))['L1_METADATA_FILE']
            data = make_metadata_doc(txt_doc, bucket_name, key)
        else:
            yaml = YAML(typ=safety, pure=False)
            yaml.default_flow_style = False
            data = yaml.load(raw)
        uri = get_s3_url(bucket_name, key)
        logging.info("calling %s", func)
        func(data, uri, index, sources_policy)
        queue.task_done()

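# iterate_datasets is the producer side: it starts 2x cpu_count() workers,
# streams matching object keys into the shared queue, waits for the queue
# to drain, then sends one GUARDIAN sentinel per worker before joining them.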
def iterate_datasets(bucket_name, config, prefix, suffix, func, unsafe, sources_policy):
    manager = Manager()
    queue = manager.Queue()

    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    logging.info("Bucket: %s prefix: %s", bucket_name, str(prefix))
    worker_count = cpu_count() * 2

    processes = []
    for i in range(worker_count):
        proc = Process(target=worker, args=(config, bucket_name, prefix, suffix, func, unsafe, sources_policy, queue))
        processes.append(proc)
        proc.start()

    for obj in bucket.objects.filter(Prefix=str(prefix)):
        if obj.key.endswith(suffix):
            queue.put(obj.key)

    queue.join()

    for i in range(worker_count):
        queue.put(GUARDIAN)

    for proc in processes:
        proc.join()

@click.command(help="Index datasets from an S3 bucket. Optionally pass a configuration file to access a different database.")
@click.argument('bucket_name')
@click.option('--config', '-c', help="Pass the configuration file used to access the database",
              type=click.Path(exists=True))
@click.option('--prefix', '-p', help="Pass the prefix of the objects in the bucket")
@click.option('--suffix', '-s', default=".yaml", help="Suffix of the metadata_docs that will be used to load datasets. For the AWS PDS bucket use MTL.txt")
@click.option('--archive', is_flag=True, help="If set, datasets found in the specified bucket and prefix will be archived")
@click.option('--unsafe', is_flag=True, help="If set, YAML will be parsed unsafely. Only use on trusted datasets. Only valid if suffix is .yaml")
@click.option('--sources_policy', default="verify", help="verify, ensure or skip")
def main(bucket_name, config, prefix, suffix, archive, unsafe, sources_policy):
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    action = archive_dataset if archive else add_dataset
    iterate_datasets(bucket_name, config, prefix, suffix, action, unsafe, sources_policy)


if __name__ == "__main__":
    main()
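
A usage sketch (the bucket names, prefix and config path below are illustrative, not part of the commit): indexing Landsat 8 scenes from the public AWS PDS bucket by their MTL metadata might look like

    python scripts/index_from_s3_bucket.py landsat-pds -p c1/L8/139/045 -s MTL.txt

while archiving previously indexed YAML-described datasets from a hypothetical private bucket could look like

    python scripts/index_from_s3_bucket.py my-odc-bucket -p L8 -s .yaml --archive -c datacube.conf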
