Skip to content

Commit

Permalink
empty record written on container failure
Browse files Browse the repository at this point in the history
  • Loading branch information
jpswinski committed Oct 28, 2024
1 parent 2b4e9d3 commit f0b16db
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 27 deletions.
6 changes: 6 additions & 0 deletions datasets/bathy/docker/oceaneyes/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,12 @@ def runClassifier(classifier, classifier_func, num_processes=6):
if len(beam_failures[beam]) > 0: # at least one classifier encountered an error
beam_list.remove(beam)

# Every beam was dropped: leave a "<filename>.empty" marker holding the
# per-beam failures, then stop with a zero exit status.
if not beam_list:
    with open(settings["filename"] + ".empty", "w") as marker:
        failure_report = json.dumps({"beam_failures": beam_failures})
        marker.write(failure_report)
    sys.exit(0)

# create one large dataframe of all spots
df = pd.concat([beam_table[beam] for beam in beam_list])
print("Concatenated data frames into a single data frame")
Expand Down
72 changes: 45 additions & 27 deletions datasets/bathy/endpoints/atl24g.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,31 @@ local start_time = time.gps() -- used for timeout handling
-------------------------------------------------------
-- function: cleanup
-------------------------------------------------------
local function cleanup(_crenv, _transaction_id)
runner.cleanup(_crenv) -- container runtime environment
core.orchunlock({_transaction_id}) -- unlock transaction
-- Releases everything held by the request: the container runtime
-- environment and the orchestrator transaction lock.
--
-- _crenv           container runtime environment (table returned by runner.setup)
-- _transaction_id  transaction to unlock
-- failure          optional; when truthy a record of run is written into the
--                  sandbox and sent to the user at the requested output path
--                  with ".empty" appended
-- reason           optional; text written as the body of the record of run
--
-- Relies on rqst, rspq, resource and userlog from the enclosing script.
local function cleanup(_crenv, _transaction_id, failure, reason)
    if failure then
        -- reporting runs in protected mode: whatever goes wrong here, the
        -- runtime cleanup and transaction unlock below must still happen
        local ok, report_err = pcall(function()
            local sandbox = _crenv.host_sandbox_directory
            if not sandbox then
                -- happens when the container runtime failed to initialize,
                -- in which case there is no directory to write the record into
                userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> has no sandbox directory for record of run json for %s", rspq, resource))
                return
            end
            -- generate filename
            local record_of_run = string.format("%s/record_of_run.json", sandbox)
            -- write record of run
            local f, err = io.open(record_of_run, "w")
            if not f then
                userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> failed to write record of run json for %s: %s", rspq, resource, err))
                return -- nothing was written, so there is nothing to send
            end
            -- tostring/or guard: file:write raises on nil and on non-string values
            f:write(tostring(reason or "unspecified failure"))
            f:close()
            -- send record of run to user
            local ror_parms = core.parms({
                output = {
                    asset = rqst["parms"]["output"]["asset"], -- use original request asset
                    path = rqst["parms"]["output"]["path"]..".empty" -- modify the original requested path
                }
            })
            arrow.send2user(record_of_run, ror_parms, rspq)
        end)
        if not ok then
            userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> failed to send record of run for %s: %s", rspq, resource, tostring(report_err)))
        end
    end
    -- cleanup container runtime environment
    runner.cleanup(_crenv)
    -- unlock transaction
    core.orchunlock({_transaction_id})
end

-------------------------------------------------------
Expand Down Expand Up @@ -169,7 +191,7 @@ end
local crenv = runner.setup()
if not crenv.host_sandbox_directory then
userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("failed to initialize container runtime for %s", resource))
cleanup(crenv, transaction_id)
-- pass the failure flag and a reason so cleanup attempts to report a record of run
cleanup(crenv, transaction_id, true, "failed to initialize container runtime")
return
end

Expand Down Expand Up @@ -248,27 +270,11 @@ end
-------------------------------------------------------
-- early exit on failures or empty dataframe
-------------------------------------------------------
if failed_processing_run or not valid_output_present then
-- generate filename
local record_of_run = string.format("%s/record_of_run.json", crenv.host_sandbox_directory)
-- write record of run
local f, err = io.open(record_of_run, "w")
if f then
f:write(json.encode({failed_processing_run=failed_processing_run, valid_output_present=valid_output_present}))
f:close()
else
userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> failed to write record of run json for %s: %s", rspq, resource, err))
end
-- send record of run to user
local ror_parms = core.parms({
output = {
asset=rqst["parms"]["output"]["asset"], -- use original request asset
path=rqst["parms"]["output"]["path"]..".empty" -- modify the original requested path
}
})
arrow.send2user(record_of_run, ror_parms, rspq)
-- clean up and exit
cleanup(crenv, transaction_id)
-- choose the reason for an early exit; a failed processing run takes
-- precedence over missing output
local early_exit_reason = nil
if failed_processing_run then
    early_exit_reason = "failed processing run"
elseif not valid_output_present then
    early_exit_reason = "no valid output present"
end
-- a reason means the request stops here: report it, release resources, exit
if early_exit_reason then
    cleanup(crenv, transaction_id, true, early_exit_reason)
    return
end

Expand All @@ -285,12 +291,12 @@ if granule then
granule:destroy()
else
userlog:alert(core.ERROR, core.RTE_TIMEOUT, string.format("request <%s> timed out waiting for granule to complete on %s", rspq, resource))
cleanup(crenv, transaction_id)
cleanup(crenv, transaction_id, true, "timed out waiting for granule information")
return
end
else
userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> failed to write granule json for %s: %s", rspq, resource, err))
cleanup(crenv, transaction_id)
cleanup(crenv, transaction_id, true, "failed to write granule information")
return
end
end
Expand Down Expand Up @@ -337,6 +343,17 @@ local container_parms = {
local container = runner.execute(crenv, container_parms, { ["settings.json"] = outputs }, rspq)
runner.wait(container, timeout)

-------------------------------------------------------
-- check for container failure
-------------------------------------------------------
-- a readable "<tmp_filename>.empty" file in the sandbox is treated as the
-- container reporting that it produced no output
-- NOTE(review): presumably written by the container's runner when no beams remain - confirm
local empty_marker_path = crenv.host_sandbox_directory.."/"..tmp_filename..".empty"
local empty_marker = io.open(empty_marker_path, "r")
if empty_marker then
    -- only existence matters; the contents are not read
    empty_marker:close()
    userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> container indicated empty output: %s", rspq, resource))
    cleanup(crenv, transaction_id, true, "container indicated empty output")
    return
end

-------------------------------------------------------
-- send final granule output to user
-------------------------------------------------------
Expand All @@ -354,6 +371,7 @@ if parms["output"]["format"] == "h5" then
})
arrow.send2user(crenv.host_sandbox_directory.."/"..tmp_filename..".iso.xml", xml_parms, rspq)
end

-------------------------------------------------------
-- exit
-------------------------------------------------------
Expand Down

0 comments on commit f0b16db

Please sign in to comment.