Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions openquake/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import psutil
import h5py
import numpy
import requests
try:
from setproctitle import setproctitle
except ImportError:
Expand Down Expand Up @@ -295,11 +296,35 @@ def watchdog(calc_id, pid, timeout):
break


def _run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs):
def notify_job_complete(job_id, notify_to, exc=None):
    """
    Callback to notify a web endpoint that a job has completed.

    Does nothing if `notify_to` is falsy; logs an error (and sends
    nothing) if it is not an http(s) URL.

    :param job_id: ID of the completed job
    :param notify_to: the endpoint to send the notification to
        (must start with 'http://' or 'https://')
    :param exc: the exception raised during the job, if any
    :raises requests.HTTPError: if the endpoint answers with an error status
    """
    if not notify_to:
        return
    if notify_to.startswith(('https://', 'http://')):
        status = "failed" if exc else "success"
        payload = {
            "job_id": job_id,
            "status": status,
            "error": str(exc) if exc else None,
        }
        # short timeout so a slow or dead endpoint cannot block job teardown
        response = requests.post(notify_to, json=payload, timeout=5)
        response.raise_for_status()
    else:
        logging.error(f'notify_job_complete: {notify_to=} not valid')


def _run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs, notify_to):
for job in jobctxs:
dic = {'status': 'executing', 'pid': _PID,
'start_time': datetime.now(UTC)}
logs.dbcmd('update_job', job.calc_id, dic)
exc = None
try:
if dist in ('zmq', 'slurm') and w.WorkerMaster(job_id).status() == []:
start_workers(job_id, dist, nodes)
Expand All @@ -317,8 +342,8 @@ def _run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs):
args = [(ctx,) for ctx in jobctxs[1:]]
else:
args = [(ctx,) for ctx in jobctxs]
#with multiprocessing.pool.Pool(concurrent_jobs) as pool:
# pool.starmap(run_calc, args)
# with multiprocessing.pool.Pool(concurrent_jobs) as pool:
# pool.starmap(run_calc, args)
parallel.multispawn(run_calc, args, concurrent_jobs or 1)
elif concurrent_jobs:
nc = 1 + parallel.num_cores // concurrent_jobs
Expand All @@ -330,13 +355,17 @@ def _run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs):
else:
for jobctx in jobctxs:
run_calc(jobctx)
except Exception as e:
exc = e
raise
finally:
notify_job_complete(job_id, notify_to, exc)
if dist == 'zmq' or (dist == 'slurm' and not sbatch):
stop_workers(job_id)


def run_jobs(jobctxs, concurrent_jobs=None, nodes=1, sbatch=False,
precalc=False):
precalc=False, notify_to=None):
"""
Run jobs using the specified config file and other options.

Expand Down Expand Up @@ -385,7 +414,7 @@ def run_jobs(jobctxs, concurrent_jobs=None, nodes=1, sbatch=False,
for job in jobctxs:
logs.dbcmd('finish', job.calc_id, 'aborted')
raise
_run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs)
_run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs, notify_to)
return jobctxs


Expand Down
2 changes: 2 additions & 0 deletions openquake/server/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
re_path(r'^v1/available_gsims$', views.get_available_gsims),
re_path(r'^v1/ini_defaults$', views.get_ini_defaults,
name="ini_defaults"),
# NOTE: uncomment to test callbacks
# re_path(r'^v1/log_callback$', views.log_callback),
]
if settings.APPLICATION_MODE != 'PUBLIC':
urlpatterns += [
Expand Down
34 changes: 29 additions & 5 deletions openquake/server/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
from openquake.engine import __version__ as oqversion
from openquake.engine.export import core
from openquake.engine import engine, aelo, impact
from openquake.engine.engine import notify_job_complete
from openquake.engine.aelo import (
get_params_from, PRELIMINARY_MODELS, PRELIMINARY_MODEL_WARNING_MSG)
from openquake.engine.export.core import DataStoreExportError
Expand Down Expand Up @@ -687,6 +688,15 @@ def calc_log_size(request, calc_id):
return JsonResponse(response_data)


@csrf_exempt
@require_http_methods(['POST'])
def log_callback(request):
    """
    Debug endpoint dumping a received job notification to stdout.

    Prints the POST data, the query-string parameters and the raw
    request body, then acknowledges with a plain 'Done' response.
    """
    for piece in (request.POST, request.GET, request.body.decode('utf8')):
        print(piece)
    return HttpResponse(content='Done')


@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
Expand All @@ -702,6 +712,8 @@ def calc_run(request):
The request also needs to contain the files needed to perform the
calculation. They can be uploaded as separate files, or zipped
together.
If the request has the attribute `notify_to`, and it starts with
'http[s]://', the engine will send a notification to the given url
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might also be nice to document the action type (POST) and the data that is retrieved.

"""
job_ini = request.POST.get('job_ini')
hazard_job_id = request.POST.get('hazard_job_id')
Expand All @@ -710,8 +722,12 @@ def calc_run(request):
else:
ini = job_ini if job_ini else ".ini"
user = utils.get_user(request)
notify_to = request.POST.get('notify_to')
# notify_to = 'http://127.0.0.1:8800/v1/log_callback?pippo=pluto'

try:
job_id = submit_job(request.FILES, ini, user, hazard_job_id)
job_id = submit_job(
request.FILES, ini, user, hazard_job_id, notify_to)
except Exception as exc: # job failed, for instance missing .xml file
# get the exception message
exc_msg = traceback.format_exc() + str(exc)
Expand All @@ -734,11 +750,16 @@ def calc_run_ini(request):
:param request:
a `django.http.HttpRequest` object.
The request must contain the full path to a job.ini file
If the request has the attribute `notify_to`, and it starts with
'http[s]://', the engine will send a notification to the given url
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extend the description as above

"""
ini = request.POST.get('job_ini')
ini = request.POST['job_ini']
hazard_job_id = request.POST.get('hazard_job_id')
user = utils.get_user(request)
notify_to = request.POST.get('notify_to')
# notify_to = 'http://127.0.0.1:8800/v1/log_callback?pippo=pluto'
try:
job_id = submit_job([], ini, user, hc_id=None)
job_id = submit_job([], ini, user, hazard_job_id, notify_to=notify_to)
except Exception as exc: # job failed, for instance missing .ini file
# get the exception message
exc_msg = traceback.format_exc() + str(exc)
Expand Down Expand Up @@ -1225,7 +1246,7 @@ def aelo_run(request):
return JsonResponse(response_data, status=200)


def submit_job(request_files, ini, username, hc_id):
def submit_job(request_files, ini, username, hc_id, notify_to=None):
"""
Create a job object from the given files and run it in a new process.

Expand Down Expand Up @@ -1272,7 +1293,10 @@ def submit_job(request_files, ini, username, hc_id):
CALC_NAME='calc%d' % job.calc_id)
subprocess.run(submit_cmd, input=yaml.encode('ascii'))
else:
proc = mp.Process(target=engine.run_jobs, args=([job],))
kwargs = {}
if notify_to is not None:
kwargs['notify_to'] = notify_to
proc = mp.Process(target=engine.run_jobs, args=([job],), kwargs=kwargs)
proc.start()
if config.webapi.calc_timeout:
mp.Process(
Expand Down