|
| 1 | +# Copyright 2025 ACSONE SA/NV |
| 2 | +# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL). |
| 3 | +import logging |
| 4 | +import queue |
| 5 | +import threading |
| 6 | +from collections import defaultdict |
| 7 | +from collections.abc import Generator |
| 8 | +from contextlib import contextmanager |
| 9 | + |
| 10 | +from odoo.api import Environment |
| 11 | + |
| 12 | +from fastapi import FastAPI |
| 13 | + |
| 14 | +_logger = logging.getLogger(__name__) |
| 15 | + |
| 16 | + |
| 17 | +class FastApiAppPool: |
| 18 | + """Pool of FastAPI apps. |
| 19 | +
|
| 20 | + This class manages a pool of FastAPI apps. The pool is organized by database name |
| 21 | + and root path. Each pool is a queue of FastAPI apps. |
| 22 | +
|
| 23 | + The pool is used to reuse FastAPI apps across multiple requests. This is useful |
| 24 | + to avoid the overhead of creating a new FastAPI app for each request. The pool |
| 25 | + ensures that only one request at a time uses an app. |
| 26 | +
|
| 27 | + The proper way to use the pool is to use the get_app method as a context manager. |
| 28 | + This ensures that the app is returned to the pool after the context manager exits. |
| 29 | + The get_app method is designed to ensure that the app made available to the |
| 30 | + caller is unique and not used by another caller at the same time. |
| 31 | +
|
| 32 | + .. code-block:: python |
| 33 | +
|
| 34 | + with fastapi_app_pool.get_app(env=request.env, root_path=root_path) as app: |
| 35 | + # use the app |
| 36 | +
|
| 37 | + The pool is invalidated when the cache registry is updated. This ensures that |
| 38 | + the pool is always up-to-date with the latest app configuration. It also |
| 39 | + ensures that the invalidation is done even in the case of a modification occurring |
| 40 | + in a different worker process or thread or server instance. This mechanism |
| 41 | + works because every time an attribute of the fastapi.endpoint model is modified |
| 42 | + and this attribute is part of the list returned by the `_fastapi_app_fields`, |
| 43 | + or `_routing_impacting_fields` methods, we reset the cache of a marker method |
| 44 | + `_reset_app_cache_marker`. As side effect, the cache registry is marked to be |
| 45 | + updated by the increment of the `cache_sequence` SQL sequence. This cache sequence |
| 46 | + on the registry is reloaded from the DB on each request made to a specific database. |
| 47 | + When an app is retrieved from the pool, we always compare the cache sequence of |
| 48 | + the pool with the cache sequence of the registry. If the two sequences are |
| 49 | + different, we invalidate the pool and save the new cache sequence on the pool. |
| 50 | +
|
| 51 | + The cache is based on a defaultdict of defaultdict of queue.Queue. We are cautious |
| 52 | + that the use of defaultdict is not thread-safe for operations that modify the |
| 53 | + dictionary. However the only operation that modifies the dictionary is the |
| 54 | + first access to a new key. If two threads access the same key at the same time, |
| 55 | + the two threads will create two different queues. This is not a problem since |
| 56 | + at the time of returning an app to the pool, we are sure that a queue exists |
| 57 | + for the key into the cache and all the created apps are returned to the same |
| 58 | + valid queue. And the end, the lack of thread-safety for the defaultdict could |
| 59 | + only lead to a negligible overhead of creating a new queue that will never be |
| 60 | + used. This is why we consider that the use of defaultdict is safe in this context. |
| 61 | + """ |
| 62 | + |
| 63 | + def __init__(self): |
| 64 | + self._queue_by_db_by_root_path: dict[str, dict[str, queue.Queue[FastAPI]]] = ( |
| 65 | + defaultdict(lambda: defaultdict(queue.Queue)) |
| 66 | + ) |
| 67 | + self.__cache_sequences = {} |
| 68 | + self._lock = threading.Lock() |
| 69 | + |
| 70 | + def __get_pool(self, env: Environment, root_path: str) -> queue.Queue[FastAPI]: |
| 71 | + db_name = env.cr.dbname |
| 72 | + return self._queue_by_db_by_root_path[db_name][root_path] |
| 73 | + |
| 74 | + def __get_app(self, env: Environment, root_path: str) -> FastAPI: |
| 75 | + pool = self.__get_pool(env, root_path) |
| 76 | + try: |
| 77 | + return pool.get_nowait() |
| 78 | + except queue.Empty: |
| 79 | + return env["fastapi.endpoint"].sudo().get_app(root_path) |
| 80 | + |
| 81 | + def __return_app(self, env: Environment, app: FastAPI, root_path: str) -> None: |
| 82 | + pool = self.__get_pool(env, root_path) |
| 83 | + pool.put(app) |
| 84 | + |
| 85 | + @contextmanager |
| 86 | + def get_app( |
| 87 | + self, env: Environment, root_path: str |
| 88 | + ) -> Generator[FastAPI, None, None]: |
| 89 | + """Return a FastAPI app to be used in a context manager. |
| 90 | +
|
| 91 | + The app is retrieved from the pool if available, otherwise a new one is created. |
| 92 | + The app is returned to the pool after the context manager exits. |
| 93 | +
|
| 94 | + When used into the FastApiDispatcher class this ensures that the app is reused |
| 95 | + across multiple requests but only one request at a time uses an app. |
| 96 | + """ |
| 97 | + self._check_cache(env) |
| 98 | + app = self.__get_app(env, root_path) |
| 99 | + try: |
| 100 | + yield app |
| 101 | + finally: |
| 102 | + self.__return_app(env, app, root_path) |
| 103 | + |
| 104 | + def get_cache_sequence(self, key: str) -> int: |
| 105 | + with self._lock: |
| 106 | + return self.__cache_sequences.get(key, 0) |
| 107 | + |
| 108 | + def set_cache_sequence(self, key: str, value: int) -> None: |
| 109 | + with self._lock: |
| 110 | + if ( |
| 111 | + key not in self.__cache_sequences |
| 112 | + or value != self.__cache_sequences[key] |
| 113 | + ): |
| 114 | + self.__cache_sequences[key] = value |
| 115 | + |
| 116 | + def _check_cache(self, env: Environment) -> None: |
| 117 | + cache_sequences = env.registry.cache_sequences |
| 118 | + for key, value in cache_sequences.items(): |
| 119 | + if ( |
| 120 | + value != self.get_cache_sequence(key) |
| 121 | + and self.get_cache_sequence(key) != 0 |
| 122 | + ): |
| 123 | + _logger.info( |
| 124 | + "Cache registry updated, reset fastapi_app pool for the current " |
| 125 | + "database" |
| 126 | + ) |
| 127 | + self.invalidate(env) |
| 128 | + self.set_cache_sequence(key, value) |
| 129 | + |
| 130 | + def invalidate(self, env: Environment, root_path: str | None = None) -> None: |
| 131 | + db_name = env.cr.dbname |
| 132 | + if root_path: |
| 133 | + self._queue_by_db_by_root_path[db_name][root_path] = queue.Queue() |
| 134 | + elif db_name in self._queue_by_db_by_root_path: |
| 135 | + del self._queue_by_db_by_root_path[db_name] |
0 commit comments