 
 from multiprocessing import Process, current_process, Queue, Manager, cpu_count
 from time import sleep, time
+from queue import Empty
 
 GUARDIAN = "GUARDIAN_QUEUE_EMPTY"
 AWS_PDS_TXT_SUFFIX = "MTL.txt"

@@ -184,7 +185,8 @@ def get_s3_url(bucket_name, obj_key):
     return 's3://{bucket_name}/{obj_key}'.format(
         bucket_name=bucket_name, obj_key=obj_key)
 
-def archive_dataset(doc, uri, index, sources_policy):
+
+def archive_document(doc, uri, index, sources_policy):
     def get_ids(dataset):
         ds = index.datasets.get(dataset.id, include_sources=True)
         for source in ds.sources.values():
@@ -221,25 +223,29 @@ def worker(config, bucket_name, prefix, suffix, func, unsafe, sources_policy, queue):
     safety = 'safe' if not unsafe else 'unsafe'
 
     while True:
-        key = queue.get(timeout=60)
-        if key == GUARDIAN:
+        try:
+            key = queue.get(timeout=60)
+            if key == GUARDIAN:
+                queue.task_done()
+                break
+            logging.info("Processing %s %s", key, current_process())
+            obj = s3.Object(bucket_name, key).get(ResponseCacheControl='no-cache')
+            raw = obj['Body'].read()
+            if suffix == AWS_PDS_TXT_SUFFIX:
+                # Attempt to process text document
+                raw_string = raw.decode('utf8')
+                txt_doc = _parse_group(iter(raw_string.split("\n")))['L1_METADATA_FILE']
+                data = make_metadata_doc(txt_doc, bucket_name, key)
+            else:
+                yaml = YAML(typ=safety, pure=False)
+                yaml.default_flow_style = False
+                data = yaml.load(raw)
+            uri = get_s3_url(bucket_name, key)
+            logging.info("calling %s", func)
+            func(data, uri, index, sources_policy)
+            queue.task_done()
+        except Empty:
             break
-        logging.info("Processing %s %s", key, current_process())
-        obj = s3.Object(bucket_name, key).get(ResponseCacheControl='no-cache')
-        raw = obj['Body'].read()
-        if suffix == AWS_PDS_TXT_SUFFIX:
-            # Attempt to process text document
-            raw_string = raw.decode('utf8')
-            txt_doc = _parse_group(iter(raw_string.split("\n")))['L1_METADATA_FILE']
-            data = make_metadata_doc(txt_doc, bucket_name, key)
-        else:
-            yaml = YAML(typ=safety, pure=False)
-            yaml.default_flow_style = False
-            data = yaml.load(raw)
-        uri= get_s3_url(bucket_name, key)
-        logging.info("calling %s", func)
-        func(data, uri, index, sources_policy)
-        queue.task_done()
 
 
 def iterate_datasets(bucket_name, config, prefix, suffix, func, unsafe, sources_policy):

@@ -283,7 +289,7 @@ def iterate_datasets(bucket_name, config, prefix, suffix, func, unsafe, sources_policy):
 @click.option('--sources_policy', default="verify", help="verify, ensure, skip")
 def main(bucket_name, config, prefix, suffix, archive, unsafe, sources_policy):
     logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
-    action = archive_dataset if archive else add_dataset
+    action = archive_document if archive else add_dataset
     iterate_datasets(bucket_name, config, prefix, suffix, action, unsafe, sources_policy)
 
 
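The try/except around `queue.get(timeout=60)` means a worker now stops either when it receives the GUARDIAN sentinel or when the queue stays empty past the timeout, rather than dying on an unhandled `queue.Empty`. Below is a minimal, self-contained sketch of that shutdown pattern, not the repository code: the Manager-backed queue, the two-worker pool, the `consume` helper and the example keys are illustrative assumptions; only the sentinel value, the 60-second timeout and the `Empty` handling mirror the diff.

```python
from multiprocessing import Manager, Process, current_process
from queue import Empty

GUARDIAN = "GUARDIAN_QUEUE_EMPTY"


def consume(queue):
    # Same loop shape as worker() above, with the real S3/indexing work
    # replaced by a print so the sketch runs on its own.
    while True:
        try:
            key = queue.get(timeout=60)
            if key == GUARDIAN:
                queue.task_done()  # acknowledge the sentinel before exiting
                break
            print(current_process().name, "processing", key)
            queue.task_done()  # mark this key as handled
        except Empty:
            break  # nothing arrived within the timeout; shut down quietly


if __name__ == "__main__":
    manager = Manager()
    queue = manager.Queue()  # a Manager queue supports task_done()/join() across processes
    workers = [Process(target=consume, args=(queue,)) for _ in range(2)]
    for w in workers:
        w.start()
    for key in ["scene_a/MTL.txt", "scene_b/MTL.txt"]:  # hypothetical object keys
        queue.put(key)
    for _ in workers:
        queue.put(GUARDIAN)  # one sentinel per worker
    queue.join()  # returns once every put() has a matching task_done()
    for w in workers:
        w.join()
```

The per-worker sentinel plus `queue.join()` shown here is one way the producer side can drain cleanly; the timeout remains as a backstop if a sentinel is ever lost.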