|
1 | | -"""Engines for storing the queues on disk.""" |
| 1 | +"""Engines for storing the queues with DBM.""" |
2 | 2 |
|
3 | 3 | import dbm |
4 | 4 | import logging |
5 | 5 | import pickle |
6 | 6 | from pathlib import Path |
7 | 7 | from typing import Any, List, Optional, Union |
8 | 8 |
|
9 | | -try: |
10 | | - import aiodbm |
11 | | -except ImportError: |
12 | | - has_aiodbm = False |
13 | | -else: |
14 | | - has_aiodbm = True |
| 9 | +import aiodbm |
15 | 10 |
|
16 | | -from .base import _FifoStorageEngine |
| 11 | +from .base import FifoStorageEngine |
17 | 12 |
|
18 | 13 | logger = logging.getLogger("aiodiskqueue") |
19 | 14 |
|
20 | | -if has_aiodbm: |
21 | | - |
22 | | - class DbmEngine(_FifoStorageEngine): |
23 | | - """A queue storage engine using DBM.""" |
24 | | - |
25 | | - def __init__(self, data_path: Path) -> None: |
26 | | - super().__init__(data_path) |
27 | | - self._data_path_2 = str(data_path.absolute()) |
28 | | - |
29 | | - HEAD_ID_KEY = "head_id" |
30 | | - TAIL_ID_KEY = "tail_id" |
31 | | - |
32 | | - async def initialize(self): |
33 | | - async with aiodbm.open(self._data_path_2, "c") as db: |
34 | | - await db.set("dummy", "test") |
35 | | - await db.delete("dummy") |
36 | | - |
37 | | - async def fetch_all(self) -> List[Any]: |
38 | | - try: |
39 | | - async with aiodbm.open(self._data_path_2, "r") as db: |
40 | | - head_id = await self._get_obj(db, self.HEAD_ID_KEY) |
41 | | - tail_id = await self._get_obj(db, self.TAIL_ID_KEY) |
42 | | - if not head_id or not tail_id: |
43 | | - return [] |
44 | | - |
45 | | - items = [] |
46 | | - for item_id in range(head_id, tail_id + 1): |
47 | | - item_key = self._make_item_key(item_id) |
48 | | - item = await self._get_obj(db, item_key) |
49 | | - items.append(item) |
50 | | - except dbm.error: |
51 | | - items = [] |
52 | 15 |
|
53 | | - return items |
| 16 | +class DbmEngine(FifoStorageEngine): |
| 17 | + """A queue storage engine using DBM.""" |
54 | 18 |
|
55 | | - async def add_item(self, item: Any): |
56 | | - async with aiodbm.open(self._data_path_2, "w") as db: |
57 | | - tail_id = await self._get_obj(db, self.TAIL_ID_KEY) |
58 | | - if tail_id: |
59 | | - item_id = tail_id + 1 |
60 | | - is_first = False |
61 | | - else: |
62 | | - item_id = 1 |
63 | | - is_first = True |
| 19 | + def __init__(self, data_path: Path) -> None: |
| 20 | + super().__init__(data_path) |
| 21 | + self._data_path_2 = str(data_path.absolute()) |
64 | 22 |
|
65 | | - await self._set_obj(db, self._make_item_key(item_id), item) |
66 | | - await self._set_obj(db, self.TAIL_ID_KEY, item_id) |
| 23 | + _HEAD_ID_KEY = "head_id" |
| 24 | + _TAIL_ID_KEY = "tail_id" |
67 | 25 |
|
68 | | - if is_first: |
69 | | - await self._set_obj(db, self.HEAD_ID_KEY, item_id) |
| 26 | + async def initialize(self): |
| 27 | + async with aiodbm.open(self._data_path_2, "c") as db: |
| 28 | + await db.set("dummy", "test") |
| 29 | + await db.delete("dummy") |
70 | 30 |
|
71 | | - async def remove_item(self): |
72 | | - async with aiodbm.open(self._data_path_2, "w") as db: |
73 | | - head_id = await self._get_obj(db, self.HEAD_ID_KEY) |
74 | | - tail_id = await self._get_obj(db, self.TAIL_ID_KEY) |
| 31 | + async def fetch_all(self) -> List[Any]: |
| 32 | + try: |
| 33 | + async with aiodbm.open(self._data_path_2, "r") as db: |
| 34 | + head_id = await self._get_obj(db, self._HEAD_ID_KEY) |
| 35 | + tail_id = await self._get_obj(db, self._TAIL_ID_KEY) |
75 | 36 | if not head_id or not tail_id: |
76 | | - raise ValueError("Nothing to remove from an empty database") |
77 | | - item_key = self._make_item_key(head_id) |
78 | | - await db.delete(item_key) |
79 | | - |
80 | | - if head_id != tail_id: |
81 | | - # there are items left |
82 | | - await self._set_obj(db, self.HEAD_ID_KEY, head_id + 1) |
83 | | - else: |
84 | | - # was last item |
85 | | - await db.delete(self.HEAD_ID_KEY) |
86 | | - await db.delete(self.TAIL_ID_KEY) |
87 | | - |
88 | | - @staticmethod |
89 | | - def _make_item_key(item_id: int) -> str: |
90 | | - return f"item-{item_id}" |
91 | | - |
92 | | - @staticmethod |
93 | | - async def _get_obj(db, key: Union[str, bytes]) -> Optional[Any]: |
94 | | - data = await db.get(key) |
95 | | - if not data: |
96 | | - return None |
97 | | - return pickle.loads(data) |
98 | | - |
99 | | - @staticmethod |
100 | | - async def _set_obj(db, key: Union[str, bytes], item: Any): |
101 | | - data = pickle.dumps(item) |
102 | | - await db.set(key, data) |
| 37 | + return [] |
| 38 | + |
| 39 | + items = [] |
| 40 | + for item_id in range(head_id, tail_id + 1): |
| 41 | + item_key = self._make_item_key(item_id) |
| 42 | + item = await self._get_obj(db, item_key) |
| 43 | + items.append(item) |
| 44 | + except dbm.error: |
| 45 | + items = [] |
| 46 | + |
| 47 | + return items |
| 48 | + |
| 49 | + async def add_item(self, item: Any): |
| 50 | + async with aiodbm.open(self._data_path_2, "w") as db: |
| 51 | + tail_id = await self._get_obj(db, self._TAIL_ID_KEY) |
| 52 | + if tail_id: |
| 53 | + item_id = tail_id + 1 |
| 54 | + is_first = False |
| 55 | + else: |
| 56 | + item_id = 1 |
| 57 | + is_first = True |
| 58 | + |
| 59 | + await self._set_obj(db, self._make_item_key(item_id), item) |
| 60 | + await self._set_obj(db, self._TAIL_ID_KEY, item_id) |
| 61 | + |
| 62 | + if is_first: |
| 63 | + await self._set_obj(db, self._HEAD_ID_KEY, item_id) |
| 64 | + |
| 65 | + async def remove_item(self): |
| 66 | + async with aiodbm.open(self._data_path_2, "w") as db: |
| 67 | + head_id = await self._get_obj(db, self._HEAD_ID_KEY) |
| 68 | + tail_id = await self._get_obj(db, self._TAIL_ID_KEY) |
| 69 | + if not head_id or not tail_id: |
| 70 | + raise ValueError("Nothing to remove from an empty database") |
| 71 | + item_key = self._make_item_key(head_id) |
| 72 | + await db.delete(item_key) |
| 73 | + |
| 74 | + if head_id != tail_id: |
| 75 | + # there are items left |
| 76 | + await self._set_obj(db, self._HEAD_ID_KEY, head_id + 1) |
| 77 | + else: |
| 78 | + # was last item |
| 79 | + await db.delete(self._HEAD_ID_KEY) |
| 80 | + await db.delete(self._TAIL_ID_KEY) |
| 81 | + |
| 82 | + @staticmethod |
| 83 | + def _make_item_key(item_id: int) -> str: |
| 84 | + return f"item-{item_id}" |
| 85 | + |
| 86 | + @staticmethod |
| 87 | + async def _get_obj(db, key: Union[str, bytes]) -> Optional[Any]: |
| 88 | + data = await db.get(key) |
| 89 | + if not data: |
| 90 | + return None |
| 91 | + return pickle.loads(data) |
| 92 | + |
| 93 | + @staticmethod |
| 94 | + async def _set_obj(db, key: Union[str, bytes], item: Any): |
| 95 | + data = pickle.dumps(item) |
| 96 | + await db.set(key, data) |
0 commit comments