Skip to content

Commit f7cff5b

Browse files
committed
add SharedBytes to morebuiltins.shared_memory
1 parent facc89b commit f7cff5b

File tree

2 files changed

+115
-11
lines changed

2 files changed

+115
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
### 1.3.1 (2025-06-19)
22
1. add `check_recursion` to `morebuiltins.funcs`, to check if a function is recursive.
3+
2. add `SharedBytes` to `morebuiltins.shared_memory`, a shared memory for bytes, similar to `multiprocessing.shared_memory.SharedMemory` with a header for size.
34

45
### 1.3.0 (2025-03-08)
56
1. **Compatibility Warning**. rename `morebuiltins.functools` to `morebuiltins.funcs` to avoid conflict with `functools` in python standard library.

morebuiltins/shared_memory.py

Lines changed: 114 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import atexit
22
import os
33
import time
4-
from multiprocessing import shared_memory
4+
import typing
5+
from contextlib import closing
56

7+
from multiprocessing.shared_memory import SharedMemory
68

7-
__all__ = [
8-
"PLock",
9-
]
9+
__all__ = ["PLock", "SharedBytes"]
1010

1111

1212
class PLock:
@@ -56,14 +56,16 @@ def __init__(self, name: str, force=False, close_atexit=False, pid=None):
5656
if close_atexit:
5757
atexit.register(self.close)
5858
self.init()
59+
if self.shm is None:
60+
raise RuntimeError(f"Failed to create shared memory {name}")
5961

6062
@staticmethod
6163
def wait_for_free(name: str, timeout=3, interval=0.1):
6264
"""Wait for the shared memory to be free."""
6365
start = time.time()
6466
while time.time() - start < timeout:
6567
try:
66-
shared_memory.SharedMemory(name=name).close()
68+
SharedMemory(name=name).close()
6769
time.sleep(interval)
6870
except FileNotFoundError:
6971
return True
@@ -74,11 +76,9 @@ def init(self):
7476
raise RuntimeError("Already closed")
7577
ok = True
7678
try:
77-
self.shm = shared_memory.SharedMemory(
78-
name=self.name, create=True, size=self.DEFAULT_SIZE
79-
)
79+
self.shm = SharedMemory(name=self.name, create=True, size=self.DEFAULT_SIZE)
8080
except FileExistsError:
81-
self.shm = shared_memory.SharedMemory(name=self.name)
81+
self.shm = SharedMemory(name=self.name)
8282
if not self.force:
8383
ok = False
8484
if not ok:
@@ -96,13 +96,19 @@ def init(self):
9696
def set_mem_pid(self, pid=None):
9797
if pid is None:
9898
pid = self.pid
99-
self.shm.buf[: self.DEFAULT_SIZE] = pid.to_bytes(
99+
self.buf[: self.DEFAULT_SIZE] = pid.to_bytes(
100100
self.DEFAULT_SIZE, byteorder=self.DEFAULT_BYTEORDER
101101
)
102102

103+
@property
104+
def buf(self):
105+
if self.shm is None:
106+
raise RuntimeError("Shared memory is not initialized")
107+
return self.shm.buf
108+
103109
def get_mem_pid(self):
104110
return int.from_bytes(
105-
self.shm.buf[: self.DEFAULT_SIZE], byteorder=self.DEFAULT_BYTEORDER
111+
self.buf[: self.DEFAULT_SIZE], byteorder=self.DEFAULT_BYTEORDER
106112
)
107113

108114
@property
@@ -137,6 +143,103 @@ def __del__(self):
137143
self.close()
138144

139145

146+
class SharedBytes:
147+
"""Shared Memory for Python, for python 3.8+.
148+
This module provides a simple way to create and manage shared memory segments, shared between different processes.
149+
Shared memory is faster than other IPC methods like pipes or queues, and it allows for direct access to the memory.
150+
151+
Demo:
152+
153+
>>> sb = SharedBytes(name="test", data=b"Hello, World!", unlink_on_exit=True)
154+
>>> sb.size
155+
18
156+
>>> sb.get(name="test")
157+
b'Hello, World!'
158+
>>> sb.re_create(b"New Data")
159+
>>> sb.get(name="test")
160+
b'New Data'
161+
>>> sb.close()
162+
>>> sb.get(name="test", default=b"") # This will raise ValueError since the shared memory is closed
163+
b''
164+
"""
165+
166+
closed: bool = True
167+
# max_size: 2 ** (i * 8). 1: 256 B, 2: 64 KB, 3: 16 MB, 4: 4 GB, 5: 1 TB, 6: 256 TB, 7: 64 PB, 8: 16 EB, defaults to 5(1TB).
168+
head_length: int = 5
169+
byteorder: typing.Literal["little", "big"] = "little"
170+
171+
def __init__(self, name: str, data: bytes, head_length=None, unlink_on_exit=False):
172+
self.name = name
173+
self.head_length = head_length or self.head_length
174+
if unlink_on_exit:
175+
atexit.register(self.close)
176+
self.create(data)
177+
178+
@property
179+
def size(self) -> int:
180+
if self.closed:
181+
raise ValueError("Shared memory is closed")
182+
return self.shm.size
183+
184+
def create(self, data: bytes):
185+
if not self.closed:
186+
raise ValueError("Shared memory is already created, please use re_create")
187+
size = len(data)
188+
head_length = self.head_length
189+
if size > 2 ** (head_length * 8):
190+
right_head_length = 0
191+
for i in range(head_length, 15):
192+
if size < 2 ** (i * 8):
193+
right_head_length = i
194+
break
195+
raise ValueError(
196+
f"data size {size} is too large, max size is {2 ** (head_length * 8)}, raise head_length at least {right_head_length}"
197+
)
198+
total_size = head_length + len(data)
199+
self.shm = SharedMemory(name=self.name, create=True, size=total_size)
200+
head = size.to_bytes(head_length, byteorder=self.byteorder)
201+
self.buf[:total_size] = head + data
202+
self.closed = False
203+
204+
@property
205+
def buf(self):
206+
if self.shm is None:
207+
raise RuntimeError("Shared memory is not initialized")
208+
return self.shm.buf
209+
210+
def re_create(self, data: bytes):
211+
if self.closed:
212+
raise ValueError("Shared memory is closed")
213+
self.close()
214+
self.create(data)
215+
216+
def close(self):
217+
if not self.closed:
218+
self.shm.close()
219+
self.shm.unlink()
220+
self.closed = True
221+
222+
def __enter__(self):
223+
return self
224+
225+
def __exit__(self, exc_type, exc_value, traceback):
226+
self.close()
227+
228+
@classmethod
229+
def get(cls, name: str, head_length=None, default=...) -> bytes:
230+
try:
231+
head_length = head_length or cls.head_length
232+
with closing(SharedMemory(name=name)) as shm:
233+
head = bytes(shm.buf[:head_length])
234+
body_length = int.from_bytes(head, byteorder=cls.byteorder)
235+
data = bytes(shm.buf[head_length : head_length + body_length])
236+
return data
237+
except FileNotFoundError:
238+
if default is ...:
239+
raise KeyError(f"Shared memory {name} not found")
240+
return default
241+
242+
140243
if __name__ == "__main__":
141244
import doctest
142245

0 commit comments

Comments
 (0)