18 changes: 17 additions & 1 deletion openquake/commonlib/logs.py
@@ -68,7 +68,7 @@ def dbcmd(action, *args):
raise TypeError(f'{arg} is not a simple type')
dbhost = os.environ.get('OQ_DATABASE', config.dbserver.host)
if (action.startswith('workers_') and config.zworkers.host_cores
== '127.0.0.1 -1'): # local zmq
== '127.0.0.1 -1'): # local zmq
return on_workers(action)
elif dbhost == '127.0.0.1' and getpass.getuser() != 'openquake':
# no server mode, access the database directly
@@ -93,6 +93,22 @@ def dbcmd(action, *args):
return res


def get_job_info(job_id):
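"""
Return the information about the given job as a dict, with the 'id'
field renamed to 'job_id' and datetimes converted to ISO-8601 strings.
"""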
job = dbcmd('get_job', int(job_id))
job_info = {key: val for (key, val) in zip(job.__dict__['_fields'],
job.__dict__['_values'])}
# NOTE: returning more fields than before, but renaming 'id' to 'job_id'
# for backwards compatibility
job_info['job_id'] = job_info['id']
del job_info['id']
# make datetime fields serializable
if job_info['start_time']:
job_info["start_time"] = job_info["start_time"].isoformat()
if job_info['stop_time']:
job_info["stop_time"] = job_info["stop_time"].isoformat()
return job_info


def dblog(level: str, job_id: int, task_no: int, msg: str):
"""
Log on the database
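For reference, a minimal usage sketch of the new helper (the job id is illustrative and assumes a job present in the engine database):

from openquake.commonlib import logs

info = logs.get_job_info(42)  # hypothetical job id
# 'id' is renamed to 'job_id'; datetime fields are ISO-8601 strings
print(info['job_id'], info['status'], info['start_time'])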
35 changes: 30 additions & 5 deletions openquake/engine/engine.py
@@ -36,6 +36,7 @@
import psutil
import h5py
import numpy
import requests
try:
from setproctitle import setproctitle
except ImportError:
@@ -295,11 +296,31 @@ def watchdog(calc_id, pid, timeout):
break


def _run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs):
def notify_job_complete(job_id, notify_to, exc=None):
"""
Callback to notify a web endpoint that a job has completed.

:param job_id: ID of the completed job
:param notify_to: the endpoint to send the notification to
:param exc: the exception raised during the job, if any
"""
if not notify_to:
return
if notify_to.startswith('https://') or notify_to.startswith('http://'):
job_info = logs.get_job_info(job_id)
job_info['error'] = str(exc) if exc else None
response = requests.post(notify_to, json=job_info, timeout=5)
response.raise_for_status()
else:
logging.error(f'notify_job_complete: {notify_to=} not valid')


def _run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs, notify_to):
for job in jobctxs:
dic = {'status': 'executing', 'pid': _PID,
'start_time': datetime.now(UTC)}
logs.dbcmd('update_job', job.calc_id, dic)
exc = None
try:
if dist in ('zmq', 'slurm') and w.WorkerMaster(job_id).status() == []:
start_workers(job_id, dist, nodes)
@@ -317,8 +338,8 @@ def _run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs):
args = [(ctx,) for ctx in jobctxs[1:]]
else:
args = [(ctx,) for ctx in jobctxs]
#with multiprocessing.pool.Pool(concurrent_jobs) as pool:
# pool.starmap(run_calc, args)
# with multiprocessing.pool.Pool(concurrent_jobs) as pool:
# pool.starmap(run_calc, args)
parallel.multispawn(run_calc, args, concurrent_jobs or 1)
elif concurrent_jobs:
nc = 1 + parallel.num_cores // concurrent_jobs
@@ -330,13 +351,17 @@ def _run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs):
else:
for jobctx in jobctxs:
run_calc(jobctx)
except Exception as e:
exc = e
raise
finally:
notify_job_complete(job_id, notify_to, exc)
if dist == 'zmq' or (dist == 'slurm' and not sbatch):
stop_workers(job_id)


def run_jobs(jobctxs, concurrent_jobs=None, nodes=1, sbatch=False,
precalc=False):
precalc=False, notify_to=None):
"""
Run jobs using the specified config file and other options.

@@ -385,7 +410,7 @@ def run_jobs(jobctxs, concurrent_jobs=None, nodes=1, sbatch=False,
for job in jobctxs:
logs.dbcmd('finish', job.calc_id, 'aborted')
raise
_run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs)
_run(jobctxs, dist, job_id, nodes, sbatch, precalc, concurrent_jobs, notify_to)
return jobctxs


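The `notify_to` endpoint receives a single POST with the JSON-serialized job info plus an `error` key (None on success). A minimal stand-alone receiver, sketched with the standard library only; the handler name and port are illustrative:

import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class NotifyHandler(BaseHTTPRequestHandler):
    # hypothetical receiver for the job-completion callback
    def do_POST(self):
        length = int(self.headers.get('Content-Length', 0))
        body = json.loads(self.rfile.read(length))
        # body holds the fields from logs.get_job_info plus 'error'
        print(body.get('job_id'), body.get('status'), body.get('error'))
        self.send_response(200)
        self.end_headers()

if __name__ == '__main__':
    HTTPServer(('127.0.0.1', 8008), NotifyHandler).serve_forever()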
44 changes: 42 additions & 2 deletions openquake/server/tests/test_public_mode.py
@@ -28,15 +28,16 @@
import string
import random
import logging

import django
from django.test import Client
from django.test import LiveServerTestCase, Client
from unittest import skipIf
from threading import Event
from openquake.baselib import config
from openquake.commonlib.logs import dbcmd
from openquake.engine.export import core
from openquake.server.db import actions
from openquake.server.dbserver import db
from openquake.server.views import job_complete_callback_state
from openquake.server.tests.views_test import EngineServerTestCase, loadnpz
from openquake.qa_tests_data.classical import case_01

@@ -377,3 +378,42 @@ def test_check_fs_access_fail(self):
self.assertEqual(resp.status_code, 200)
resp_text_dict = json.loads(resp.content.decode('utf8'))
self.assertFalse(resp_text_dict['success'])


class CallbackTest(LiveServerTestCase):
"""
Integration test checking the callback on job completion
"""

def setUp(self):
self.client = Client()
self.on_job_complete_event = Event()
self.on_job_complete_data = {}
job_complete_callback_state['event'] = self.on_job_complete_event
job_complete_callback_state['data'] = self.on_job_complete_data

def test_callback_on_job_successfully_completed(self):
notify_to = f"{self.live_server_url}/v1/check_callback?first=one&second=two"
job_ini = os.path.join(os.path.dirname(case_01.__file__), 'job.ini')
post_args = dict(
job_ini=job_ini,
notify_to=notify_to,
job_owner='custom_owner',
)
resp = self.client.post('/v1/calc/run_ini', post_args)
self.assertEqual(resp.status_code, 200)
job_info = json.loads(resp.content.decode('utf8'))
self.assertEqual(job_info['user_name'], 'custom_owner')
job_id = job_info['job_id']
self.on_job_complete_event.wait(timeout=10)
self.assertTrue(self.on_job_complete_event.is_set())
body = self.on_job_complete_data['body']
get_params = self.on_job_complete_data['GET']
self.assertEqual(body['job_id'], job_id)
self.assertEqual(body['status'], 'complete')
self.assertEqual(body['user_name'], 'custom_owner')
self.assertEqual(get_params['first'], 'one')
self.assertEqual(get_params['second'], 'two')

# TODO: we could add a test for the callback in case of a job that starts
# successfully (the inputs are valid) but fails afterwards.
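A possible sketch of that test, to be added to CallbackTest; the broken .ini path and the 'failed' status string are assumptions, not verified here:

    def test_callback_on_job_failure(self):
        # hypothetical: an .ini that submits fine but fails during execution
        job_ini = '/path/to/broken_job.ini'
        notify_to = f"{self.live_server_url}/v1/check_callback"
        resp = self.client.post('/v1/calc/run_ini',
                                dict(job_ini=job_ini, notify_to=notify_to))
        self.assertEqual(resp.status_code, 200)
        self.on_job_complete_event.wait(timeout=10)
        body = self.on_job_complete_data['body']
        self.assertEqual(body['status'], 'failed')  # assumed status string
        self.assertIsNotNone(body['error'])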
1 change: 1 addition & 0 deletions openquake/server/urls.py
@@ -38,6 +38,7 @@
re_path(r'^v1/available_gsims$', views.get_available_gsims),
re_path(r'^v1/ini_defaults$', views.get_ini_defaults,
name="ini_defaults"),
re_path(r'^v1/check_callback$', views.check_callback),
]
if settings.APPLICATION_MODE != 'PUBLIC':
urlpatterns += [
13 changes: 7 additions & 6 deletions openquake/server/utils.py
@@ -27,6 +27,7 @@
from django.contrib.auth import get_user_model
from openquake.engine import __version__ as oqversion
from openquake.calculators.base import get_aelo_version
from openquake.commonlib import logs


def is_superuser(request):
@@ -39,21 +40,21 @@ def is_superuser(request):
return request.user.is_superuser if hasattr(request, 'user') else False


def get_user(request):
def get_username(request):
"""
Returns the username from `request` if authentication is enabled,
otherwise returns the default username (from settings, or as reported
by the OS).
"""
if settings.LOCKDOWN and hasattr(request, 'user'):
if request.user.is_authenticated:
user = request.user.username
username = request.user.username
else:
# This may happen with crafted requests
user = ''
username = ''
else:
user = getattr(settings, 'DEFAULT_USER', getpass.getuser())
username = getattr(settings, 'DEFAULT_USER', getpass.getuser())

return user
return username


def get_valid_users(request):
@@ -63,7 +64,7 @@ def get_valid_users(request):
"""
if settings.LOCKDOWN:
User = get_user_model()
users = [get_user(request)]
users = [get_username(request)]
if settings.LOCKDOWN and hasattr(request, 'user'):
if request.user.is_authenticated:
groups = request.user.groups.all()
78 changes: 64 additions & 14 deletions openquake/server/views.py
@@ -31,6 +31,7 @@
import zlib
import re
import psutil
from threading import Event
from collections import defaultdict
from datetime import datetime, timezone
from urllib.parse import unquote_plus, urljoin, urlencode, urlparse, urlunparse
@@ -589,7 +590,7 @@ def calc_remove(request, calc_id):
Remove the calculation id
"""
# Only the owner can remove a job
user = utils.get_user(request)
user = utils.get_username(request)
try:
message = logs.dbcmd('del_calc', calc_id, user)
except dbapi.NotFound:
@@ -693,12 +694,30 @@ def calc_log_size(request, calc_id):
return JsonResponse(response_data)


job_complete_callback_state = {'event': Event(), 'data': {}}


@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def check_callback(request):
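"""
Test-support endpoint: record the received payload in
`job_complete_callback_state` and set the event, so that tests can
wait for the job-completion notification; the payload is echoed back.
"""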
body = json.loads(request.body.decode('utf8'))
payload = dict(body=body, POST=request.POST, GET=request.GET)
job_complete_callback_state['data'].clear()
job_complete_callback_state['data'].update(payload)
job_complete_callback_state['event'].set()
return JsonResponse(payload)


@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def calc_run(request):
"""
Run a calculation.
This endpoint accepts a POST request containing the configuration and input files
required to perform a calculation. It can also optionally reuse results from a
previous hazard calculation, assign a custom job owner, and specify a callback URL
for job completion notifications.

:param request:
a `django.http.HttpRequest` object.
@@ -708,24 +727,36 @@ def calc_run(request):
The request also needs to contain the files needed to perform the
calculation. They can be uploaded as separate files, or zipped
together.
If the request has the attribute `notify_to` and it starts with
'http://' or 'https://', the engine will POST a notification to that
URL when the job completes.
If the request has the attribute `job_owner`, the job owner will be set
to that string instead of the name of the user performing the request.

:returns:
A `django.http.JsonResponse` object containing:
- On success (HTTP 200): information about the initialized job
- On failure (HTTP 500): traceback details and job ID (if available).
"""
job_ini = request.POST.get('job_ini')
hazard_job_id = request.POST.get('hazard_job_id')
if hazard_job_id: # "continue" button, tested in the QGIS plugin
ini = job_ini if job_ini else "risk.ini"
else:
ini = job_ini if job_ini else ".ini"
user = utils.get_user(request)
notify_to = request.POST.get('notify_to')
username = request.POST.get('job_owner')
if not username:
username = utils.get_username(request)
try:
job_id = submit_job(request.FILES, ini, user, hazard_job_id)
job_id = submit_job(request.FILES, ini, username, hazard_job_id, notify_to)
except Exception as exc: # job failed, for instance missing .xml file
# get the exception message
exc_msg = traceback.format_exc() + str(exc)
logging.error(exc_msg)
response_data = dict(traceback=exc_msg.splitlines(), job_id=exc.job_id)
status = 500
else:
response_data = dict(status='created', job_id=job_id)
response_data = logs.get_job_info(job_id)
status = 200
return JsonResponse(response_data, status=status)

@@ -735,24 +766,40 @@
@require_http_methods(['POST'])
def calc_run_ini(request):
"""
Run a calculation.
This endpoint accepts a POST request containing the configuration .ini file that
defines the parameters needed to perform a calculation. It can optionally reuse
results from a previous hazard job, assign a custom job owner, and specify
a callback URL for job completion notifications.

:param request:
a `django.http.HttpRequest` object.
The request must contain the full path to a job.ini file.
If the request has the attribute `notify_to` and it starts with
'http://' or 'https://', the engine will POST a notification to that
URL when the job completes.
If the request has the attribute `job_owner`, the job owner will be set
to that string instead of the name of the user performing the request.

:returns:
A `django.http.JsonResponse` object containing:
- On success (HTTP 200): information about the initialized job
- On failure (HTTP 500): traceback details and job ID (if available).
"""
ini = request.POST.get('job_ini')
user = utils.get_user(request)
ini = request.POST['job_ini']
hazard_job_id = request.POST.get('hazard_job_id')
notify_to = request.POST.get('notify_to')
username = request.POST.get('job_owner')
if not username:
username = utils.get_username(request)
try:
job_id = submit_job([], ini, user, hc_id=None)
job_id = submit_job([], ini, username, hazard_job_id, notify_to=notify_to)
except Exception as exc: # job failed, for instance missing .ini file
# get the exception message
exc_msg = traceback.format_exc() + str(exc)
logging.error(exc_msg)
response_data = dict(traceback=exc_msg.splitlines(), job_id=exc.job_id)
status = 500
else:
response_data = dict(status='created', job_id=job_id)
response_data = logs.get_job_info(job_id)
status = 200
return JsonResponse(response_data, status=status)

@@ -978,7 +1025,7 @@ def get_uploaded_file_path(request, filename):
def create_impact_job(request, params):
[jobctx] = engine.create_jobs(
[params], config.distribution.log_level,
user_name=utils.get_user(request))
user_name=utils.get_username(request))

job_owner_email = request.user.email
response_data = dict()
@@ -1194,7 +1241,7 @@ def aelo_run(request):
return JsonResponse(response_data, status=400)
[jobctx] = engine.create_jobs(
[params],
config.distribution.log_level, None, utils.get_user(request), None)
config.distribution.log_level, None, utils.get_username(request), None)
job_id = jobctx.calc_id

outputs_uri_web = request.build_absolute_uri(
@@ -1231,7 +1278,7 @@ def aelo_run(request):
return JsonResponse(response_data, status=200)


def submit_job(request_files, ini, username, hc_id):
def submit_job(request_files, ini, username, hc_id, notify_to=None):
"""
Create a job object from the given files and run it in a new process.

@@ -1278,7 +1325,10 @@ def submit_job(request_files, ini, username, hc_id):
CALC_NAME='calc%d' % job.calc_id)
subprocess.run(submit_cmd, input=yaml.encode('ascii'))
else:
proc = mp.Process(target=engine.run_jobs, args=([job],))
kwargs = {}
if notify_to is not None:
kwargs['notify_to'] = notify_to
proc = mp.Process(target=engine.run_jobs, args=([job],), kwargs=kwargs)
proc.start()
if config.webapi.calc_timeout:
mp.Process(
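From a client's perspective, a hedged sketch of submitting a job with the two new POST attributes (the server URL, .ini path and callback URL are placeholders):

import requests

SERVER = 'http://127.0.0.1:8800'  # placeholder for the engine WebUI URL
data = dict(
    job_ini='/path/to/job.ini',          # placeholder path on the server
    job_owner='custom_owner',
    notify_to='http://127.0.0.1:8008/')  # where the engine POSTs on completion
resp = requests.post(f'{SERVER}/v1/calc/run_ini', data=data, timeout=30)
resp.raise_for_status()
info = resp.json()  # the full job info dict, not just {'status': 'created'}
print(info['job_id'], info['status'])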