diff --git a/polytope_server/common/metric_store/dynamodb_metric_store.py b/polytope_server/common/metric_store/dynamodb_metric_store.py new file mode 100644 index 0000000..c9ae66d --- /dev/null +++ b/polytope_server/common/metric_store/dynamodb_metric_store.py @@ -0,0 +1,208 @@ +# +# Copyright 2024 European Centre for Medium-Range Weather Forecasts (ECMWF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation nor +# does it submit to any jurisdiction. +# + +import logging +import operator +import warnings +from decimal import Decimal +from enum import Enum +from functools import reduce + +import boto3 +import botocore +import botocore.exceptions +from boto3.dynamodb.conditions import Attr, Key + +from ..metric import ( + CacheInfo, + Metric, + MetricType, + QueueInfo, + RequestStatusChange, + StorageInfo, + WorkerInfo, + WorkerStatusChange, +) +from . 
logger = logging.getLogger(__name__)


# Concrete Metric subclass used to rebuild a stored item, keyed by metric type.
METRIC_TYPE_CLASS_MAP = {
    MetricType.WORKER_STATUS_CHANGE: WorkerStatusChange,
    MetricType.WORKER_INFO: WorkerInfo,
    MetricType.REQUEST_STATUS_CHANGE: RequestStatusChange,
    MetricType.STORAGE_INFO: StorageInfo,
    MetricType.CACHE_INFO: CacheInfo,
    MetricType.QUEUE_INFO: QueueInfo,
}


def _iter_items(fn, **params):
    """Yield every item produced by a paginated DynamoDB read.

    ``fn`` is called with ``params`` (in this module: ``Table.scan`` or
    ``Table.query``). As long as the response carries ``LastEvaluatedKey``
    the call is repeated with ``ExclusiveStartKey`` set to that key, so the
    generator walks all pages lazily.
    """
    while True:
        response = fn(**params)
        yield from response["Items"]
        if "LastEvaluatedKey" not in response:
            break
        params["ExclusiveStartKey"] = response["LastEvaluatedKey"]


def _make_query(**kwargs):
    """Build an attribute -> value filter mapping from keyword arguments.

    ``None`` values are dropped and ``Enum`` members are replaced by their
    ``.value``.
    """
    return {
        key: value.value if isinstance(value, Enum) else value for key, value in kwargs.items() if value is not None
    }


def _visit(obj, fn):
    """Return a copy of ``obj`` with ``fn`` applied to every leaf.

    Dicts and lists are traversed recursively; anything else is a leaf.
    """
    if isinstance(obj, dict):
        return {key: _visit(value, fn) for key, value in obj.items()}
    if isinstance(obj, list):
        return [_visit(value, fn) for value in obj]
    return fn(obj)


def _convert_numbers(obj, reverse=False):
    """Convert floats to ``Decimal`` (or back when ``reverse`` is true).

    The forward direction goes through ``str`` so that e.g. ``0.1`` becomes
    ``Decimal("0.1")``. ``Decimal(0.1)`` would be the exact binary expansion
    of the float (55 significant digits), which DynamoDB's 38-digit number
    type cannot hold.
    """

    def fn(item):
        if not reverse and isinstance(item, float):
            return Decimal(str(item))
        if reverse and isinstance(item, Decimal):
            return float(item)
        return item

    return _visit(obj, fn)


def _load(item):
    """Rebuild a metric object from a stored item.

    The class is chosen from ``METRIC_TYPE_CLASS_MAP`` using the item's
    deserialized ``type`` slot; ``Decimal`` values are converted to floats
    before being handed to the class via ``from_dict``.
    """
    metric_type = Metric.deserialize_slot("type", item["type"])
    cls = METRIC_TYPE_CLASS_MAP[metric_type]
    return cls(from_dict=_convert_numbers(item, reverse=True))


def _dump(metric):
    """Serialize ``metric`` into an item suitable for ``put_item``."""
    item = _convert_numbers(metric.serialize())
    if "request_id" in item and item["request_id"] is None:
        del item["request_id"]  # index hash keys are not nullable
    return item


def _create_table(dynamodb, table_name):
    """Create the metrics table and wait until it exists.

    The table is keyed by ``uuid`` and has a ``request-index`` global
    secondary index on ``request_id``. If the table is already there
    (``ResourceInUseException``, e.g. another process created it in the
    meantime) we still wait for it instead of returning straight away, so
    callers never proceed against a table that is not ready yet.
    """
    schema = {
        "AttributeDefinitions": [
            {"AttributeName": "uuid", "AttributeType": "S"},
            {"AttributeName": "request_id", "AttributeType": "S"},
        ],
        "TableName": table_name,
        "KeySchema": [{"AttributeName": "uuid", "KeyType": "HASH"}],
        "GlobalSecondaryIndexes": [
            {
                "IndexName": "request-index",
                "KeySchema": [{"AttributeName": "request_id", "KeyType": "HASH"}],
                "Projection": {"ProjectionType": "ALL"},
            },
        ],
        "BillingMode": "PAY_PER_REQUEST",
    }
    try:
        table = dynamodb.create_table(**schema)
    except dynamodb.meta.client.exceptions.ResourceInUseException:
        table = dynamodb.Table(table_name)
    table.wait_until_exists()


class DynamoDBMetricStore(MetricStore):
    """Metric store backed by a DynamoDB table.

    Recognised ``config`` keys: ``endpoint_url``, ``region`` and
    ``table_name`` (default ``"metrics"``).
    """

    def __init__(self, config=None):
        """Connect to DynamoDB and create the table when it is missing.

        Raises:
            RuntimeError: the table exists but its status is not ``ACTIVE``.
        """
        if config is None:
            config = {}

        endpoint_url = config.get("endpoint_url")
        region = config.get("region")
        table_name = config.get("table_name", "metrics")

        dynamodb = boto3.resource("dynamodb", region_name=region, endpoint_url=endpoint_url)
        client = dynamodb.meta.client
        self.table = dynamodb.Table(table_name)

        try:
            response = client.describe_table(TableName=table_name)
            if response["Table"]["TableStatus"] != "ACTIVE":
                raise RuntimeError(f"DynamoDB table {table_name} is not active.")
        except client.exceptions.ResourceNotFoundException:
            _create_table(dynamodb, table_name)

        logger.info("DynamoDB metric store configured for table name %s.", table_name)

    def get_type(self):
        """Return the store type identifier, ``"dynamodb"``."""
        return "dynamodb"

    def add_metric(self, metric):
        """Insert ``metric``.

        Raises:
            ValueError: an item with the same ``uuid`` is already stored.
        """
        try:
            self.table.put_item(Item=_dump(metric), ConditionExpression=Attr("uuid").not_exists())
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                raise ValueError("Metric already exists in metric store") from e
            raise

    def remove_metric(self, uuid):
        """Delete the metric identified by ``uuid``.

        Raises:
            KeyError: no item with that ``uuid`` is stored.
        """
        try:
            self.table.delete_item(Key={"uuid": str(uuid)}, ConditionExpression=Attr("uuid").exists())
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                raise KeyError("Metric does not exist in metric store") from e
            raise

    def get_metric(self, uuid):
        """Return the metric identified by ``uuid``, or ``None`` if absent."""
        response = self.table.get_item(Key={"uuid": str(uuid)})
        if "Item" in response:
            return _load(response["Item"])
        return None

    def get_metrics(self, ascending=None, descending=None, limit=None, request_id=None, **kwargs):
        """Return the metrics matching the given criteria as a list.

        Args:
            ascending: attribute name to sort by, ascending.
            descending: attribute name to sort by, descending.
            limit: maximum number of metrics to return (applied after sorting).
            request_id: when given, the ``request-index`` is queried instead
                of scanning the whole table.
            **kwargs: attribute equality filters; ``None`` values are ignored.

        Raises:
            ValueError: both ``ascending`` and ``descending`` are given, or
                ``limit`` is negative.
        """
        import itertools  # only needed here; keeps the module import block untouched

        if ascending is not None and descending is not None:
            raise ValueError("Cannot sort by ascending and descending at the same time.")
        if limit is not None and limit < 0:
            raise ValueError("limit must not be negative.")

        if request_id is not None:
            fn = self.table.query
            params = {
                "IndexName": "request-index",
                "KeyConditionExpression": Key("request_id").eq(request_id),
            }
        else:
            fn = self.table.scan
            params = {}

        query = _make_query(**kwargs)
        if query:
            params["FilterExpression"] = reduce(operator.and_, (Attr(key).eq(value) for key, value in query.items()))

        sort_key = ascending if ascending is not None else descending

        # DynamoDB's ``Limit`` caps the items *evaluated per page* (before the
        # filter is applied) and pagination continues past it, so it cannot
        # bound the result on its own. It is only used as a page-size hint
        # when nothing is filtered or sorted; the real cap is applied below.
        if limit and sort_key is None and not query:
            params["Limit"] = limit

        items = (_load(item) for item in _iter_items(fn, **params))
        if sort_key is None:
            # islice stops pulling from the generator, hence stops paginating.
            return list(items) if limit is None else list(itertools.islice(items, limit))

        ordered = sorted(items, key=lambda item: getattr(item, sort_key), reverse=descending is not None)
        return ordered if limit is None else ordered[:limit]

    def update_metric(self, metric):
        """Write ``metric`` unconditionally, replacing any stored item with the same key."""
        self.table.put_item(Item=_dump(metric))

    def wipe(self):
        """Not implemented for this backend; only emits a warning."""
        warnings.warn("wipe is not implemented for DynamoDBMetricStore")

    def collect_metric_info(self):
        """Return an empty dict; this store reports no metric info."""
        return {}
"metrics") diff --git a/polytope_server/common/request_store/dynamodb_request_store.py b/polytope_server/common/request_store/dynamodb_request_store.py new file mode 100644 index 0000000..fb4a57c --- /dev/null +++ b/polytope_server/common/request_store/dynamodb_request_store.py @@ -0,0 +1,242 @@ +# +# Copyright 2022 European Centre for Medium-Range Weather Forecasts (ECMWF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation nor +# does it submit to any jurisdiction. +# + +import datetime as dt +import logging +import operator +import warnings +from decimal import Decimal +from functools import reduce + +import boto3 +import botocore.exceptions +from boto3.dynamodb.conditions import Attr, Key + +from .. import metric_store +from ..metric import RequestStatusChange +from ..request import Request +from . 
logger = logging.getLogger(__name__)


def _iter_items(fn, **params):
    """Yield every item produced by a paginated DynamoDB read.

    ``fn`` is called with ``params`` (in this module: ``Table.scan`` or
    ``Table.query``). As long as the response carries ``LastEvaluatedKey``
    the call is repeated with ``ExclusiveStartKey`` set to that key, so the
    generator walks all pages lazily.
    """
    while True:
        response = fn(**params)
        yield from response["Items"]
        if "LastEvaluatedKey" not in response:
            break
        params["ExclusiveStartKey"] = response["LastEvaluatedKey"]


def _make_query(**kwargs):
    """Build an attribute -> serialized value filter mapping.

    Every key must be a ``Request`` slot; values are serialized with
    ``Request.serialize_slot`` and ``None`` values are skipped.

    Raises:
        KeyError: a key is not in ``Request.__slots__`` (checked even when
            the corresponding value is ``None``).
    """
    query = {}
    for key, value in kwargs.items():
        if key not in Request.__slots__:
            raise KeyError(f"Request has no key {key}")

        if value is None:
            continue

        query[key] = Request.serialize_slot(key, value)

    return query


def _visit(obj, fn):
    """Return a copy of ``obj`` with ``fn`` applied to every leaf.

    Dicts and lists are traversed recursively; anything else is a leaf.
    """
    if isinstance(obj, dict):
        return {key: _visit(value, fn) for key, value in obj.items()}
    if isinstance(obj, list):
        return [_visit(value, fn) for value in obj]
    return fn(obj)


def _convert_numbers(obj, reverse=False):
    """Convert floats to ``Decimal`` (or back when ``reverse`` is true).

    The forward direction goes through ``str`` so that e.g. ``0.1`` becomes
    ``Decimal("0.1")``. ``Decimal(0.1)`` would be the exact binary expansion
    of the float (55 significant digits), which DynamoDB's 38-digit number
    type cannot hold.
    """

    def fn(item):
        if not reverse and isinstance(item, float):
            return Decimal(str(item))
        if reverse and isinstance(item, Decimal):
            return float(item)
        return item

    return _visit(obj, fn)


def _load(item):
    """Rebuild a ``Request`` from a stored item.

    The ``user_id`` attribute is left out (it is added by ``_dump`` for the
    ``user-index``) and ``Decimal`` values are converted back to floats.
    """
    return Request(
        from_dict={key: _convert_numbers(value, reverse=True) for key, value in item.items() if key != "user_id"}
    )


def _dump(request):
    """Serialize ``request`` into an item suitable for ``put_item``.

    When the request has a user, a top-level ``user_id`` string attribute is
    added so the item shows up in the ``user-index``.
    """
    item = _convert_numbers(request.serialize())
    if request.user is not None:
        return item | {"user_id": str(request.user.id)}
    return item


def _create_table(dynamodb, table_name):
    """Create the requests table and wait until it exists.

    The table is keyed by ``id`` and has two global secondary indexes:
    ``status-index`` on ``status`` and ``user-index`` on ``user_id``. If the
    table is already there (``ResourceInUseException``, e.g. another process
    created it in the meantime) we still wait for it instead of returning
    straight away, so callers never proceed against a table that is not
    ready yet.
    """
    schema = {
        "AttributeDefinitions": [
            {"AttributeName": "id", "AttributeType": "S"},
            {"AttributeName": "status", "AttributeType": "S"},
            {"AttributeName": "user_id", "AttributeType": "S"},
        ],
        "TableName": table_name,
        "KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
        "GlobalSecondaryIndexes": [
            {
                "IndexName": "status-index",
                "KeySchema": [{"AttributeName": "status", "KeyType": "HASH"}],
                "Projection": {"ProjectionType": "ALL"},
            },
            {
                "IndexName": "user-index",
                "KeySchema": [{"AttributeName": "user_id", "KeyType": "HASH"}],
                "Projection": {"ProjectionType": "ALL"},
            },
        ],
        "BillingMode": "PAY_PER_REQUEST",
    }
    try:
        table = dynamodb.create_table(**schema)
    except dynamodb.meta.client.exceptions.ResourceInUseException:
        table = dynamodb.Table(table_name)
    table.wait_until_exists()
class DynamoDBRequestStore(request_store.RequestStore):
    """Request store backed by a DynamoDB table.

    Recognised ``config`` keys: ``endpoint_url``, ``region`` and
    ``table_name`` (default ``"requests"``). When ``metric_store_config`` is
    given, a metric store is created from it and a ``RequestStatusChange``
    metric is recorded whenever a request is added or updated.
    """

    def __init__(self, config=None, metric_store_config=None):
        """Connect to DynamoDB and create the table when it is missing.

        Raises:
            RuntimeError: the table exists but its status is not ``ACTIVE``.
        """
        if config is None:
            config = {}

        endpoint_url = config.get("endpoint_url")
        region = config.get("region")
        table_name = config.get("table_name", "requests")

        dynamodb = boto3.resource("dynamodb", region_name=region, endpoint_url=endpoint_url)
        client = dynamodb.meta.client
        self.table = dynamodb.Table(table_name)

        try:
            response = client.describe_table(TableName=table_name)
            if response["Table"]["TableStatus"] != "ACTIVE":
                raise RuntimeError(f"DynamoDB table {table_name} is not active.")
        except client.exceptions.ResourceNotFoundException:
            _create_table(dynamodb, table_name)

        self.metric_store = None
        if metric_store_config is not None:
            self.metric_store = metric_store.create_metric_store(metric_store_config)

        logger.info("DynamoDB request store configured for table name %s.", table_name)

    def get_type(self):
        """Return the store type identifier, ``"dynamodb"``."""
        return "dynamodb"

    def add_request(self, request):
        """Insert ``request`` and record its status in the metric store, if any.

        Raises:
            ValueError: an item with the same ``id`` is already stored.
        """
        try:
            self.table.put_item(Item=_dump(request), ConditionExpression=Attr("id").not_exists())
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                raise ValueError("Request already exists in request store") from e
            raise

        if self.metric_store:
            self.metric_store.add_metric(RequestStatusChange(request_id=request.id, status=request.status))

        logger.info("Request ID %s status set to %s.", request.id, request.status)

    def remove_request(self, id):
        """Delete the request ``id`` together with its metrics, if a metric store is set.

        Raises:
            KeyError: no item with that ``id`` is stored.
        """
        try:
            self.table.delete_item(Key={"id": id}, ConditionExpression=Attr("id").exists())
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                raise KeyError("Request does not exist in request store") from e
            raise

        if self.metric_store:
            for item in self.metric_store.get_metrics(request_id=id):
                self.metric_store.remove_metric(item.uuid)

        logger.info("Request ID %s removed.", id)

    def get_request(self, id):
        """Return the request identified by ``id``, or ``None`` if absent."""
        response = self.table.get_item(Key={"id": id})
        if "Item" in response:
            return _load(response["Item"])
        return None

    def get_requests(self, ascending=None, descending=None, limit=None, status=None, user=None, **kwargs):
        """Return the requests matching the given criteria as a list.

        Args:
            ascending: attribute name to sort by, ascending.
            descending: attribute name to sort by, descending.
            limit: maximum number of requests to return (applied after sorting).
            status: when given without ``user``, the ``status-index`` is
                queried; together with ``user`` it becomes a filter.
            user: when given, the ``user-index`` is queried for ``user.id``.
            **kwargs: equality filters on ``Request`` slots; ``None`` values
                are ignored.

        Raises:
            ValueError: both ``ascending`` and ``descending`` are given, or
                ``limit`` is negative.
            KeyError: a filter key is not a ``Request`` slot.
        """
        import itertools  # only needed here; keeps the module import block untouched

        if ascending is not None and descending is not None:
            raise ValueError("Cannot sort by ascending and descending at the same time.")
        if limit is not None and limit < 0:
            raise ValueError("limit must not be negative.")

        query = _make_query(**kwargs)
        if user is not None:
            fn = self.table.query
            params = {
                "IndexName": "user-index",
                "KeyConditionExpression": Key("user_id").eq(str(user.id)),
            }
            if status is not None:
                # Only one index can serve the query; status becomes a filter.
                query["status"] = status.value
        elif status is not None:
            fn = self.table.query
            params = {
                "IndexName": "status-index",
                "KeyConditionExpression": Key("status").eq(status.value),
            }
        else:
            fn = self.table.scan
            params = {}

        if query:
            params["FilterExpression"] = reduce(operator.and_, (Attr(key).eq(value) for key, value in query.items()))

        # An empty sort attribute is treated as "no sorting".
        sort_key = ascending or descending or None

        # DynamoDB's ``Limit`` caps the items *evaluated per page* (before the
        # filter is applied) and pagination continues past it, so it cannot
        # bound the result on its own. It is only used as a page-size hint
        # when nothing is filtered or sorted; the real cap is applied below.
        if limit and sort_key is None and not query:
            params["Limit"] = limit

        reqs = (_load(item) for item in _iter_items(fn, **params))
        if sort_key is None:
            # islice stops pulling from the generator, hence stops paginating.
            return list(reqs) if limit is None else list(itertools.islice(reqs, limit))

        ordered = sorted(reqs, key=lambda req: getattr(req, sort_key), reverse=not ascending)
        return ordered if limit is None else ordered[:limit]

    def update_request(self, request):
        """Stamp ``request.last_modified`` with the current UTC time and store it.

        The item is written unconditionally; a ``RequestStatusChange`` metric
        is recorded when a metric store is set.
        """
        now = dt.datetime.now(dt.timezone.utc)
        request.last_modified = now.timestamp()
        self.table.put_item(Item=_dump(request))

        if self.metric_store:
            self.metric_store.add_metric(RequestStatusChange(request_id=request.id, status=request.status))

        logger.info("Request ID %s status set to %s.", request.id, request.status)

    def wipe(self):
        """Not implemented for this backend; only emits a warning."""
        warnings.warn("wipe is not implemented for DynamoDBRequestStore")

    def collect_metric_info(self):
        """Return an empty dict; this store reports no metric info."""
        return {}
import os
from unittest import mock

import pytest
from moto import mock_aws

from polytope_server.common import request, user
from polytope_server.common.request_store import dynamodb_request_store


@pytest.fixture(scope="function")
def aws_credentials():
    """Patch dummy AWS credentials into the environment for the test's duration."""
    fake_env = dict.fromkeys(
        ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SECURITY_TOKEN", "AWS_SESSION_TOKEN"),
        "testing",
    )
    fake_env["AWS_DEFAULT_REGION"] = "us-east-1"
    with mock.patch.dict(os.environ, fake_env):
        yield


@pytest.fixture(scope="function")
def mocked_aws(aws_credentials):
    """Run the test inside moto's ``mock_aws`` context."""
    with mock_aws():
        yield


def test_default_init(mocked_aws):
    """A store built without configuration reports the dynamodb type."""
    default_store = dynamodb_request_store.DynamoDBRequestStore()
    assert default_store.get_type() == "dynamodb"


def test_add_request(mocked_aws):
    """A stored request can be read back with the same id, user, verb and status."""
    store = dynamodb_request_store.DynamoDBRequestStore()
    owner = user.User("some-user", "some-realm")
    original = request.Request(user=owner, verb=request.Verb.RETRIEVE, status=request.Status.QUEUED)
    assert original.user == owner
    store.add_request(original)

    fetched = store.get_request(original.id)
    assert fetched is not None
    assert fetched.id == original.id
    assert fetched.user == original.user
    assert fetched.verb == original.verb
    assert fetched.status == original.status


def test_add_request_duplicate(mocked_aws):
    """Adding the same request twice raises ValueError."""
    store = dynamodb_request_store.DynamoDBRequestStore()
    duplicate = request.Request()
    store.add_request(duplicate)
    with pytest.raises(ValueError):
        store.add_request(duplicate)


def test_remove_request(mocked_aws):
    """A removed request can no longer be fetched."""
    store = dynamodb_request_store.DynamoDBRequestStore()
    victim = request.Request()
    store.add_request(victim)
    assert store.get_request(victim.id) is not None
    store.remove_request(victim.id)
    assert store.get_request(victim.id) is None


@pytest.fixture(scope="function")
def populated(mocked_aws):
    """Return a factory building a store holding three requests from three users."""

    def func():
        users = [user.User(f"user{i}", f"realm{i}") for i in (1, 2, 3)]
        first, second, third = users
        requests = [
            request.Request(user=first, collection="hello", status=request.Status.PROCESSED),
            request.Request(user=second, collection="hello", content_length=10),
            request.Request(user=third, collection="hello2"),
        ]
        store = dynamodb_request_store.DynamoDBRequestStore()
        for req in requests:
            store.add_request(req)
        return store, requests, users

    return func


def test_get_requests_user(populated):
    """Filtering by user returns only that user's request."""
    store, requests, users = populated()
    assert store.get_requests(user=users[0]) == [requests[0]]


def test_get_requests_id(populated):
    """Filtering by id returns only the matching request."""
    store, requests, _ = populated()
    wanted = requests[-1]
    assert store.get_requests(id=wanted.id) == [wanted]


def test_get_requests_scan(populated):
    """Filtering on a non-indexed attribute returns only the matching request."""
    store, requests, _ = populated()
    assert store.get_requests(content_length=10) == [requests[1]]


def test_update(mocked_aws):
    """Changes made to a fetched request are persisted by update_request."""
    owner = user.User("user1", "realm1")
    original = request.Request(user=owner)
    store = dynamodb_request_store.DynamoDBRequestStore()
    store.add_request(original)
    fetched = store.get_request(original.id)
    assert original == fetched

    fetched.user.attributes["test"] = "updated"
    store.update_request(fetched)

    reloaded = store.get_request(original.id)
    assert reloaded.id == original.id
    assert reloaded.user.attributes["test"] == "updated"


def test_metric_store(mocked_aws):
    """Adding a request records exactly one metric carrying the request id."""
    store = dynamodb_request_store.DynamoDBRequestStore(metric_store_config={"dynamodb": {"table_name": "metrics"}})
    tracked = request.Request()
    store.add_request(tracked)
    (metric,) = store.metric_store.get_metrics()
    assert metric.request_id == tracked.id