Skip to content

Commit b89b2f3

Browse files
author
Robert Armstrong
committed
S3 bucket indexing: add_dataset now returns the error it encountered, and the parent process no longer attempts to join the queue
1 parent 9d88d1c commit b89b2f3

File tree

1 file changed

+11
-12
lines changed

1 file changed

+11
-12
lines changed

scripts/index_from_s3_bucket.py

Lines changed: 11 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -193,7 +193,6 @@ def get_ids(dataset):
193193
yield source.id
194194
yield dataset.id
195195

196-
197196
resolver = Doc2Dataset(index)
198197
dataset, err = resolver(doc, uri)
199198
index.datasets.archive(get_ids(dataset))
@@ -206,15 +205,16 @@ def add_dataset(doc, uri, index, sources_policy):
206205
dataset, err = resolver(doc, uri)
207206
if err is not None:
208207
logging.error("%s", err)
209-
try:
210-
index.datasets.add(dataset, sources_policy=sources_policy) # Source policy to be checked in sentinel 2 datase types
211-
except changes.DocumentMismatchError as e:
212-
index.datasets.update(dataset, {tuple(): changes.allow_any})
213-
except Exception as e:
214-
logging.error("Unhandled exception %s", e)
215-
216-
return uri
208+
else:
209+
try:
210+
index.datasets.add(dataset, sources_policy=sources_policy) # Source policy to be checked in sentinel 2 datase types
211+
except changes.DocumentMismatchError as e:
212+
index.datasets.update(dataset, {tuple(): changes.allow_any})
213+
except Exception as e:
214+
err = e
215+
logging.error("Unhandled exception %s", e)
217216

217+
return err
218218

219219
def worker(config, bucket_name, prefix, suffix, func, unsafe, sources_policy, queue):
220220
dc=datacube.Datacube(config=config)
@@ -226,7 +226,6 @@ def worker(config, bucket_name, prefix, suffix, func, unsafe, sources_policy, qu
226226
try:
227227
key = queue.get(timeout=60)
228228
if key == GUARDIAN:
229-
queue.task_done()
230229
break
231230
logging.info("Processing %s %s", key, current_process())
232231
obj = s3.Object(bucket_name, key).get(ResponseCacheControl='no-cache')
@@ -246,6 +245,8 @@ def worker(config, bucket_name, prefix, suffix, func, unsafe, sources_policy, qu
246245
queue.task_done()
247246
except Empty:
248247
break
248+
except EOFError:
249+
break
249250

250251

251252
def iterate_datasets(bucket_name, config, prefix, suffix, func, unsafe, sources_policy):
@@ -268,8 +269,6 @@ def iterate_datasets(bucket_name, config, prefix, suffix, func, unsafe, sources_
268269
if (obj.key.endswith(suffix)):
269270
queue.put(obj.key)
270271

271-
queue.join()
272-
273272
for i in range(worker_count):
274273
queue.put(GUARDIAN)
275274

0 commit comments

Comments
 (0)