From f0b16db0a0d541031a525ade52309947d3b660b3 Mon Sep 17 00:00:00 2001 From: JP Swinski Date: Mon, 28 Oct 2024 18:47:48 +0000 Subject: [PATCH] empty record written on container failure --- datasets/bathy/docker/oceaneyes/runner.py | 6 ++ datasets/bathy/endpoints/atl24g.lua | 72 ++++++++++++++--------- 2 files changed, 51 insertions(+), 27 deletions(-) diff --git a/datasets/bathy/docker/oceaneyes/runner.py b/datasets/bathy/docker/oceaneyes/runner.py index 567714ea..88dd76ef 100644 --- a/datasets/bathy/docker/oceaneyes/runner.py +++ b/datasets/bathy/docker/oceaneyes/runner.py @@ -310,6 +310,12 @@ def runClassifier(classifier, classifier_func, num_processes=6): if len(beam_failures[beam]) > 0: # at least one classifier encountered an error beam_list.remove(beam) +# write empty record of run if no beams left +if len(beam_list) == 0: + with open(settings["filename"] + ".empty", "w") as ror_file: + ror_file.write(json.dumps({"beam_failures": beam_failures})) + sys.exit(0) + # create one large dataframe of all spots df = pd.concat([beam_table[beam] for beam in beam_list]) print("Concatenated data frames into a single data frame") diff --git a/datasets/bathy/endpoints/atl24g.lua b/datasets/bathy/endpoints/atl24g.lua index da3dc059..e7874e4e 100644 --- a/datasets/bathy/endpoints/atl24g.lua +++ b/datasets/bathy/endpoints/atl24g.lua @@ -18,9 +18,31 @@ local start_time = time.gps() -- used for timeout handling ------------------------------------------------------- -- function: cleanup ------------------------------------------------------- -local function cleanup(_crenv, _transaction_id) - runner.cleanup(_crenv) -- container runtime environment - core.orchunlock({_transaction_id}) -- unlock transaction +local function cleanup(_crenv, _transaction_id, failure, reason) + if failure then + -- generate filename + local record_of_run = string.format("%s/record_of_run.json", _crenv.host_sandbox_directory) + -- write record of run + local f, err = io.open(record_of_run, "w") + if f then + f:write(reason) + f:close() + else + userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> failed to write record of run json for %s: %s", rspq, resource, err)) + end + -- send record of run to user + local ror_parms = core.parms({ + output = { + asset=rqst["parms"]["output"]["asset"], -- use original request asset + path=rqst["parms"]["output"]["path"]..".empty" -- modify the original requested path + } + }) + arrow.send2user(record_of_run, ror_parms, rspq) + end + -- cleanup container runtime environent + runner.cleanup(_crenv) + -- unlock transaction + core.orchunlock({_transaction_id}) end ------------------------------------------------------- @@ -169,7 +191,7 @@ end local crenv = runner.setup() if not crenv.host_sandbox_directory then userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("failed to initialize container runtime for %s", resource)) - cleanup(crenv, transaction_id) + cleanup(crenv, transaction_id, true, "failed to initialized container runtime") return end @@ -248,27 +270,11 @@ end ------------------------------------------------------- -- early exit on failures or empty dataframe ------------------------------------------------------- -if failed_processing_run or not valid_output_present then - -- generate filename - local record_of_run = string.format("%s/record_of_run.json", crenv.host_sandbox_directory) - -- write record of run - local f, err = io.open(record_of_run, "w") - if f then - f:write(json.encode({failed_processing_run=failed_processing_run, valid_output_present=valid_output_present})) - f:close() - else - userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> failed to write record of run json for %s: %s", rspq, resource, err)) - end - -- send record of run to user - local ror_parms = core.parms({ - output = { - asset=rqst["parms"]["output"]["asset"], -- use original request asset - path=rqst["parms"]["output"]["path"]..".empty" -- modify the original requested path - } - }) - arrow.send2user(record_of_run, ror_parms, rspq) - -- clean up and exit - cleanup(crenv, transaction_id) +if failed_processing_run then + cleanup(crenv, transaction_id, true, "failed processing run") + return +elseif not valid_output_present then + cleanup(crenv, transaction_id, true, "no valid output present") return end @@ -285,12 +291,12 @@ if granule then granule:destroy() else userlog:alert(core.ERROR, core.RTE_TIMEOUT, string.format("request <%s> timed out waiting for granule to complete on %s", rspq, resource)) - cleanup(crenv, transaction_id) + cleanup(crenv, transaction_id, true, "timed out waiting for granule information") return end else userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> failed to write granule json for %s: %s", rspq, resource, err)) - cleanup(crenv, transaction_id) + cleanup(crenv, transaction_id, true, "failed to write granule information") return end end @@ -337,6 +343,17 @@ local container_parms = { local container = runner.execute(crenv, container_parms, { ["settings.json"] = outputs }, rspq) runner.wait(container, timeout) +------------------------------------------------------- +-- check for container failure +------------------------------------------------------- +local f, _ = io.open(crenv.host_sandbox_directory.."/"..tmp_filename..".empty", "r") +if f then + f:close() + userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> container indicated empty output: %s", rspq, resource)) + cleanup(crenv, transaction_id, true, "container indicated empty output") + return +end + ------------------------------------------------------- -- send final granule output to user ------------------------------------------------------- @@ -354,6 +371,7 @@ if parms["output"]["format"] == "h5" then }) arrow.send2user(crenv.host_sandbox_directory.."/"..tmp_filename..".iso.xml", xml_parms, rspq) end + ------------------------------------------------------- -- exit -------------------------------------------------------