From 7aac3ea14a4a5b364e31767c6d7ebb26778a564e Mon Sep 17 00:00:00 2001 From: Radu Marias Date: Tue, 4 Jun 2024 22:52:12 +0300 Subject: [PATCH] tests and examples --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 2 +- examples/lock_and_zeroize.py | 75 ++-------------------- examples/zeroize_before_fork.py | 71 +-------------------- tests/test_zeroize.py | 110 ++------------------------------ 6 files changed, 16 insertions(+), 246 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b794c48..7996e15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -427,7 +427,7 @@ checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "zeroize" -version = "0.1.9" +version = "0.1.10" dependencies = [ "libc", "libsodium-sys", diff --git a/Cargo.toml b/Cargo.toml index cfb9a96..8e61173 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "zeroize" -version = "0.1.9" +version = "0.1.10" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/README.md b/README.md index b6ef9a3..e22eebc 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ It can work with `bytearray` and `numpy array`. > [!WARNING] > **In the case of [Copy-on-write fork](https://en.wikipedia.org/wiki/Copy-on-write) you need to zeroize the memory before forking the child process, see example below. -> Also by itself it doesn't work if memory is moved or moved to swap. You can use `crypes` with `libc.mlock()` to lock the memory, max size you can lock is 4MB, at least on Linux, see example below.** +> Also by itself it doesn't work if memory is moved or moved to swap. You can use `zeroize.mlock()` and `zeroize.mlock_np()` to lock the memory, max size you can lock is 4MB, at least on Linux, see example below.** # Examples diff --git a/examples/lock_and_zeroize.py b/examples/lock_and_zeroize.py index dfed34e..27e5e2c 100644 --- a/examples/lock_and_zeroize.py +++ b/examples/lock_and_zeroize.py @@ -1,72 +1,7 @@ """By itself it doesn't work if memory is moved or moved to swap. You can use `crypes` with `libc.mlock()` to lock the memory""" -from zeroize import zeroize1, zeroize_np +from zeroize import zeroize1, zeroize_np, mlock, munlock, mlock_np, munlock_np import numpy as np -import ctypes -import platform - - -os_name = platform.system() - -if os_name == "Linux": - # Load the C standard library - LIBC = ctypes.CDLL("libc.so.6") -elif os_name == "Darwin": - # Load the C standard library - LIBC = ctypes.CDLL("libc.dylib") -elif os_name == "Windows": - # Load the kernel32 library - kernel32 = ctypes.windll.kernel32 -else: - raise RuntimeError(f"Unsupported OS: {os_name}") - -if os_name == "Linux" or os_name == "Darwin": - # Define mlock and munlock argument types - MLOCK = LIBC.mlock - MUNLOCK = LIBC.munlock - - # Define mlock and munlock argument types - MLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t] - MUNLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t] -elif os_name == "Windows": - # Define the VirtualLock and VirtualUnlock functions - VirtualLock = kernel32.VirtualLock - VirtualLock.argtypes = [ctypes.c_void_p, ctypes.c_size_t] - VirtualLock.restype = ctypes.c_int - - VirtualUnlock = kernel32.VirtualUnlock - VirtualUnlock.argtypes = [ctypes.c_void_p, ctypes.c_size_t] - VirtualUnlock.restype = ctypes.c_int -else: - raise RuntimeError(f"Unsupported OS: {os_name}") - - -def lock_memory(buffer): - """Locks the memory of the given buffer.""" - address = ctypes.addressof(ctypes.c_char.from_buffer(buffer)) - size = len(buffer) - if os_name == "Linux" or os_name == "Darwin": - if MLOCK(address, size) != 0: - raise RuntimeError("Failed to lock memory") - elif os_name == "Windows": - if VirtualLock(address, size) == 0: - raise RuntimeError("Failed to lock memory") - else: - raise RuntimeError(f"Unsupported OS: {os_name}") - - -def unlock_memory(buffer): - """Unlocks the memory of the given buffer.""" - address = ctypes.addressof(ctypes.c_char.from_buffer(buffer)) - size = len(buffer) - if os_name == "Linux" or os_name == "Darwin": - if MUNLOCK(address, size) != 0: - raise RuntimeError("Failed to unlock memory") - elif os_name == "Windows": - if VirtualUnlock(address, size) == 0: - raise RuntimeError("Failed to unlock memory") - else: - raise RuntimeError(f"Unsupported OS: {os_name}") if __name__ == "__main__": @@ -85,8 +20,8 @@ def unlock_memory(buffer): print("locking memory") - lock_memory(arr) - lock_memory(arr_np) + mlock(arr) + mlock_np(arr_np) print("zeroize'ing...: ") zeroize1(arr) @@ -101,5 +36,5 @@ def unlock_memory(buffer): finally: # Unlock the memory print("unlocking memory") - unlock_memory(arr) - unlock_memory(arr_np) + munlock(arr) + munlock_np(arr_np) diff --git a/examples/zeroize_before_fork.py b/examples/zeroize_before_fork.py index 655c884..1902ee8 100644 --- a/examples/zeroize_before_fork.py +++ b/examples/zeroize_before_fork.py @@ -1,79 +1,14 @@ """ In the case of [Copy-on-write fork](https://en.wikipedia.org/wiki/Copy-on-write) you need to zeroize the memory before forking the child process. """ import os -from zeroize import zeroize1 -import ctypes -import platform - - -os_name = platform.system() - -if os_name == "Linux": - # Load the C standard library - LIBC = ctypes.CDLL("libc.so.6") -elif os_name == "Darwin": - # Load the C standard library - LIBC = ctypes.CDLL("libc.dylib") -elif os_name == "Windows": - # Load the kernel32 library - kernel32 = ctypes.windll.kernel32 -else: - raise RuntimeError(f"Unsupported OS: {os_name}") - -if os_name == "Linux" or os_name == "Darwin": - # Define mlock and munlock argument types - MLOCK = LIBC.mlock - MUNLOCK = LIBC.munlock - - # Define mlock and munlock argument types - MLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t] - MUNLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t] -elif os_name == "Windows": - # Define the VirtualLock and VirtualUnlock functions - VirtualLock = kernel32.VirtualLock - VirtualLock.argtypes = [ctypes.c_void_p, ctypes.c_size_t] - VirtualLock.restype = ctypes.c_int - - VirtualUnlock = kernel32.VirtualUnlock - VirtualUnlock.argtypes = [ctypes.c_void_p, ctypes.c_size_t] - VirtualUnlock.restype = ctypes.c_int -else: - raise RuntimeError(f"Unsupported OS: {os_name}") - - -def lock_memory(buffer): - """Locks the memory of the given buffer.""" - address = ctypes.addressof(ctypes.c_char.from_buffer(buffer)) - size = len(buffer) - if os_name == "Linux" or os_name == "Darwin": - if MLOCK(address, size) != 0: - raise RuntimeError("Failed to lock memory") - elif os_name == "Windows": - if VirtualLock(address, size) == 0: - raise RuntimeError("Failed to lock memory") - else: - raise RuntimeError(f"Unsupported OS: {os_name}") - - -def unlock_memory(buffer): - """Unlocks the memory of the given buffer.""" - address = ctypes.addressof(ctypes.c_char.from_buffer(buffer)) - size = len(buffer) - if os_name == "Linux" or os_name == "Darwin": - if MUNLOCK(address, size) != 0: - raise RuntimeError("Failed to unlock memory") - elif os_name == "Windows": - if VirtualUnlock(address, size) == 0: - raise RuntimeError("Failed to unlock memory") - else: - raise RuntimeError(f"Unsupported OS: {os_name}") +from zeroize import zeroize1, mlock, munlock if __name__ == "__main__": try: # max size you can lock is 4MB, at least on Linux sensitive_data = bytearray(b"Sensitive Information") - lock_memory(sensitive_data) + mlock(sensitive_data) print("Before zeroization:", sensitive_data) @@ -92,4 +27,4 @@ def unlock_memory(buffer): finally: # Unlock the memory print("unlocking memory") - unlock_memory(sensitive_data) + munlock(sensitive_data) diff --git a/tests/test_zeroize.py b/tests/test_zeroize.py index 430b36d..09954e4 100644 --- a/tests/test_zeroize.py +++ b/tests/test_zeroize.py @@ -1,107 +1,6 @@ import unittest from zeroize import zeroize1, zeroize_np, mlock, munlock, mlock_np, munlock_np import numpy as np -import ctypes -import platform - - -os_name = platform.system() - -if os_name == "Linux": - # Load the C standard library - LIBC = ctypes.CDLL("libc.so.6") -elif os_name == "Darwin": - # Load the C standard library - LIBC = ctypes.CDLL("libc.dylib") -elif os_name == "Windows": - # Define constants - PAGE_READWRITE = 0x04 - MEM_COMMIT = 0x1000 - MEM_RESERVE = 0x2000 - - # Load kernel32 library - kernel32 = ctypes.windll.kernel32 - - # Enable the SeLockMemoryPrivilege privilege - def enable_lock_memory_privilege(): - TOKEN_ADJUST_PRIVILEGES = 0x0020 - TOKEN_QUERY = 0x0008 - SE_PRIVILEGE_ENABLED = 0x0002 - privilege_name = "SeLockMemoryPrivilege" - - class LUID(ctypes.Structure): - _fields_ = [("LowPart", ctypes.c_uint32), ("HighPart", ctypes.c_int32)] - - class LUID_AND_ATTRIBUTES(ctypes.Structure): - _fields_ = [("Luid", LUID), ("Attributes", ctypes.c_uint32)] - - class TOKEN_PRIVILEGES(ctypes.Structure): - _fields_ = [("PrivilegeCount", ctypes.c_uint32), ("Privileges", LUID_AND_ATTRIBUTES * 1)] - - # Open process token - token_handle = ctypes.c_void_p() - if not kernel32.OpenProcessToken(kernel32.GetCurrentProcess(), TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY, ctypes.byref(token_handle)): - raise ctypes.WinError() - - # Lookup privilege value - luid = LUID() - if not kernel32.LookupPrivilegeValueW(None, privilege_name, ctypes.byref(luid)): - raise ctypes.WinError() - - # Adjust token privileges - tp = TOKEN_PRIVILEGES() - tp.PrivilegeCount = 1 - tp.Privileges[0].Luid = luid - tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED - - if not kernel32.AdjustTokenPrivileges(token_handle, False, ctypes.byref(tp), ctypes.sizeof(tp), None, None): - raise ctypes.WinError() - - # Close the token handle - kernel32.CloseHandle(token_handle) -else: - raise RuntimeError(f"Unsupported OS: {os_name}") - -if os_name == "Linux" or os_name == "Darwin": - # Define mlock and munlock argument types - MLOCK = LIBC.mlock - MUNLOCK = LIBC.munlock - - # Define mlock and munlock argument types - MLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t] - MUNLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t] - - -def lock_memory(buf): - """Locks the memory of the given buffer.""" - address = ctypes.addressof(ctypes.c_char.from_buffer(buf)) - size = len(buf) - if os_name == "Linux" or os_name == "Darwin": - if MLOCK(address, size) != 0: - raise RuntimeError("Failed to lock memory") - elif os_name == "Windows": - buf_ptr = ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(buf))) - size = ctypes.sizeof(ctypes.c_char) * len(buf) - if not kernel32.VirtualLock(buf_ptr, size): - raise RuntimeError("VirtualLock failed") - else: - raise RuntimeError(f"Unsupported OS: {os_name}") - - -def unlock_memory(buf): - """Unlocks the memory of the given buffer.""" - address = ctypes.addressof(ctypes.c_char.from_buffer(buf)) - size = len(buf) - if os_name == "Linux" or os_name == "Darwin": - if MUNLOCK(address, size) != 0: - raise RuntimeError("Failed to unlock memory") - elif os_name == "Windows": - buf_ptr = ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(buf))) - size = ctypes.sizeof(ctypes.c_char) * len(buf) - if not kernel32.VirtualUnlock(buf_ptr, size): - raise RuntimeError("VirtualUnlock failed") - else: - raise RuntimeError(f"Unsupported OS: {os_name}") SIZES_MB = [ @@ -113,6 +12,11 @@ def unlock_memory(buf): 1, 2, 4, + 8, + 16, + 32, + 64, + 128, ] @@ -165,8 +69,4 @@ def test_zeroize_np_sizes(self): if __name__ == "__main__": - # if os_name == "Windows": - # # Enable the privilege to lock memory - # enable_lock_memory_privilege() - unittest.main()