From ac16be76ccd4bc4cb79728a5940e3567d003a0d2 Mon Sep 17 00:00:00 2001 From: Romain Cledat Date: Wed, 3 Feb 2021 01:54:44 -0800 Subject: [PATCH 1/3] Log more context on error and use `.json` instead of home-made method --- services/metadata_service/api/artifact.py | 3 +-- services/metadata_service/api/flow.py | 3 +-- services/metadata_service/api/metadata.py | 3 +-- services/metadata_service/api/run.py | 3 +-- services/metadata_service/api/step.py | 3 +-- services/metadata_service/api/task.py | 3 +-- services/metadata_service/api/utils.py | 16 +++++++++++++--- services/utils/__init__.py | 9 --------- 8 files changed, 19 insertions(+), 24 deletions(-) diff --git a/services/metadata_service/api/artifact.py b/services/metadata_service/api/artifact.py index d29b2ecf..46dbdd8c 100644 --- a/services/metadata_service/api/artifact.py +++ b/services/metadata_service/api/artifact.py @@ -1,7 +1,6 @@ from aiohttp import web from services.data.postgres_async_db import AsyncPostgresDB from services.data.db_utils import filter_artifacts_by_attempt_id_for_tasks -from services.utils import read_body from services.metadata_service.api.utils import format_response, \ handle_exceptions import json @@ -306,7 +305,7 @@ async def create_artifacts(self, request): run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") - body = await read_body(request.content) + body = await request.json() count = 0 try: diff --git a/services/metadata_service/api/flow.py b/services/metadata_service/api/flow.py index 633a1f65..4b3353c5 100644 --- a/services/metadata_service/api/flow.py +++ b/services/metadata_service/api/flow.py @@ -1,6 +1,5 @@ from services.data import FlowRow from services.data.postgres_async_db import AsyncPostgresDB -from services.utils import read_body from services.metadata_service.api.utils import format_response, \ handle_exceptions import asyncio @@ -54,7 +53,7 @@ async def create_flow(self, request): """ flow_name = request.match_info.get("flow_id") - body = await read_body(request.content) + body = await request.json() user = body.get("user_name") tags = body.get("tags") system_tags = body.get("system_tags") diff --git a/services/metadata_service/api/metadata.py b/services/metadata_service/api/metadata.py index fb7fa6be..e94e2a4a 100644 --- a/services/metadata_service/api/metadata.py +++ b/services/metadata_service/api/metadata.py @@ -1,6 +1,5 @@ from aiohttp import web import json -from services.utils import read_body from services.metadata_service.api.utils import format_response, \ handle_exceptions import asyncio @@ -171,7 +170,7 @@ async def create_metadata(self, request): step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") - body = await read_body(request.content) + body = await request.json() count = 0 try: run_number, run_id = await self._db.get_run_ids(flow_name, run_number) diff --git a/services/metadata_service/api/run.py b/services/metadata_service/api/run.py index 250d84e2..500488ed 100644 --- a/services/metadata_service/api/run.py +++ b/services/metadata_service/api/run.py @@ -1,6 +1,5 @@ import asyncio from services.data.models import RunRow -from services.utils import read_body from services.metadata_service.api.utils import format_response, \ handle_exceptions from services.data.postgres_async_db import AsyncPostgresDB @@ -119,7 +118,7 @@ async def create_run(self, request): """ flow_name = request.match_info.get("flow_id") - body = await read_body(request.content) + body = await request.json() user = body.get("user_name") tags = body.get("tags") system_tags = body.get("system_tags") diff --git a/services/metadata_service/api/step.py b/services/metadata_service/api/step.py index d4157d54..af76807b 100644 --- a/services/metadata_service/api/step.py +++ b/services/metadata_service/api/step.py @@ -1,5 +1,4 @@ from services.data import StepRow -from services.utils import read_body from services.metadata_service.api.utils import format_response, \ handle_exceptions from services.data.postgres_async_db import AsyncPostgresDB @@ -144,7 +143,7 @@ async def create_step(self, request): run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") - body = await read_body(request.content) + body = await request.json() user = body.get("user_name", "") tags = body.get("tags") system_tags = body.get("system_tags") diff --git a/services/metadata_service/api/task.py b/services/metadata_service/api/task.py index 6c7adc23..4de01b75 100644 --- a/services/metadata_service/api/task.py +++ b/services/metadata_service/api/task.py @@ -1,6 +1,5 @@ from services.data import TaskRow from services.data.postgres_async_db import AsyncPostgresDB -from services.utils import read_body from services.metadata_service.api.utils import format_response, \ handle_exceptions import json @@ -170,7 +169,7 @@ async def create_task(self, request): flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") - body = await read_body(request.content) + body = await request.json() user = body.get("user_name") tags = body.get("tags") diff --git a/services/metadata_service/api/utils.py b/services/metadata_service/api/utils.py index 527e9160..e29995a0 100644 --- a/services/metadata_service/api/utils.py +++ b/services/metadata_service/api/utils.py @@ -53,10 +53,20 @@ def handle_exceptions(func): """Catch exceptions and return appropriate HTTP error.""" @wraps(func) - async def wrapper(*args, **kwargs): + async def wrapper(self, request): try: - return await func(*args, **kwargs) + return await func(self, request) except Exception as err: - return http_500(str(err)) + # either use provided traceback from subprocess, or generate trace from current process + err_trace = getattr(err, 'traceback_str', None) or get_traceback_str() + print(err_trace) + # We log the request that caused this for debugging information + try: + body = await request.text() + except: + body = '' + print("Error caused when %s %s with query %s and body %s" % + (request.method, request.url, request.query_string, body)) + return http_500(str(err), err_trace) return wrapper diff --git a/services/utils/__init__.py b/services/utils/__init__.py index 1483e2a1..351cdbb7 100644 --- a/services/utils/__init__.py +++ b/services/utils/__init__.py @@ -18,15 +18,6 @@ logging.basicConfig(level=log_level) -async def read_body(request_content): - byte_array = bytearray() - while not request_content.at_eof(): - data = await request_content.read(4) - byte_array.extend(data) - - return json.loads(byte_array.decode("utf-8")) - - def get_traceback_str(): """Get the traceback as a string.""" From 90965b51047b74ee9706ea5da2036753bec71f81 Mon Sep 17 00:00:00 2001 From: Romain Cledat Date: Wed, 3 Feb 2021 15:49:42 -0800 Subject: [PATCH 2/3] Add pool recycle to pool --- services/data/postgres_async_db.py | 9 ++++++--- .../migration_service/data/postgres_async_db.py | 2 +- services/utils/__init__.py | 10 ++++++---- services/utils/tests/unit_tests/utils_test.py | 17 +++++++++++------ 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/services/data/postgres_async_db.py b/services/data/postgres_async_db.py index a3faf2ad..6a029479 100644 --- a/services/data/postgres_async_db.py +++ b/services/data/postgres_async_db.py @@ -74,9 +74,10 @@ async def _init(self, db_conf: DBConfiguration, create_triggers=DB_TRIGGER_CREAT try: self.pool = await aiopg.create_pool( db_conf.dsn, + timeout=db_conf.pool_timeout, minsize=db_conf.pool_min, maxsize=db_conf.pool_max, - timeout=db_conf.timeout, + pool_recycle=db_conf.pool_recycle, echo=AIOPG_ECHO) # Clean existing trigger functions before creating new ones @@ -89,9 +90,11 @@ async def _init(self, db_conf: DBConfiguration, create_triggers=DB_TRIGGER_CREAT self.logger.info( "Connection established.\n" - " Pool min: {pool_min} max: {pool_max}\n".format( + " Pool min: {pool_min} max: {pool_max} timeout: {pool_timeout} recycle: {pool_recycle}\n".format( pool_min=self.pool.minsize, - pool_max=self.pool.maxsize)) + pool_max=self.pool.maxsize, + pool_timeout=self.pool.timeout, + pool_recycle=db_conf.pool_recycle)) break # Break the retry loop except Exception as e: diff --git a/services/migration_service/data/postgres_async_db.py b/services/migration_service/data/postgres_async_db.py index 7f4da46f..3f3d1860 100644 --- a/services/migration_service/data/postgres_async_db.py +++ b/services/migration_service/data/postgres_async_db.py @@ -39,7 +39,7 @@ async def _init(self, db_conf: DBConfiguration): retries = 3 for i in range(retries): try: - self.pool = await aiopg.create_pool(db_conf.dsn, timeout=db_conf.timeout) + self.pool = await aiopg.create_pool(db_conf.dsn, timeout=db_conf.pool_timeout) except Exception as e: print("printing connection exception: " + str(e)) if retries - i < 1: diff --git a/services/utils/__init__.py b/services/utils/__init__.py index 351cdbb7..df59435d 100644 --- a/services/utils/__init__.py +++ b/services/utils/__init__.py @@ -76,8 +76,8 @@ class DBConfiguration(object): # https://aiopg.readthedocs.io/en/stable/_modules/aiopg/pool.html#create_pool pool_min: int = None # aiopg default: 1 pool_max: int = None # aiopg default: 10 - - timeout: int = None # aiopg default: 60 (seconds) + pool_timeout: int = None # aiopg default: 60 (seconds) + pool_recycle : int = -1 _dsn: str = None @@ -91,7 +91,8 @@ def __init__(self, prefix="MF_METADATA_DB_", pool_min: int = 1, pool_max: int = 10, - timeout: int = 60): + timeout: int = 60, + pool_recycle: int = -1): table = str.maketrans({"'": "\'", "`": r"\`"}) self._dsn = os.environ.get(prefix + "DSN", dsn) @@ -107,7 +108,8 @@ def __init__(self, self.pool_min = int(os.environ.get(prefix + "POOL_MIN", pool_min)) self.pool_max = int(os.environ.get(prefix + "POOL_MAX", pool_max)) - self.timeout = int(os.environ.get(prefix + "TIMEOUT", timeout)) + self.pool_timeout = int(os.environ.get(prefix + "TIMEOUT", timeout)) + self.pool_recycle = int(os.environ.get(prefix + "POOL_RECYCLE", pool_recycle)) @property def dsn(self): diff --git a/services/utils/tests/unit_tests/utils_test.py b/services/utils/tests/unit_tests/utils_test.py index cb673998..ed59376b 100644 --- a/services/utils/tests/unit_tests/utils_test.py +++ b/services/utils/tests/unit_tests/utils_test.py @@ -51,7 +51,8 @@ def test_db_conf(): assert db_conf.dsn == 'dbname=postgres user=postgres password=postgres host=localhost port=5432' assert db_conf.pool_min == 1 assert db_conf.pool_max == 10 - assert db_conf.timeout == 60 + assert db_conf.pool_timeout == 60 + assert db_conf.pool_recycle == -1 def test_db_conf_dsn(): @@ -79,7 +80,8 @@ def test_db_conf_env_default_prefix(): 'MF_METADATA_DB_NAME': 'bar', 'MF_METADATA_DB_POOL_MIN': '2', 'MF_METADATA_DB_POOL_MAX': '4', - 'MF_METADATA_DB_TIMEOUT': '5' + 'MF_METADATA_DB_TIMEOUT': '5', + 'MF_METADATA_DB_POOL_RECYCLE': '1' }): db_conf = DBConfiguration() assert db_conf.dsn == 'dbname=bar user=user password=password host=foo port=1234' @@ -90,7 +92,8 @@ def test_db_conf_env_default_prefix(): assert db_conf.database_name == 'bar' assert db_conf.pool_min == 2 assert db_conf.pool_max == 4 - assert db_conf.timeout == 5 + assert db_conf.pool_timeout == 5 + assert db_conf.pool_recycle == 1 def test_db_conf_env_custom_prefix(): @@ -102,7 +105,8 @@ def test_db_conf_env_custom_prefix(): 'FOO_NAME': 'bar', 'FOO_POOL_MIN': '2', 'FOO_POOL_MAX': '4', - 'FOO_TIMEOUT': '5' + 'FOO_TIMEOUT': '5', + 'FOO_POOL_RECYCLE': '1' }): db_conf = DBConfiguration(prefix='FOO_') assert db_conf.dsn == 'dbname=bar user=user password=password host=foo port=1234' @@ -113,7 +117,8 @@ def test_db_conf_env_custom_prefix(): assert db_conf.database_name == 'bar' assert db_conf.pool_min == 2 assert db_conf.pool_max == 4 - assert db_conf.timeout == 5 + assert db_conf.pool_timeout == 5 + assert db_conf.pool_recycle == 1 def test_db_conf_env_dsn(): @@ -131,5 +136,5 @@ def test_db_conf_pool_size(): def test_db_conf_timeout(): with set_env(): db_conf = DBConfiguration(timeout=5) - assert db_conf.timeout == 5 + assert db_conf.pool_timeout == 5 From e12afb287b06dcfcd8b7e7f696c017e0f291bf3d Mon Sep 17 00:00:00 2001 From: Romain Cledat Date: Fri, 28 May 2021 10:53:56 -0700 Subject: [PATCH 3/3] Various cleanups and better error handling --- services/data/db_utils.py | 10 +-- services/data/postgres_async_db.py | 19 +++--- services/metadata_service/api/admin.py | 12 ++-- services/metadata_service/api/artifact.py | 77 +++++++++++++---------- services/metadata_service/api/flow.py | 14 ++--- services/metadata_service/api/metadata.py | 35 ++++++----- services/metadata_service/api/run.py | 20 +++--- services/metadata_service/api/step.py | 23 ++++--- services/metadata_service/api/task.py | 29 +++++---- services/metadata_service/api/utils.py | 1 + 10 files changed, 130 insertions(+), 110 deletions(-) diff --git a/services/data/db_utils.py b/services/data/db_utils.py index 48373485..7e72f986 100644 --- a/services/data/db_utils.py +++ b/services/data/db_utils.py @@ -14,17 +14,17 @@ def aiopg_exception_handling(exception): body = {"err_msg": err_msg} if isinstance(exception, psycopg2.IntegrityError): if "duplicate key" in err_msg: - return DBResponse(response_code=409, body=json.dumps(body)) + return DBResponse(response_code=409, body=body) elif "foreign key" in err_msg: - return DBResponse(response_code=404, body=json.dumps(body)) + return DBResponse(response_code=404, body=body) else: - return DBResponse(response_code=500, body=json.dumps(body)) + return DBResponse(response_code=500, body=body) elif isinstance(exception, psycopg2.errors.UniqueViolation): - return DBResponse(response_code=409, body=json.dumps(body)) + return DBResponse(response_code=409, body=body) elif isinstance(exception, IndexError): return DBResponse(response_code=404, body={}) else: - return DBResponse(response_code=500, body=json.dumps(body)) + return DBResponse(response_code=500, body=body) def get_db_ts_epoch_str(): diff --git a/services/data/postgres_async_db.py b/services/data/postgres_async_db.py index 6a029479..d2c08df1 100644 --- a/services/data/postgres_async_db.py +++ b/services/data/postgres_async_db.py @@ -7,7 +7,7 @@ import time import datetime from services.utils import logging -from typing import List +from typing import List, Tuple from .db_utils import DBResponse, DBPagination, aiopg_exception_handling, \ get_db_ts_epoch_str, translate_run_key, translate_task_key @@ -110,17 +110,15 @@ def get_table_by_name(self, table_name: str): return None async def get_run_ids(self, flow_id: str, run_id: str): - run = await self.run_table_postgres.get_run(flow_id, run_id, + return await self.run_table_postgres.get_run(flow_id, run_id, expanded=True) - return run.body['run_number'], run.body['run_id'] async def get_task_ids(self, flow_id: str, run_id: str, step_name: str, task_name: str): - task = await self.task_table_postgres.get_task(flow_id, run_id, + return await self.task_table_postgres.get_task(flow_id, run_id, step_name, task_name, expanded=True) - return task.body['task_id'], task.body['task_name'] class AsyncPostgresDB(object): @@ -186,7 +184,7 @@ async def get_records(self, filter_dict={}, fetch_single=False, async def find_records(self, conditions: List[str] = None, values=[], fetch_single=False, limit: int = 0, offset: int = 0, order: List[str] = None, expanded=False, - enable_joins=False) -> (DBResponse, DBPagination): + enable_joins=False) -> Tuple[DBResponse, DBPagination]: sql_template = """ SELECT * FROM ( SELECT @@ -215,7 +213,7 @@ async def find_records(self, conditions: List[str] = None, values=[], fetch_sing expanded=expanded, limit=limit, offset=offset) async def execute_sql(self, select_sql: str, values=[], fetch_single=False, - expanded=False, limit: int = 0, offset: int = 0) -> (DBResponse, DBPagination): + expanded=False, limit: int = 0, offset: int = 0) -> Tuple[DBResponse, DBPagination]: try: with ( await self.db.pool.cursor( @@ -547,9 +545,7 @@ async def update_heartbeat(self, flow_id: str, run_id: str): update_dict=set_dict) body = {"wait_time_in_seconds": WAIT_TIME} - return DBResponse(response_code=result.response_code, - body=json.dumps(body)) - + return DBResponse(response_code=result.response_code, body=body) class AsyncStepTablePostgres(AsyncPostgresTable): step_dict = {} @@ -692,8 +688,7 @@ async def update_heartbeat(self, flow_id: str, run_id: str, step_name: str, body = {"wait_time_in_seconds": WAIT_TIME} - return DBResponse(response_code=result.response_code, - body=json.dumps(body)) + return DBResponse(response_code=result.response_code, body=body) class AsyncMetadataTablePostgres(AsyncPostgresTable): diff --git a/services/metadata_service/api/admin.py b/services/metadata_service/api/admin.py index 5423529b..e86a62a7 100644 --- a/services/metadata_service/api/admin.py +++ b/services/metadata_service/api/admin.py @@ -30,7 +30,7 @@ async def version(self, request): tags: - Admin produces: - - 'text/plain' + - text/plain responses: "200": description: successful operation. Return the version number @@ -46,7 +46,7 @@ async def ping(self, request): tags: - Admin produces: - - 'text/plain' + - text/plain responses: "202": description: successful operation. Return "pong" text @@ -64,7 +64,7 @@ async def healthcheck(self, request): tags: - Admin produces: - - 'application/json' + - application/json responses: "202": description: successful operation. @@ -96,11 +96,11 @@ async def healthcheck(self, request): async def get_authorization_token(self, request): """ --- - description: this is used exclusively for sandbox auth + description: This endpoint is used exclusively for sandbox auth tags: - Auth produces: - - text/plain + - application/json responses: "200": description: successfully returned certs @@ -139,5 +139,5 @@ async def get_authorization_token(self, request): return web.Response(status=200, body=json.dumps(credentials)) except Exception as ex: - body = {"err_msg": str(ex), "traceback": get_traceback_str()} + body = {"message": str(ex), "traceback": get_traceback_str()} return web.Response(status=500, body=json.dumps(body)) diff --git a/services/metadata_service/api/artifact.py b/services/metadata_service/api/artifact.py index 46dbdd8c..7edc13d6 100644 --- a/services/metadata_service/api/artifact.py +++ b/services/metadata_service/api/artifact.py @@ -1,6 +1,6 @@ from aiohttp import web from services.data.postgres_async_db import AsyncPostgresDB -from services.data.db_utils import filter_artifacts_by_attempt_id_for_tasks +from services.data.db_utils import DBResponse, filter_artifacts_by_attempt_id_for_tasks from services.metadata_service.api.utils import format_response, \ handle_exceptions import json @@ -42,12 +42,12 @@ def __init__(self, app): self._async_table = AsyncPostgresDB.get_instance().artifact_table_postgres self._db = AsyncPostgresDB.get_instance() - @format_response @handle_exceptions + @format_response async def get_artifact(self, request): """ --- - description: get all artifacts associated with the specified task. + description: get a specific artifact tags: - Artifacts parameters: @@ -77,10 +77,12 @@ async def get_artifact(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: successful operation + "404": + description: no such artifact "405": description: invalid HTTP Method """ @@ -94,6 +96,8 @@ async def get_artifact(self, request): flow_name, run_number, step_name, task_id, artifact_name ) + @handle_exceptions + @format_response async def get_artifacts_by_task(self, request): """ --- @@ -122,7 +126,7 @@ async def get_artifacts_by_task(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: successful operation @@ -137,17 +141,17 @@ async def get_artifacts_by_task(self, request): artifacts = await self._async_table.get_artifact_in_task( flow_name, run_number, step_name, task_id ) + if artifacts.response_code == 200: + artifacts.body = filter_artifacts_by_attempt_id_for_tasks( + artifacts.body) + return artifacts - filtered_body = filter_artifacts_by_attempt_id_for_tasks( - artifacts.body) - return web.Response( - status=artifacts.response_code, body=json.dumps(filtered_body) - ) - + @handle_exceptions + @format_response async def get_artifacts_by_step(self, request): """ --- - description: get all artifacts associated with the specified task. + description: get all artifacts associated with a given step tags: - Artifacts parameters: @@ -167,7 +171,7 @@ async def get_artifacts_by_step(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: successful operation @@ -182,16 +186,17 @@ async def get_artifacts_by_step(self, request): flow_name, run_number, step_name ) - filtered_body = filter_artifacts_by_attempt_id_for_tasks( - artifacts.body) - return web.Response( - status=artifacts.response_code, body=json.dumps(filtered_body) - ) + if artifacts.response_code == 200: + artifacts.body = filter_artifacts_by_attempt_id_for_tasks( + artifacts.body) + return artifacts + @handle_exceptions + @format_response async def get_artifacts_by_run(self, request): """ --- - description: get all artifacts associated with the specified task. + description: get all artifacts associated with the specified run. tags: - Artifacts parameters: @@ -206,7 +211,7 @@ async def get_artifacts_by_run(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: successful operation @@ -217,16 +222,18 @@ async def get_artifacts_by_run(self, request): run_number = request.match_info.get("run_number") artifacts = await self._async_table.get_artifacts_in_runs(flow_name, run_number) - filtered_body = filter_artifacts_by_attempt_id_for_tasks( - artifacts.body) - return web.Response( - status=artifacts.response_code, body=json.dumps(filtered_body) - ) + if artifacts.response_code == 200: + artifacts.body = filter_artifacts_by_attempt_id_for_tasks( + artifacts.body) + return artifacts + + @handle_exceptions + @format_response async def create_artifacts(self, request): """ --- - description: This end-point allow to test that service is up. + description: Registers artifacts with the service tags: - Artifacts parameters: @@ -276,7 +283,7 @@ async def create_artifacts(self, request): system_tags: type: object produces: - - 'text/plain' + - application/json responses: "202": description: successful operation. @@ -308,13 +315,15 @@ async def create_artifacts(self, request): body = await request.json() count = 0 - try: - run_number, run_id = await self._db.get_run_ids(flow_name, run_number) - task_id, task_name = await self._db.get_task_ids(flow_name, run_number, - step_name, task_id) - except Exception: - return web.Response(status=400, body=json.dumps( - {"message": "need to register run_id and task_id first"})) + run = await self._db.get_run_ids(flow_name, run_number) + task = await self._db.get_task_ids(flow_name, run_number, + step_name, task_id) + if run.response_code != 200 or task.response_code != 200: + return DBResponse(400, {"message": "need to register run_id and task_id first"}) + run_id = run.body['run_id'] + run_number = run.body['run_number'] + task_id = task.body['task_id'] + task_name = task.body['task_name'] # todo change to bulk insert for artifact in body: diff --git a/services/metadata_service/api/flow.py b/services/metadata_service/api/flow.py index 4b3353c5..68517fc4 100644 --- a/services/metadata_service/api/flow.py +++ b/services/metadata_service/api/flow.py @@ -15,12 +15,12 @@ def __init__(self, app): app.router.add_route("POST", "/flows/{flow_id}", self.create_flow) self._async_table = AsyncPostgresDB.get_instance().flow_table_postgres - @format_response @handle_exceptions + @format_response async def create_flow(self, request): """ --- - description: create/register a flow + description: Create/register a flow tags: - Flow parameters: @@ -44,7 +44,7 @@ async def create_flow(self, request): type: object produces: - - 'text/plain' + - application/json responses: "200": description: successfully created flow row @@ -62,8 +62,8 @@ async def create_flow(self, request): ) return await self._async_table.add_flow(flow) - @format_response @handle_exceptions + @format_response async def get_flow(self, request): """ --- @@ -77,7 +77,7 @@ async def get_flow(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: successful operation. Return flow @@ -90,8 +90,8 @@ async def get_flow(self, request): flow_name = request.match_info.get("flow_id") return await self._async_table.get_flow(flow_name) - @format_response @handle_exceptions + @format_response async def get_all_flows(self, request): """ --- @@ -99,7 +99,7 @@ async def get_all_flows(self, request): tags: - Flow produces: - - text/plain + - application/json responses: "200": description: successful operation. Returned all registered flows diff --git a/services/metadata_service/api/metadata.py b/services/metadata_service/api/metadata.py index e94e2a4a..e61b6a8a 100644 --- a/services/metadata_service/api/metadata.py +++ b/services/metadata_service/api/metadata.py @@ -4,6 +4,7 @@ handle_exceptions import asyncio from services.data.postgres_async_db import AsyncPostgresDB +from services.data.db_utils import DBResponse class MetadataApi(object): @@ -31,12 +32,12 @@ def __init__(self, app): self._db = AsyncPostgresDB.get_instance() self._async_table = AsyncPostgresDB.get_instance().metadata_table_postgres - @format_response @handle_exceptions + @format_response async def get_metadata(self, request): """ --- - description: get all metadata associated with the specified task. + description: Get all metadata associated with the specified task. tags: - Metadata parameters: @@ -61,7 +62,7 @@ async def get_metadata(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: successful operation @@ -76,12 +77,12 @@ async def get_metadata(self, request): flow_name, run_number, step_name, task_id ) - @format_response @handle_exceptions + @format_response async def get_metadata_by_run(self, request): """ --- - description: get all metadata associated with the specified run. + description: Get all metadata associated with the specified run. tags: - Metadata parameters: @@ -96,7 +97,7 @@ async def get_metadata_by_run(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: successful operation @@ -109,10 +110,12 @@ async def get_metadata_by_run(self, request): flow_name, run_number ) + @handle_exceptions + @format_response async def create_metadata(self, request): """ --- - description: persist metadata + description: Saves metadata to the service tags: - Metadata parameters: @@ -158,7 +161,7 @@ async def create_metadata(self, request): system_tags: type: object produces: - - 'text/plain' + - application/json responses: "202": description: successful operation. @@ -172,13 +175,15 @@ async def create_metadata(self, request): body = await request.json() count = 0 - try: - run_number, run_id = await self._db.get_run_ids(flow_name, run_number) - task_id, task_name = await self._db.get_task_ids(flow_name, run_number, - step_name, task_id) - except Exception: - return web.Response(status=400, body=json.dumps( - {"message": "need to register run_id and task_id first"})) + run = await self._db.get_run_ids(flow_name, run_number) + task = await self._db.get_task_ids(flow_name, run_number, + step_name, task_id) + if run.response_code != 200 or task.response_code != 200: + return DBResponse(400, {"message": "need to register run_id and task_id first"}) + run_id = run.body['run_id'] + run_number = run.body['run_number'] + task_id = task.body['task_id'] + task_name = task.body['task_name'] for datum in body: values = { diff --git a/services/metadata_service/api/run.py b/services/metadata_service/api/run.py index 500488ed..31d76633 100644 --- a/services/metadata_service/api/run.py +++ b/services/metadata_service/api/run.py @@ -19,8 +19,8 @@ def __init__(self, app): self.runs_heartbeat) self._async_table = AsyncPostgresDB.get_instance().run_table_postgres - @format_response @handle_exceptions + @format_response async def get_run(self, request): """ --- @@ -39,7 +39,7 @@ async def get_run(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: successful operation. Return specified run @@ -52,8 +52,8 @@ async def get_run(self, request): run_number = request.match_info.get("run_number") return await self._async_table.get_run(flow_name, run_number) - @format_response @handle_exceptions + @format_response async def get_all_runs(self, request): """ --- @@ -67,7 +67,7 @@ async def get_all_runs(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: Returned all runs of specified flow @@ -77,12 +77,12 @@ async def get_all_runs(self, request): flow_name = request.match_info.get("flow_id") return await self._async_table.get_all_runs(flow_name) - @format_response @handle_exceptions + @format_response async def create_run(self, request): """ --- - description: create run and generate run id + description: Create run and generate run id tags: - Run parameters: @@ -107,7 +107,7 @@ async def create_run(self, request): system_tags: type: object produces: - - 'text/plain' + - application/json responses: "200": description: successful operation. Return newly registered run @@ -134,12 +134,12 @@ async def create_run(self, request): return await self._async_table.add_run(run_row) - @format_response @handle_exceptions + @format_response async def runs_heartbeat(self, request): """ --- - description: update hb + description: Heartbeat the run tags: - Run parameters: @@ -160,7 +160,7 @@ async def runs_heartbeat(self, request): schema: type: object produces: - - 'text/plain' + - application/json responses: "200": description: successful operation. Return newly registered run diff --git a/services/metadata_service/api/step.py b/services/metadata_service/api/step.py index af76807b..e2c34171 100644 --- a/services/metadata_service/api/step.py +++ b/services/metadata_service/api/step.py @@ -1,6 +1,7 @@ from services.data import StepRow from services.metadata_service.api.utils import format_response, \ handle_exceptions +from services.data.db_utils import DBResponse from services.data.postgres_async_db import AsyncPostgresDB @@ -24,12 +25,12 @@ def __init__(self, app): self._run_table = AsyncPostgresDB.get_instance().run_table_postgres self._db = AsyncPostgresDB.get_instance() - @format_response @handle_exceptions + @format_response async def get_steps(self, request): """ --- - description: get all steps associated with the specified run. + description: Get all steps associated with the specified run. tags: - Steps parameters: @@ -44,7 +45,7 @@ async def get_steps(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: successful operation. returned all steps @@ -55,12 +56,12 @@ async def get_steps(self, request): run_number = request.match_info.get("run_number") return await self._async_table.get_steps(flow_name, run_number) - @format_response @handle_exceptions + @format_response async def get_step(self, request): """ --- - description: get specified step. + description: Get specified step. tags: - Steps parameters: @@ -80,7 +81,7 @@ async def get_step(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: successful operation. Returned specified step @@ -94,8 +95,8 @@ async def get_step(self, request): step_name = request.match_info.get("step_name") return await self._async_table.get_step(flow_name, run_number, step_name) - @format_response @handle_exceptions + @format_response async def create_step(self, request): """ --- @@ -132,7 +133,7 @@ async def create_step(self, request): system_tags: type: object produces: - - text/plain + - application/json responses: "200": description: successful operation. Registered step @@ -148,7 +149,11 @@ async def create_step(self, request): tags = body.get("tags") system_tags = body.get("system_tags") - run_number, run_id = await self._db.get_run_ids(flow_name, run_number) + run = await self._db.get_run_ids(flow_name, run_number) + if run.response_code != 200: + return DBResponse(400, {"message": "need to register run_id first"}) + run_id = run.body['run_id'] + run_number = run.body['run_number'] step_row = StepRow( flow_name, run_number, run_id, user, step_name, tags=tags, diff --git a/services/metadata_service/api/task.py b/services/metadata_service/api/task.py index 4de01b75..9a8833df 100644 --- a/services/metadata_service/api/task.py +++ b/services/metadata_service/api/task.py @@ -1,4 +1,5 @@ from services.data import TaskRow +from services.data.db_utils import DBResponse from services.data.postgres_async_db import AsyncPostgresDB from services.metadata_service.api.utils import format_response, \ handle_exceptions @@ -34,12 +35,12 @@ def __init__(self, app): self._async_table = AsyncPostgresDB.get_instance().task_table_postgres self._db = AsyncPostgresDB.get_instance() - @format_response @handle_exceptions + @format_response async def get_tasks(self, request): """ --- - description: get all tasks associated with the specified step. + description: Get all tasks associated with the specified step. tags: - Tasks parameters: @@ -59,7 +60,7 @@ async def get_tasks(self, request): required: true type: "string" produces: - - text/plain + - application/json responses: "200": description: successful operation. Return tasks @@ -72,12 +73,12 @@ async def get_tasks(self, request): return await self._async_table.get_tasks(flow_name, run_number, step_name) - @format_response @handle_exceptions + @format_response async def get_task(self, request): """ --- - description: get all artifacts associated with the specified task. + description: Get specified task tags: - Tasks parameters: @@ -117,12 +118,12 @@ async def get_task(self, request): flow_name, run_number, step_name, task_id ) - @format_response @handle_exceptions + @format_response async def create_task(self, request): """ --- - description: This end-point allow to test that service is up. + description: Creates a task tags: - Tasks parameters: @@ -157,7 +158,7 @@ async def create_task(self, request): task_id: type: string produces: - - 'text/plain' + - application/json responses: "202": description: successful operation. Return newly registered task @@ -180,7 +181,11 @@ async def create_task(self, request): return web.Response(status=400, body=json.dumps( {"message": "provided task_name may not be a numeric"})) - run_number, run_id = await self._db.get_run_ids(flow_id, run_number) + run = await self._db.get_run_ids(flow_id, run_number) + if run.response_code != 200: + return DBResponse(400, {"message": "need to register run_id and task_id first"}) + run_id = run.body['run_id'] + run_number = run.body['run_number'] task = TaskRow( flow_id=flow_id, @@ -194,12 +199,12 @@ async def create_task(self, request): ) return await self._async_table.add_task(task) - @format_response @handle_exceptions + @format_response async def tasks_heartbeat(self, request): """ --- - description: update hb + description: Heartbeats the task tags: - Tasks parameters: @@ -230,7 +235,7 @@ async def tasks_heartbeat(self, request): schema: type: object produces: - - 'text/plain' + - application/json responses: "200": description: successful operation. Return newly registered run diff --git a/services/metadata_service/api/utils.py b/services/metadata_service/api/utils.py index e29995a0..64d40056 100644 --- a/services/metadata_service/api/utils.py +++ b/services/metadata_service/api/utils.py @@ -22,6 +22,7 @@ async def wrapper(*args, **kwargs): db_response = await func(*args, **kwargs) return web.Response(status=db_response.response_code, body=json.dumps(db_response.body), + content_type='application/json', headers=MultiDict( {METADATA_SERVICE_HEADER: METADATA_SERVICE_VERSION}))