From ed569c28afccd583b19fffae3d8a0508100eda44 Mon Sep 17 00:00:00 2001 From: Radu Marias Date: Tue, 4 Jun 2024 23:54:20 +0300 Subject: [PATCH] have Rust methods accept PyAny and convert inside --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 152 +++++++++++++---------------------- examples/lock_and_zeroize.py | 8 +- src/lib.rs | 80 ++++++++---------- tests/test_zeroize.py | 34 +++----- 6 files changed, 105 insertions(+), 173 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 66bad6d..96794a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -427,7 +427,7 @@ checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "zeroize" -version = "0.1.11" +version = "0.2.0" dependencies = [ "libc", "libsodium-sys", diff --git a/Cargo.toml b/Cargo.toml index 7165eb7..d53846a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "zeroize" -version = "0.1.11" +version = "0.2.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/README.md b/README.md index e22eebc..ac5b734 100644 --- a/README.md +++ b/README.md @@ -12,133 +12,89 @@ It can work with `bytearray` and `numpy array`. > [!WARNING] > **In the case of [Copy-on-write fork](https://en.wikipedia.org/wiki/Copy-on-write) you need to zeroize the memory before forking the child process, see example below. -> Also by itself it doesn't work if memory is moved or moved to swap. You can use `zeroize.mlock()` and `zeroize.mlock_np()` to lock the memory, max size you can lock is 4MB, at least on Linux, see example below.** +> Also by itself it doesn't work if memory is moved or moved to swap. You can use `zeroize.mlock()` and `zeroize.mlock()` to lock the memory, max size you can lock is 4MB, at least on Linux, see example below.** # Examples ## Lock and zeroize memory ```python -from zeroize import zeroize1, zeroize_np -import numpy as np -import ctypes - - -# Load the C standard library -LIBC = ctypes.CDLL("libc.so.6") -MLOCK = LIBC.mlock -MUNLOCK = LIBC.munlock - -# Define mlock and munlock argument types -MLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t] -MUNLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t] - - -def lock_memory(buffer): - """Locks the memory of the given buffer.""" - address = ctypes.addressof(ctypes.c_char.from_buffer(buffer)) - size = len(buffer) - if MLOCK(address, size) != 0: - raise RuntimeError("Failed to lock memory") +"""By itself it doesn't work if memory is moved or moved to swap. You can use `crypes` with `libc.mlock()` to lock the memory""" +from zeroize import zeroize1, mlock, munlock +import numpy as np -def unlock_memory(buffer): - """Unlocks the memory of the given buffer.""" - address = ctypes.addressof(ctypes.c_char.from_buffer(buffer)) - size = len(buffer) - if MUNLOCK(address, size) != 0: - raise RuntimeError("Failed to unlock memory") +if __name__ == "__main__": + try: + print("allocate memory") -try: - print("allocate memory") + # regular array + # max size you can lock is 4MB, at least on Linux + arr = bytearray(b"1234567890") - # regular array - arr = bytearray(b"1234567890") + # numpy array + # max size you can lock is 4MB, at least on Linux + arr_np = np.array([0] * 10, dtype=np.uint8) + arr_np[:] = arr + assert arr_np.tobytes() == b"1234567890" - # numpy array - arr_np = np.array([0] * 10, dtype=np.uint8) - arr_np[:] = arr - assert arr_np.tobytes() == b"1234567890" + print("locking memory") - print("locking memory") + mlock(arr) + mlock(arr_np) - lock_memory(arr) - lock_memory(arr_np) + print("zeroize'ing...: ") + zeroize1(arr) + zeroize1(arr_np) - print("zeroize'ing...: ") - zeroize1(arr) - zeroize_np(arr_np) + print("checking if is zeroized") + assert arr == bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00") + assert all(arr_np == 0) - print("checking if is zeroized") - assert arr == bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00") - assert all(arr_np == 0) + print("all good, bye!") - print("all good, bye!") -finally: - # Unlock the memory - print("unlocking memory") - unlock_memory(arr) - unlock_memory(arr_np) + finally: + # Unlock the memory + print("unlocking memory") + munlock(arr) + munlock(arr_np) ``` -## Zeroing memory before starting child process +## Zeroing memory before forking child process This mitigates the problems that appears on [Copy-on-write fork](https://en.wikipedia.org/wiki/Copy-on-write). You need to zeroize the data before forking the child process. ```python -import os -from zeroize import zeroize1, zeroize_np -import numpy as np -import ctypes - - -# Load the C standard library -LIBC = ctypes.CDLL("libc.so.6") -MLOCK = LIBC.mlock -MUNLOCK = LIBC.munlock - -# Define mlock and munlock argument types -MLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t] -MUNLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t] - - -def lock_memory(buffer): - """Locks the memory of the given buffer.""" - address = ctypes.addressof(ctypes.c_char.from_buffer(buffer)) - size = len(buffer) - if MLOCK(address, size) != 0: - raise RuntimeError("Failed to lock memory") +""" In the case of [Copy-on-write fork](https://en.wikipedia.org/wiki/Copy-on-write) you need to zeroize the memory before forking the child process. """ +import os +from zeroize import zeroize1, mlock, munlock -def unlock_memory(buffer): - """Unlocks the memory of the given buffer.""" - address = ctypes.addressof(ctypes.c_char.from_buffer(buffer)) - size = len(buffer) - if MUNLOCK(address, size) != 0: - raise RuntimeError("Failed to unlock memory") +if __name__ == "__main__": + try: + # max size you can lock is 4MB, at least on Linux + sensitive_data = bytearray(b"Sensitive Information") + mlock(sensitive_data) -try: - sensitive_data = bytearray(b"Sensitive Information") - lock_memory(sensitive_data) + print("Before zeroization:", sensitive_data) - print("Before zeroization:", sensitive_data) + zeroize1(sensitive_data) + print("After zeroization:", sensitive_data) - zeroize1(sensitive_data) - print("After zeroization:", sensitive_data) + # Forking after zeroization to ensure no sensitive data is copied + pid = os.fork() + if pid == 0: + # This is the child process + print("Child process memory after fork:", sensitive_data) + else: + # This is the parent process + os.wait() # Wait for the child process to exit - # Forking after zeroization to ensure no sensitive data is copied - pid = os.fork() - if pid == 0: - # This is the child process - print("Child process memory after fork:", sensitive_data) - else: - # This is the parent process - os.wait() # Wait for the child process to exit -finally: - # Unlock the memory - print("unlocking memory") - unlock_memory(sensitive_data) + finally: + # Unlock the memory + print("unlocking memory") + munlock(sensitive_data) ``` # Building from source diff --git a/examples/lock_and_zeroize.py b/examples/lock_and_zeroize.py index 27e5e2c..5e1f506 100644 --- a/examples/lock_and_zeroize.py +++ b/examples/lock_and_zeroize.py @@ -1,6 +1,6 @@ """By itself it doesn't work if memory is moved or moved to swap. You can use `crypes` with `libc.mlock()` to lock the memory""" -from zeroize import zeroize1, zeroize_np, mlock, munlock, mlock_np, munlock_np +from zeroize import zeroize1, mlock, munlock import numpy as np @@ -21,11 +21,11 @@ print("locking memory") mlock(arr) - mlock_np(arr_np) + mlock(arr_np) print("zeroize'ing...: ") zeroize1(arr) - zeroize_np(arr_np) + zeroize1(arr_np) print("checking if is zeroized") assert arr == bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00") @@ -37,4 +37,4 @@ # Unlock the memory print("unlocking memory") munlock(arr) - munlock_np(arr_np) + munlock(arr_np) diff --git a/src/lib.rs b/src/lib.rs index 7c474c9..bde2ec9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,78 +24,66 @@ static mut INITIALIZED: bool = false; #[pymodule] fn zeroize<'py>(_py: Python, m: &Bound<'py, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(zeroize1, m)?)?; - m.add_function(wrap_pyfunction!(zeroize_np, m)?)?; // m.add_function(wrap_pyfunction!(zeroize_mv, m)?)?; m.add_function(wrap_pyfunction!(mlock, m)?)?; m.add_function(wrap_pyfunction!(munlock, m)?)?; - m.add_function(wrap_pyfunction!(mlock_np, m)?)?; - m.add_function(wrap_pyfunction!(munlock_np, m)?)?; Ok(()) } #[pyfunction] -fn zeroize1<'py>(arr: &Bound<'py, PyByteArray>) -> PyResult<()> { - unsafe { arr.as_bytes_mut().zeroize(); } +fn zeroize1<'py>(arr: &Bound<'py, PyAny>) -> PyResult<()> { + as_array(arr)?.zeroize(); Ok(()) } #[pyfunction] -fn zeroize_np<'py>(arr: &Bound<'py, PyArray1>) -> PyResult<()> { - unsafe { arr.as_slice_mut().unwrap().zeroize(); } - Ok(()) -} - -#[pyfunction] -fn mlock<'py>(arr: &Bound<'py, PyByteArray>) -> PyResult<()> { - unsafe { - if !init() { - panic!("libsodium failed to initialize") - } - if !_mlock(arr.as_bytes_mut().as_mut_ptr()) { - panic!("mlock failed") - } +fn mlock<'py>(arr: &Bound<'py, PyAny>) -> PyResult<()> { + if !init() { + return Err(PyErr::new::( + "libsodium failed to initialize", + )); } - Ok(()) -} - -#[pyfunction] -fn mlock_np<'py>(arr: &Bound<'py, PyArray1>) -> PyResult<()> { unsafe { - if !init() { - panic!("libsodium failed to initialize") - } - if !_mlock(arr.as_slice_mut().unwrap().as_mut_ptr()) { - panic!("mlock failed") + if !_mlock(as_array(arr)?.as_mut_ptr()) { + return Err(PyErr::new::( + "mlock failed", + )); } } Ok(()) } #[pyfunction] -fn munlock<'py>(arr: &Bound<'py, PyByteArray>) -> PyResult<()> { +fn munlock<'py>(arr: &Bound<'py, PyAny>) -> PyResult<()> { + if !init() { + return Err(PyErr::new::( + "libsodium failed to initialize", + )); + } unsafe { - if !init() { - panic!("libsodium failed to initialize") - } - if !_munlock(arr.as_bytes_mut().as_mut_ptr()) { - panic!("mlock failed") + if !_munlock(as_array(arr)?.as_mut_ptr()) { + return Err(PyErr::new::( + "mlock failed", + )); } } Ok(()) } -#[pyfunction] -fn munlock_np<'py>(arr: &Bound<'py, PyArray1>) -> PyResult<()> { - unsafe { - if !init() { - panic!("libsodium failed to initialize") - } - if !_munlock(arr.as_slice_mut().unwrap().as_mut_ptr()) { - panic!("mlock failed") +fn as_array<'a>(arr: &'a Bound) -> PyResult<&'a mut [u8]> { + let arr = unsafe { + if let Ok(arr) = arr.downcast::() { + arr.as_bytes_mut() + } else if let Ok(arr) = arr.downcast::>() { + arr.as_slice_mut().unwrap() + } else { + return Err(PyErr::new::( + "Expected a PyByteArray or PyArray1", + )); } - } - Ok(()) + }; + Ok(arr) } // #[pyfunction] @@ -121,7 +109,7 @@ fn munlock_np<'py>(arr: &Bound<'py, PyArray1>) -> PyResult<()> { /// not* be used. /// /// Calling it multiple times is a no-op. -pub(crate) fn init() -> bool { +fn init() -> bool { unsafe { INIT.call_once(|| { // NOTE: Calls to transmute fail to compile if the source diff --git a/tests/test_zeroize.py b/tests/test_zeroize.py index 09954e4..4a63003 100644 --- a/tests/test_zeroize.py +++ b/tests/test_zeroize.py @@ -1,5 +1,5 @@ import unittest -from zeroize import zeroize1, zeroize_np, mlock, munlock, mlock_np, munlock_np +from zeroize import zeroize1, mlock, munlock import numpy as np @@ -25,48 +25,36 @@ class TestStringMethods(unittest.TestCase): def test_zeroize1(self): try: arr = bytearray(b"1234567890") + arr_np = np.array([0] * 10, dtype=np.uint8) mlock(arr) + mlock(arr_np) zeroize1(arr) + zeroize1(arr_np) self.assertEqual( arr, bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00") ) + self.assertEqual(True, all(arr_np == 0)) finally: munlock(arr) - def test_zeroize_np(self): - try: - arr = np.array([0] * 10, dtype=np.uint8) - mlock_np(arr) - arr[:] = bytes(b"1234567890") - zeroize_np(arr) - self.assertEqual(True, all(arr == 0)) - - finally: - munlock_np(arr) - def test_zeroize1_sizes(self): for size in SIZES_MB: try: arr = bytearray(int(size * 1024 * 1024)) + arr_np = np.random.randint( + 0, 256, int(size * 1024 * 1024), dtype=np.uint8 + ) mlock(arr) + mlock(arr_np) zeroize1(arr) + zeroize1(arr_np) self.assertEqual(arr, bytearray(int(size * 1024 * 1024))) + self.assertEqual(True, all(arr_np == 0)) finally: munlock(arr) - def test_zeroize_np_sizes(self): - for size in SIZES_MB: - try: - array_size = int(size * 1024 * 1024) - random_array = np.random.randint(0, 256, array_size, dtype=np.uint8) - mlock_np(random_array) - zeroize_np(random_array) - self.assertEqual(True, all(random_array == 0)) - finally: - munlock_np(random_array) - if __name__ == "__main__": unittest.main()