diff --git a/Dockerfile b/Dockerfile index 05886fb..7ad64f1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -108,7 +108,7 @@ ARG ecbuild_version=3.8.2 ARG eccodes_version=2.33.1 ARG eckit_version=1.28.0 ARG fdb_version=5.13.2 -ARG pyfdb_version=0.0.3 +ARG pyfdb_version=0.1.0 RUN apt update # COPY polytope-deployment/common/default_fdb_schema /polytope/config/fdb/default @@ -173,6 +173,7 @@ RUN set -eux && \ # Install pyfdb \ RUN set -eux \ && git clone --single-branch --branch ${pyfdb_version} https://github.com/ecmwf/pyfdb.git \ + && python -m pip install "numpy<2.0" --user\ && python -m pip install ./pyfdb --user ####################################################### @@ -200,7 +201,7 @@ RUN set -eux \ ls -R /opt RUN set -eux \ - && git clone --single-branch --branch develop https://github.com/ecmwf/gribjump.git + && git clone --single-branch --branch ${gribjump_version} https://github.com/ecmwf/gribjump.git # Install pygribjump RUN set -eux \ && cd /gribjump \ @@ -229,9 +230,13 @@ FROM mars-base AS mars-base-c RUN apt update && apt install -y liblapack3 mars-client=${mars_client_c_version} mars-client-cloud FROM mars-base AS mars-base-cpp +ARG pyfdb_version=0.1.0 RUN apt update && apt install -y mars-client-cpp=${mars_client_cpp_version} RUN set -eux \ - && python3 -m pip install git+https://github.com/ecmwf/pyfdb.git@master --user + && git clone --single-branch --branch ${pyfdb_version} https://github.com/ecmwf/pyfdb.git \ + && python -m pip install "numpy<2.0" --user\ + && python -m pip install ./pyfdb --user + FROM blank-base AS blank-base-c FROM blank-base AS blank-base-cpp @@ -342,7 +347,6 @@ COPY --chown=polytope --from=gribjump-base-final /root/.local /home/polytope/.lo # Copy python requirements COPY --chown=polytope --from=worker-base /root/.venv /home/polytope/.local - # Install the server source COPY --chown=polytope . /polytope/ diff --git a/polytope_server/common/auth.py b/polytope_server/common/auth.py index 8bea551..2d528ed 100644 --- a/polytope_server/common/auth.py +++ b/polytope_server/common/auth.py @@ -132,16 +132,16 @@ def authenticate(self, auth_header) -> User: def is_authorized(self, user, roles): """Checks if the user has any of the provided roles""" - # roles can be a single value; convert to a list - if not isinstance(roles, (tuple, list, set)): - roles = [roles] - # roles can be a dict of realm:[roles] mapping; find the relevant realm. if isinstance(roles, dict): if user.realm not in roles: raise ForbiddenRequest("Not authorized to access this resource.") roles = roles[user.realm] + # roles can be a single value; convert to a list + if not isinstance(roles, (tuple, list, set)): + roles = [roles] + for required_role in roles: if required_role in user.roles: return True diff --git a/polytope_server/common/authorization/ldap_authorization.py b/polytope_server/common/authorization/ldap_authorization.py index 6722f3b..e17a400 100644 --- a/polytope_server/common/authorization/ldap_authorization.py +++ b/polytope_server/common/authorization/ldap_authorization.py @@ -23,6 +23,7 @@ from ldap3 import SUBTREE, Connection, Server from ..auth import User +from ..caching import cache from . import authorization @@ -40,6 +41,7 @@ def __init__(self, name, realm, config): self.username_attribute = config.get("username-attribute", None) super().__init__(name, realm, config) + @cache(lifetime=120) def get_roles(self, user: User) -> list: if user.realm != self.realm(): raise ValueError( diff --git a/polytope_server/common/datasource/coercion.py b/polytope_server/common/datasource/coercion.py new file mode 100644 index 0000000..6c8d42f --- /dev/null +++ b/polytope_server/common/datasource/coercion.py @@ -0,0 +1,251 @@ +import copy +from datetime import datetime, timedelta +from typing import Any, Dict + + +class CoercionError(Exception): + pass + + +class Coercion: + + allow_ranges = [ + "number", + "step", + "date", + ] + allow_lists = ["class", "stream", "type", "expver", "param", "number", "date", "step"] + + @staticmethod + def coerce(request: Dict[str, Any]) -> Dict[str, Any]: + request = copy.deepcopy(request) + for key, value in request.items(): + request[key] = Coercion.coerce_value(key, value) + return request + + @staticmethod + def coerce_value(key: str, value: Any): + if key in Coercion.coercer: + coercer_func = Coercion.coercer[key] + + if isinstance(value, list): + # Coerce each item in the list + coerced_values = [Coercion.coerce_value(key, v) for v in value] + return "/".join(coerced_values) + elif isinstance(value, str): + + if "/to/" in value and key in Coercion.allow_ranges: + # Handle ranges with possible "/by/" suffix + start_value, rest = value.split("/to/", 1) + if not rest: + raise CoercionError(f"Invalid range format for key {key}.") + + if "/by/" in rest: + end_value, suffix = rest.split("/by/", 1) + suffix = "/by/" + suffix # Add back the '/by/' + else: + end_value = rest + suffix = "" + + # Coerce start_value and end_value + start_coerced = coercer_func(start_value) + end_coerced = coercer_func(end_value) + + return f"{start_coerced}/to/{end_coerced}{suffix}" + elif "/" in value and key in Coercion.allow_lists: + # Handle lists + coerced_values = [coercer_func(v) for v in value.split("/")] + return "/".join(coerced_values) + else: + # Single value + return coercer_func(value) + else: # not list or string + return coercer_func(value) + else: + if isinstance(value, list): + # Join list into '/' separated string + coerced_values = [str(v) for v in value] + return "/".join(coerced_values) + else: + return value + + @staticmethod + def coerce_date(value: Any) -> str: + try: + # Attempt to convert the value to an integer + int_value = int(value) + if int_value > 0: + # Positive integers are assumed to be dates in YYYYMMDD format + date_str = str(int_value) + try: + datetime.strptime(date_str, "%Y%m%d") + return date_str + except ValueError: + raise CoercionError("Invalid date format, expected YYYYMMDD or YYYY-MM-DD.") + else: + # Zero or negative integers represent relative days from today + target_date = datetime.today() + timedelta(days=int_value) + return target_date.strftime("%Y%m%d") + except (ValueError, TypeError): + # The value is not an integer or cannot be converted to an integer + pass + + if isinstance(value, str): + value_stripped = value.strip() + # Try parsing as YYYYMMDD + try: + datetime.strptime(value_stripped, "%Y%m%d") + return value_stripped + except ValueError: + # Try parsing as YYYY-MM-DD + try: + date_obj = datetime.strptime(value_stripped, "%Y-%m-%d") + return date_obj.strftime("%Y%m%d") + except ValueError: + raise CoercionError("Invalid date format, expected YYYYMMDD or YYYY-MM-DD.") + else: + raise CoercionError("Invalid date format, expected YYYYMMDD or YYYY-MM-DD.") + + @staticmethod + def coerce_step(value: Any) -> str: + + if isinstance(value, int): + if value < 0: + raise CoercionError("Step must be greater than or equal to 0.") + else: + return str(value) + elif isinstance(value, str): + if not value.isdigit() or int(value) < 0: + raise CoercionError("Step must be greater than or equal to 0.") + return value + else: + raise CoercionError("Invalid type, expected integer or string.") + + @staticmethod + def coerce_number(value: Any) -> str: + + if isinstance(value, int): + if value <= 0: + raise CoercionError("Number must be a positive value.") + else: + return str(value) + elif isinstance(value, str): + if not value.isdigit() or int(value) <= 0: + raise CoercionError("Number must be a positive integer.") + return value + else: + raise CoercionError("Invalid type, expected integer or string.") + + @staticmethod + def coerce_param(value: Any) -> str: + if isinstance(value, int): + return str(value) + elif isinstance(value, str): + return value + else: + raise CoercionError("Invalid param type, expected integer or string.") + + @staticmethod + def coerce_time(value: Any) -> str: + if isinstance(value, int): + if value < 0: + raise CoercionError("Invalid time format, expected HHMM or HH greater than zero.") + elif value < 24: + # Treat as hour with minute=0 + hour = value + minute = 0 + elif 100 <= value <= 2359: + # Possible HHMM format + hour = value // 100 + minute = value % 100 + else: + raise CoercionError("Invalid time format, expected HHMM or HH.") + elif isinstance(value, str): + value_stripped = value.strip() + # Check for colon-separated time (e.g., "12:00") + if ":" in value_stripped: + parts = value_stripped.split(":") + if len(parts) != 2: + raise CoercionError("Invalid time format, expected HHMM or HH.") + hour_str, minute_str = parts + if not (hour_str.isdigit() and minute_str.isdigit()): + raise CoercionError("Invalid time format, expected HHMM or HH.") + hour = int(hour_str) + minute = int(minute_str) + else: + if value_stripped.isdigit(): + num_digits = len(value_stripped) + if num_digits == 4: + # Format is "HHMM" + hour = int(value_stripped[:2]) + minute = int(value_stripped[2:]) + elif num_digits <= 2: + # Format is "H" or "HH" + hour = int(value_stripped) + minute = 0 + else: + raise CoercionError("Invalid time format, expected HHMM or HH.") + else: + raise CoercionError("Invalid time format, expected HHMM or HH.") + else: + raise CoercionError("Invalid type for time, expected string or integer.") + + # Validate hour and minute + if not (0 <= hour <= 23): + raise CoercionError("Invalid time format, expected HHMM or HH.") + if not (0 <= minute <= 59): + raise CoercionError("Invalid time format, expected HHMM or HH.") + if minute != 0: + raise CoercionError("Invalid time format, expected HHMM or HH.") + + # Format time as HHMM + time_str = f"{hour:02d}{minute:02d}" + return time_str + + # Validate hour and minute + if not (0 <= hour <= 23): + raise CoercionError("Hour must be between 0 and 23.") + if not (0 <= minute <= 59): + raise CoercionError("Minute must be between 0 and 59.") + if minute != 0: + # In your test cases, minute must be zero + raise CoercionError("Minute must be zero.") + + # Format time as HHMM + time_str = f"{hour:02d}{minute:02d}" + return time_str + + @staticmethod + def coerce_expver(value: Any) -> str: + + # Integers accepted, converted to 4-length strings + if isinstance(value, int): + if 0 <= value <= 9999: + return f"{value:0>4d}" + else: + raise CoercionError("expver integer must be between 0 and 9999 inclusive.") + + # Strings accepted if they are convertible to integer or exactly 4 characters long + elif isinstance(value, str): + if value.isdigit(): + int_value = int(value.lstrip("0") or "0") + if 0 <= int_value <= 9999: + return f"{int_value:0>4d}" + else: + raise CoercionError("expver integer string must represent a number between 0 and 9999 inclusive.") + elif len(value) == 4: + return value + else: + raise CoercionError("expver string length must be 4 characters exactly.") + + else: + raise CoercionError("expver must be an integer or a string.") + + coercer = { + "date": coerce_date, + "step": coerce_step, + "number": coerce_number, + "param": coerce_param, + "time": coerce_time, + "expver": coerce_expver, + } diff --git a/polytope_server/common/datasource/datasource.py b/polytope_server/common/datasource/datasource.py index f5a3d80..20da030 100644 --- a/polytope_server/common/datasource/datasource.py +++ b/polytope_server/common/datasource/datasource.py @@ -50,6 +50,10 @@ def match(self, request: str) -> None: """Checks if the request matches the datasource, raises on failure""" raise NotImplementedError() + def repr(self) -> str: + """Returns a string name of the datasource, presented to the user on error""" + raise NotImplementedError + def get_type(self) -> str: """Returns a string stating the type of this object (e.g. fdb, mars, echo)""" raise NotImplementedError() @@ -84,9 +88,7 @@ def dispatch(self, request, input_data) -> bool: if hasattr(self, "silent_match") and self.silent_match: pass else: - request.user_message += "Skipping datasource {} due to match error: {}\n".format( - self.get_type(), repr(e) - ) + request.user_message += "Skipping datasource {}: {}\n".format(self.repr(), str(e)) tb = traceback.format_exception(None, e, e.__traceback__) logging.info(tb) @@ -97,7 +99,7 @@ def dispatch(self, request, input_data) -> bool: datasource_role_rules = self.config.get("roles", None) if datasource_role_rules is not None: if not any(role in request.user.roles for role in datasource_role_rules.get(request.user.realm, [])): - request.user_message += "Skipping datasource {}. User is forbidden.\n".format(self.get_type()) + request.user_message += "Skipping datasource {}: user is not authorised.\n".format(self.repr()) return False # Retrieve/Archive/etc. @@ -111,8 +113,8 @@ def dispatch(self, request, input_data) -> bool: raise NotImplementedError() except NotImplementedError as e: - request.user_message += "Skipping datasource {}. Verb {} not available: {}\n".format( - self.get_type(), request.verb, repr(e) + request.user_message += "Skipping datasource {}: method '{}' not available: {}\n".format( + self.repr(), request.verb, repr(e) ) return False diff --git a/polytope_server/common/datasource/dummy.py b/polytope_server/common/datasource/dummy.py index 682a914..d3dc913 100644 --- a/polytope_server/common/datasource/dummy.py +++ b/polytope_server/common/datasource/dummy.py @@ -43,6 +43,9 @@ def retrieve(self, request): return True + def repr(self): + return self.config.get("repr", "dummy") + def result(self, request): chunk_size = 2 * 1024 * 1024 data_generated = 0 diff --git a/polytope_server/common/datasource/echo.py b/polytope_server/common/datasource/echo.py index 183f023..14093a0 100644 --- a/polytope_server/common/datasource/echo.py +++ b/polytope_server/common/datasource/echo.py @@ -40,6 +40,9 @@ def retrieve(self, request): self.data = request.user_request return True + def repr(self): + return self.config.get("repr", "echo") + def result(self, request): yield self.data diff --git a/polytope_server/common/datasource/fdb.py b/polytope_server/common/datasource/fdb.py index 503be27..78f3f49 100644 --- a/polytope_server/common/datasource/fdb.py +++ b/polytope_server/common/datasource/fdb.py @@ -126,6 +126,9 @@ def retrieve(self, request): self.output = self.fdb.retrieve(r) return True + def repr(self): + return self.config.get("repr", "fdb") + def result(self, request): if not self.output: diff --git a/polytope_server/common/datasource/federated.py b/polytope_server/common/datasource/federated.py index 2304b7b..c4b1cf6 100644 --- a/polytope_server/common/datasource/federated.py +++ b/polytope_server/common/datasource/federated.py @@ -44,6 +44,9 @@ def __init__(self, config): def get_type(self): return self.type + def repr(self): + return self.config.get("repr", "federated") + def archive(self, request): url = "/".join( diff --git a/polytope_server/common/datasource/ionbeam.py b/polytope_server/common/datasource/ionbeam.py index dd5b74f..1ffa928 100644 --- a/polytope_server/common/datasource/ionbeam.py +++ b/polytope_server/common/datasource/ionbeam.py @@ -41,6 +41,9 @@ def get_bytes(self, path: str, **kwargs) -> requests.Response: kwargs["headers"] = kwargs.get("headers", {}) | {"Accept": "application/octet-stream"} return self.get(path, **kwargs) + def repr(self): + return self.config.get("repr", "ionbeam") + def get_json(self, path, **kwargs): return self.get(path, **kwargs).json() diff --git a/polytope_server/common/datasource/mars.py b/polytope_server/common/datasource/mars.py index ed6fd66..f905410 100644 --- a/polytope_server/common/datasource/mars.py +++ b/polytope_server/common/datasource/mars.py @@ -83,6 +83,9 @@ def __init__(self, config): def get_type(self): return self.type + def repr(self): + return self.config.get("repr", "mars") + def match(self, request): r = yaml.safe_load(request.user_request) or {} diff --git a/polytope_server/common/datasource/polytope.py b/polytope_server/common/datasource/polytope.py index 87213d7..7a11608 100644 --- a/polytope_server/common/datasource/polytope.py +++ b/polytope_server/common/datasource/polytope.py @@ -27,7 +27,8 @@ from polytope_feature.utility.exceptions import PolytopeError from polytope_mars.api import PolytopeMars -from . import datasource +from ..schedule import SCHEDULE_READER +from . import coercion, datasource class PolytopeDataSource(datasource.DataSource): @@ -38,6 +39,9 @@ def __init__(self, config): self.match_rules = config.get("match", {}) self.req_single_keys = config.get("options", {}).pop("req_single_keys", []) self.patch_rules = config.get("patch", {}) + self.defaults = config.get("defaults", {}) + self.extra_required_role = config.get("extra_required_role", {}) + self.obey_schedule = config.get("obey_schedule", False) self.output = None # Create a temp file to store gribjump config @@ -55,31 +59,45 @@ def get_type(self): def archive(self, request): raise NotImplementedError() + def check_extra_roles(self, request) -> bool: + + # if the user has any of the extra roles, they are allowed + realm = request.user.realm + req_extra_roles = self.extra_required_role.get(realm, []) + + if len(req_extra_roles) == 0: + return True + + logging.info(f"Checking for user roles in required extra roles: {req_extra_roles}") + logging.info(f"User roles: {request.user.roles}") + + if any(role in req_extra_roles for role in request.user.roles): + return True + else: + return False + def retrieve(self, request): r = yaml.safe_load(request.user_request) + + r = coercion.Coercion.coerce(r) + + r = self.apply_defaults(r) + logging.info(r) # Set the "pre-path" for this request pre_path = {} for k, v in r.items(): + v = v.split("/") if isinstance(v, str) else v if k in self.req_single_keys: if isinstance(v, list): - v = v[0] - pre_path[k] = v + if len(v) == 1: + v = v[0] + pre_path[k] = v polytope_mars_config = copy.deepcopy(self.config) polytope_mars_config["options"]["pre_path"] = pre_path - transforms = [] - for transform in polytope_mars_config["options"]["axis_config"]: - if transform["axis_name"] in r.keys(): - logging.info("Found axis {} in request".format(transform["axis_name"])) - transforms.append(transform) - if transform["axis_name"] in ("latitude", "longitude", "values"): - transforms.append(transform) - - polytope_mars_config["options"]["axis_config"] = transforms - polytope_mars = PolytopeMars( polytope_mars_config, log_context={ @@ -102,22 +120,29 @@ def result(self, request): def match(self, request): + if not self.check_extra_roles(request): + raise Exception("Not authorized to access this data.") + r = yaml.safe_load(request.user_request) or {} + r = coercion.Coercion.coerce(r) + + r = self.apply_defaults(r) + # Check that there is a feature specified in the request if "feature" not in r: - raise Exception("Request does not contain expected key 'feature'") + raise Exception("Request does not contain key 'feature'") for k, v in self.match_rules.items(): # Check that all required keys exist if k not in r: - raise Exception("Request does not contain expected key {}".format(k)) + raise Exception("Request does not contain key {}".format(k)) # ... and check the value of other keys v = [v] if isinstance(v, str) else v if r[k] not in v: - raise Exception("got {} : {}, but expected one of {}".format(k, r[k], v)) + raise Exception("got {}: {}, not one of {}".format(k, r[k], v)) # Check that there is only one value if required for k, v in r.items(): @@ -128,8 +153,41 @@ def match(self, request): elif len(v) == 0: raise Exception("Expected a value for key {}".format(k)) + # Check data released + if SCHEDULE_READER is not None and self.obey_schedule: + # Check if step is in feature + if "step" in r: + step = r["step"] + elif r["feature"]["type"] == "timeseries": + step = r["feature"]["range"]["end"] + elif r["feature"]["type"] == "trajectory" and "step" in r["feature"]["axes"]: + # get index of step in axes, then get max step from trajectory + step = r["feature"]["axes"].index("step") + step = r["feature"]["points"][step].max() + else: + raise PolytopeError("Step not found in request") + SCHEDULE_READER.check_released( + r["date"], + r["class"], + r["stream"], + r.get("domain", "g"), + r["time"], + str(step), + r["type"], + ) + def destroy(self, request) -> None: pass + def repr(self): + return self.config.get("repr", "polytope") + def mime_type(self) -> str: return "application/prs.coverage+json" + + def apply_defaults(self, request): + request = copy.deepcopy(request) + for k, v in self.defaults.items(): + if k not in request: + request[k] = v + return request diff --git a/polytope_server/common/datasource/raise.py b/polytope_server/common/datasource/raise.py index 553e7c3..5f236eb 100644 --- a/polytope_server/common/datasource/raise.py +++ b/polytope_server/common/datasource/raise.py @@ -45,6 +45,9 @@ def result(self, request): def match(self, request): return + def repr(self): + return self.config.get("repr", "raise") + def destroy(self, request) -> None: pass diff --git a/polytope_server/common/datasource/webmars.py b/polytope_server/common/datasource/webmars.py index a246e2a..11725a1 100644 --- a/polytope_server/common/datasource/webmars.py +++ b/polytope_server/common/datasource/webmars.py @@ -88,6 +88,9 @@ def result(self, request): def destroy(self, request) -> None: pass + def repr(self): + return self.config.get("repr", "webmars") + def mime_type(self) -> str: return "application/x-grib" diff --git a/polytope_server/common/logging.py b/polytope_server/common/logging.py index b67c69f..17adcb2 100644 --- a/polytope_server/common/logging.py +++ b/polytope_server/common/logging.py @@ -40,7 +40,7 @@ # Indexable fields INDEXABLE_FIELDS = {"request_id": str} -DEFAULT_LOGGING_MODE = "json" +DEFAULT_LOGGING_MODE = "prettyprint" # Changed from "json" to "prettyprint" or "console" DEFAULT_LOGGING_LEVEL = "INFO" @@ -121,7 +121,9 @@ def format(self, record): if self.mode == "logserver": return self.format_for_logserver(record, result) elif self.mode == "prettyprint": - return json.dumps(result, indent=2) + return json.dumps( + result, indent=2, ensure_ascii=False + ) # Added ensure_ascii=False for correct Unicode display else: return json.dumps(result, indent=None) @@ -140,4 +142,7 @@ def setup(config, source_name): logger.addHandler(handler) logger.setLevel(level) + # Lower the logging level for pymongo + logging.getLogger("pymongo").setLevel(logging.WARNING) + logger.info("Logging Initialized") diff --git a/polytope_server/common/schedule.py b/polytope_server/common/schedule.py new file mode 100644 index 0000000..e629170 --- /dev/null +++ b/polytope_server/common/schedule.py @@ -0,0 +1,281 @@ +#!/usr/bin/env python3 +from __future__ import print_function + +import itertools +import logging +import os +import xml.etree.ElementTree as ET +from datetime import date, datetime, time, timedelta +from typing import Any, Dict, List, Optional, Tuple + +from polytope_feature.utility.exceptions import PolytopeError + +schedule_file_path = os.path.join("/etc/polytope_schedule/schedule.xml") + + +class ScheduleReader: + def __init__(self, schedule_file: str) -> None: + self.products: List[Dict[str, Any]] = self.load_products(schedule_file) + + def load_products(self, schedule_file: str) -> List[Dict[str, Any]]: + tree = ET.parse(schedule_file) + products = tree.findall("product") + mars_only = tree.findall("mars_only") + if mars_only is not None: + for mars in mars_only: + products.extend(mars.findall("product")) + product_dicts = [] + for product in products: + product_dict = {child.tag: child.text for child in product} + product_dicts.append(product_dict) + return product_dicts + + def check_released( + self, date_in: str, cclass: str, stream: str, domain: str, time_in: str, step: str, ttype: str + ) -> None: + """ + Checks if the data is released or not. Accepts arrays and ranges. + + Parameters + ---------- + date_in : str + production date (or range) of the data, + see https://confluence.ecmwf.int/pages/viewpage.action?pageId=118817289 + cclass : string + forecast class, e.g., od | ai | .. + stream : string + data stream, e.g., oper | scda | .. + domain : string + data domain, e.g., g | m | .. + time_in : string + production time of the data, i.e., 00:00 | 06:00 | 12:00 | 18:00 + step : string + data time step, e.g., 0 | 1 | .. | 360 | .. + ttype : string + data type, e.g., fc | an | .. + + Returns + ------- + None + + Raises + ------ + PolytopeError + If the data is not released yet. + """ + # Get only latest production date and time, last step + date_in = datetime.strftime(max(map(parse_mars_date, split_mars_param(date_in))), "%Y-%m-%d") + time_in = max(map(parse_mars_time, split_mars_param(time_in))).strftime("%H:%M") + step = str(max(map(int, split_mars_param(step)))).zfill(4) + + cclass = split_mars_param(cclass) + stream = split_mars_param(stream) + domain = split_mars_param(domain) + ttype = split_mars_param(ttype) + + for c, s, dom, diss in itertools.product(cclass, stream, domain, ttype): + release_time, delta_day = self.get_release_time_and_delta_day(c, s, dom, time_in, step, diss) + if release_time is None: + raise PolytopeError( + f"No release time found for date: {date_in}, class: {c}, stream: {s}, " + f"domain: {dom}, time: {time_in}, step: {step}, type {diss}" + ) + + release_time_dt = datetime.strptime(release_time, "%H:%M:%S") + release_date = datetime.fromisoformat(date_in) + timedelta(days=delta_day) + release_date = release_date.replace( + hour=release_time_dt.hour, minute=release_time_dt.minute, second=release_time_dt.second + ) + if datetime.now() < release_date: + raise PolytopeError( + f"Data not released yet. Release time is {release_date}." + # f"Data not yet released for date: {date_in}, class: {c}, stream: {s}, " + # f"domain: {dom}, time: {time_in}, step: {step}, type {diss}. " + # f"Release time is {release_date}." + ) + + def get_release_time_and_delta_day( + self, cclass: str, stream: str, domain: str, time_in: str, step: str, ttype: str + ) -> Tuple[Optional[str], Optional[int]]: + """ + Retrieves dissemination time from the schedule for respective stream etc. + DOES NOT ACCEPT ARRAYS OR RANGES. + + Adapted from ecmwf/pgen/src/scripts/pgen-crack-schedule + + Parameters + ---------- + cclass : string + forecast class, e.g., od | ai | .. + stream : string + data stream, e.g., oper | scda | .. + domain : string + data domain, e.g., g | m | .. + time_in : string + production time of the data, i.e., 00:00 | 06:00 | 12:00 | 18:00 + step : string + data time step, e.g., 0 | 1 | .. | 360 | .. + ttype : string + data type, e.g., fc | an | .. + + Returns + ------- + release_time: string + time of release (hh:mm:ss) + delta_day: int + number of days to add to the production date + """ + + def matches_criteria(product: Dict[str, Any]) -> bool: + if product.get("class") != cclass: + return False + if stream.lower() not in product["stream"].lower(): + return False + if time_in != product.get("time"): + return False + prod_domain = find_tag(product, "domain") + if prod_domain: + if domain.lower() != find_tag(product, "domain"): + return False + prod_type = find_tag(product, "type") + if prod_type: + if ttype.lower() not in find_tag(product, "type"): + return False + if cclass == "ai": + return True + + prod_step = find_tag(product, "step") + if prod_step: + istep = int(prod_step) if prod_step is not None else prod_step + if istep != int(step): + return False + return True + + for product in self.products: + if matches_criteria(product): + release_time = product.get("release_time") + delta_day = int(product.get("release_delta_day", 0)) + logging.info( + "release time: {} with delta_day: {} found for class: {}, stream: {}, type: {}, " + "domain: {}, time: {}, step: {}".format( + release_time, delta_day, cclass, stream, ttype, domain, time_in, step + ) + ) + return release_time, delta_day + + logging.warning( + "No release time found for class{}, stream: {}, type: {}, domain: {}, time: {}, step: {}".format( + cclass, stream, ttype, domain, time_in, step + ) + ) + return None, None + + +def parse_mars_date(mars_date: str) -> date: + """ + Parse a MARS date string into a datetime object. + Valid formats are: + + Absolute as YYYY-MM-DD, YYYYMMDD. The day of the year can also be used: YYYY-DDD + Relative as -n ; n is the number of days before today (i.e., -1 = yesterday ) + Name of month (e.g. January for Climatology data) + Operational monthly means are retrieved by setting day (DD) to 00. + See https://confluence.ecmwf.int/pages/viewpage.action?pageId=118817289 for more information. + + Parameters + ---------- + date : str + The date string to parse. + + Returns + ------- + date + The parsed date object. + """ + try: + return date.fromisoformat(mars_date) + except ValueError: + try: + delta = int(mars_date) + if delta > 0: + raise PolytopeError(f"Invalid date format: {mars_date}") + return date.today() - timedelta(days=-delta) + except ValueError: + raise PolytopeError(f"Invalid date format: {mars_date}") + + +def parse_mars_time(mars_time: str) -> time: + """ + Parse a MARS time string into a time object. + Valid formats are: %H, %H%M, %H:%M + + Parameters + ---------- + mars_time : str + The time string to parse. + + Returns + ------- + time + The parsed time object. + """ + time_formats = ["%H", "%H%M", "%H:%M"] + for time_format in time_formats: + try: + return datetime.strptime(mars_time, time_format).time() + except ValueError: + continue + raise ValueError(f"Invalid time format: {mars_time}") + + +def split_mars_param(param: str) -> List[str]: + """ + Parse a MARS parameter string into an array if it is + one or get the last element if it's a range. + + Parameters + ---------- + param : str + The parameter string to parse. + + Returns + ------- + List[str] + The split parameter string + """ + parts = param.split("/") + if "by" in parts: + return parts[-3] + if "to" in parts: + return parts[-1] + return parts + + +def find_tag(product: Dict[str, Any], keyword: str) -> Optional[str]: + """ + Utility function to find a tag in the product dictionary, + checking for both 'diss_{keyword}' and '{keyword}' tags. Used with "step" and "domain" tags. + + Parameters + ---------- + product : Dict[str, Any] + The product dictionary to search within. + keyword : str + The tag to search for. + + Returns + ------- + Optional[str] + The text of the tag if found, otherwise None. + """ + tag = product.get(keyword) + if tag is None: + tag = product.get(f"diss_{keyword}") + return tag + + +if os.environ.get("SCHEDULE_ENABLED", "false").lower() == "true": + if os.path.exists(schedule_file_path): + SCHEDULE_READER = ScheduleReader(schedule_file_path) + else: + raise FileNotFoundError(f"Schedule is enabled, but schedule file not found at {schedule_file_path}") diff --git a/polytope_server/common/schedule_products.py b/polytope_server/common/schedule_products.py new file mode 100644 index 0000000..e69de29 diff --git a/polytope_server/common/staging/s3_staging.py b/polytope_server/common/staging/s3_staging.py index 42a352f..c2b428c 100644 --- a/polytope_server/common/staging/s3_staging.py +++ b/polytope_server/common/staging/s3_staging.py @@ -109,9 +109,12 @@ def __init__(self, config): logging.info(f"Opened data staging at {self.host}:{self.port} with bucket {self.bucket}") def create(self, name, data, content_type): - name = name + ".grib" - # fix for seaweedfs auto-setting Content-Disposition to inline and earthkit expecting extension, - # else using content-disposition header + + type_extension_map = {"application/x-grib": "grib", "application/prs.coverage+json": "covjson"} + + # seaweedfs does not store content-type, so we need to use an extension to communicate mime-type + name = name + "." + type_extension_map.get(content_type, ".bin") + try: multipart_upload = self.s3_client.create_multipart_upload( Bucket=self.bucket, diff --git a/polytope_server/version.py b/polytope_server/version.py index 07e880e..02139a3 100644 --- a/polytope_server/version.py +++ b/polytope_server/version.py @@ -20,4 +20,4 @@ # Single-source Polytope version number -__version__ = "0.8.3" +__version__ = "0.8.7" diff --git a/polytope_server/worker/worker.py b/polytope_server/worker/worker.py index 9a21d50..f365119 100644 --- a/polytope_server/worker/worker.py +++ b/polytope_server/worker/worker.py @@ -256,9 +256,9 @@ def process_request(self, request): datasource.destroy(request) if datasource is None: - request.user_message += "Failed to process request." + # request.user_message += "Failed to process request." logging.info(request.user_message, extra={"request_id": id}) - raise Exception("Failed to process request.") + raise Exception("Request was not accepted by any datasources.") else: request.user_message += "Success" diff --git a/requirements.txt b/requirements.txt index d032df2..92730b9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ attrdict==2.0.1 boto3==1.35.19 -covjsonkit==0.0.24 +covjsonkit==0.0.28 deepmerge==2.0 docker==7.1.0 ecmwf-api-client==1.6.3 @@ -20,8 +20,8 @@ Markdown==3.7 minio==7.2.8 moto[dynamodb]==5.0.16 pika==1.3.2 -polytope-mars==0.0.12 -polytope-python==1.0.8 +polytope-mars==0.1.6 +polytope-python==1.0.14 py-bcrypt==0.4 pykwalify==1.8.0 pymemcache==4.0.0 @@ -32,4 +32,4 @@ python-keycloak==4.4.0 PyYAML==6.0.2 redis==5.0.8 requests==2.32.3 -Werkzeug==3.0.4 +Werkzeug==3.0.4 \ No newline at end of file diff --git a/skaffold.yaml b/skaffold.yaml index 1b4e1fb..f0b762b 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -40,12 +40,12 @@ build: mars_config_branch: "{{ .mars_config_branch }}" developer_mode: "{{ .developer_mode }}" # mars_client_c_version: 6.33.20.2 - gribjump_version: 0.6.2 + gribjump_version: 0.7.0 # mars_client_cpp_version: 6.99.3.0 # ecbuild_version: 3.8.2 # eccodes_version: 2.33.1 # eckit_version: 1.28.0 - # pyfdb_version: 0.0.3 + # pyfdb_version: 0.1.0 # fdb_version: 5.13.2 mars_base_c: mars-base-c mars_base_cpp: mars-base-cpp diff --git a/tests/unit/test_polytope_datasource_coercion.py b/tests/unit/test_polytope_datasource_coercion.py new file mode 100644 index 0000000..159d91f --- /dev/null +++ b/tests/unit/test_polytope_datasource_coercion.py @@ -0,0 +1,230 @@ +import pytest + +from polytope_server.common.datasource.coercion import Coercion, CoercionError + + +def test_coerce(): + + # mars-like + request_mars = { + "class": "od", + "stream": "enfo", + "type": "pf", + "date": "2024-11-14", + "time": 12, + "levtype": "sfc", + "expver": 1, + "domain": "g", + "param": "164/166/167/169", + "number": "1/2", + "step": "0/to/360/by/6", + "feature": { # dict ignored + "foo": "bar", + }, + } + + # json-like + request_json = { + "class": "od", + "stream": ["enfo"], + "type": "pf", + "date": "2024-11-14", + "time": 12, + "levtype": "sfc", + "expver": [1], + "domain": "g", + "param": [164, 166, 167, "169"], + "number": "1/2", + "step": "0/to/360/by/6", + "feature": { # dict ignored + "foo": "bar", + }, + } + + request_out = { + "class": "od", + "stream": "enfo", + "type": "pf", + "date": "20241114", + "time": "1200", + "levtype": "sfc", + "expver": "0001", + "domain": "g", + "param": "164/166/167/169", + "number": "1/2", + "step": "0/to/360/by/6", + "feature": { # dict ignored + "foo": "bar", + }, + } + + assert Coercion.coerce(request_mars) == request_out + assert Coercion.coerce(request_json) == request_out + + +def test_date_coercion(): + + from datetime import datetime, timedelta + + today = datetime.today() + yyyymmdd = today.strftime("%Y%m%d") + yyyy_mm_dd = today.strftime("%Y-%m-%d") + yesterday = (today + timedelta(days=-1)).strftime("%Y%m%d") + today = today.strftime("%Y%m%d") + + ok = [ + (20241114, "20241114"), + ("20241114", "20241114"), + ("2024-11-14", "20241114"), + (int(yyyymmdd), yyyymmdd), + (yyyymmdd, yyyymmdd), + (yyyy_mm_dd, yyyymmdd), + (-1, yesterday), + (0, today), + ("-1", yesterday), + ("0", today), + ] + + fail = [ + "2024-11-14T00:00:00", + 202401, + 2024010, + 1.0, + [], + {}, + ] + + for value, expected in ok: + result = Coercion.coerce_date(value) + assert result == expected + + for value in fail: + with pytest.raises(CoercionError): + Coercion.coerce_date(value) + + +def test_step_coercion(): + + # Should accept integer or string, converted to string + ok = [ + (2, "2"), + ("1", "1"), + (10, "10"), + (0, "0"), + ("0", "0"), + ] + + fail = [-1, 1.0, [], {}] + + for value, expected in ok: + result = Coercion.coerce_step(value) + assert result == expected + + for value in fail: + with pytest.raises(CoercionError): + Coercion.coerce_step(value) + + +def test_number_coercion(): + + # Should accept integer or string, converted to string + ok = [(2, "2"), ("1", "1"), (10, "10")] + + fail = [-1, 0, 1.0, [], {}] + + for value, expected in ok: + result = Coercion.coerce_number(value) + assert result == expected + + for value in fail: + with pytest.raises(CoercionError): + Coercion.coerce_number(value) + + +def test_param_coercion(): + + # OK, but should be converted + ok = [ + (100, "100"), + ("100", "100"), + ("100.200", "100.200"), + ("2t", "2t"), + ] + fail = [[], {}, 1.0] + + for value, expected in ok: + result = Coercion.coerce_param(value) + assert result == expected + + for value in fail: + with pytest.raises(CoercionError): + Coercion.coerce_param(value) + + +def test_time_coercion(): + + # OK, but should be converted + ok = [ + ("1200", "1200"), + ("12", "1200"), + ("1", "0100"), + ("6", "0600"), + ("12:00", "1200"), + (0, "0000"), + (12, "1200"), + (1200, "1200"), + ] + fail = [ + "abc", + 25, + 2400, + 2401, + -1, + -10, + [], + {}, + ] + + for value, expected in ok: + result = Coercion.coerce_time(value) + assert result == expected + + for value in fail: + with pytest.raises(CoercionError): + Coercion.coerce_time(value) + + +def test_expver_coercion(): + expvers = [ + "0001", + "001", + "01", + "1", + 1, + ] + + for expver in expvers: + result = Coercion.coerce_expver(expver) + assert result == "0001" + + assert Coercion.coerce_expver("abcd") == "abcd" + assert Coercion.coerce_expver(10) == "0010" + assert Coercion.coerce_expver("1abc") == "1abc" + + with pytest.raises(CoercionError): + Coercion.coerce_expver("abcde") # too long + + with pytest.raises(CoercionError): + Coercion.coerce_expver("abc") # too short + + with pytest.raises(CoercionError): + Coercion.coerce_expver(1.0) # float + + with pytest.raises(CoercionError): + Coercion.coerce_expver([]) + + with pytest.raises(CoercionError): + Coercion.coerce_expver({}) + + with pytest.raises(CoercionError): + Coercion.coerce_expver(["a", "b", "c", "d"]) diff --git a/tests/unit/test_schedule.py b/tests/unit/test_schedule.py new file mode 100644 index 0000000..2889d9d --- /dev/null +++ b/tests/unit/test_schedule.py @@ -0,0 +1,141 @@ +from datetime import date, time, timedelta + +import pytest +from polytope_feature.utility.exceptions import PolytopeError + +from polytope_server.common.schedule import ( + ScheduleReader, + parse_mars_date, + parse_mars_time, + split_mars_param, +) + + +@pytest.fixture +def mock_schedule_file(tmp_path): + # Create mock XML data + xml_data = """ + + + od + oper/wave + g + + 0000 + an + 05:35:00 + 1 + + + + od + enfo/waef + + 06:40:00 + 1 + + + + ai + oper + g + + 0000 + fc + 08:34:00 + 0 + + + """ + schedule_file = tmp_path / "schedule.xml" + schedule_file.write_text(xml_data) + return schedule_file + + +def test_get_release_time_and_delta_day_match(mock_schedule_file): + reader = ScheduleReader(str(mock_schedule_file)) + release_time, delta_day = reader.get_release_time_and_delta_day( + cclass="od", stream="oper/wave", domain="g", time_in="00:00", step="0000", ttype="an" + ) + assert release_time == "05:35:00" + assert delta_day == 1 + release_time, delta_day = reader.get_release_time_and_delta_day( + cclass="od", stream="enfo", domain="g", time_in="00:00", step="0360", ttype="pf" + ) + assert release_time == "06:40:00" + assert delta_day == 1 + + +def test_get_release_time_and_delta_day_no_match(mock_schedule_file): + reader = ScheduleReader(str(mock_schedule_file)) + release_time, delta_day = reader.get_release_time_and_delta_day( + cclass="od", stream="nonexistent", domain="g", time_in="00:00", step="0000", ttype="an" + ) + assert release_time is None + assert delta_day is None + + +def test_check_released(mock_schedule_file): + reader = ScheduleReader(str(mock_schedule_file)) + date_in = "2023-10-01" + cclass = "od" + stream = "oper/wave" + domain = "g" + time_in = "00:00" + step = "0000" + diss_type = "an" + reader.check_released(date_in, cclass, stream, domain, time_in, step, diss_type) + + stream = "enfo" + step = "0100" + reader.check_released(date_in, cclass, stream, domain, time_in, step, diss_type) + + with pytest.raises(Exception): + reader.check_released(date_in, "od", "nonexistent", "g", "00:00", "0000", "an") + + +def test_split_mars_param(): + assert split_mars_param("0/1/2") == ["0", "1", "2"] + assert split_mars_param("0/to/2") == "2" + assert split_mars_param("0/to/7/by/2") == "7" + + +def test_parse_mars_date(): + assert parse_mars_date("2023-10-01") == date(2023, 10, 1) + assert parse_mars_date("20231001") == date(2023, 10, 1) + assert parse_mars_date("0") == date.today() + assert parse_mars_date("-1") == date.today() - timedelta(days=1) + pytest.raises(PolytopeError, parse_mars_date, "1") + pytest.raises(PolytopeError, parse_mars_date, "2023274") + pytest.raises(PolytopeError, parse_mars_date, "January") + + +def test_parse_mars_time(): + assert parse_mars_time("1230") == time(12, 30) + assert parse_mars_time("0000") == time(0, 0) + assert parse_mars_time("12:30") == time(12, 30) + assert parse_mars_time("12") == time(12, 0) + + with pytest.raises(ValueError): + parse_mars_time("invalid_time") + + with pytest.raises(ValueError): + parse_mars_time("25:00") + + with pytest.raises(ValueError): + parse_mars_time("123456") + + +# needs to schedule.xml file included + +# from polytope_server.common.schedule import SCHEDULE_READER + +# def test_integration_test(): +# date_in = "20241113" +# cclass = "od" +# stream = "enfo" +# domain = "g" +# time_in = "00:00" +# step = "0360" +# diss_type = "cf" +# SCHEDULE_READER.check_released(date_in, cclass, stream, domain, time_in, step, diss_type)