Skip to content

Commit

Permalink
tests and examples
Browse files Browse the repository at this point in the history
  • Loading branch information
radumarias committed Jun 4, 2024
1 parent 2c53947 commit 7aac3ea
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 246 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "zeroize"
version = "0.1.9"
version = "0.1.10"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ It can work with `bytearray` and `numpy array`.

> [!WARNING]
> **In the case of [Copy-on-write fork](https://en.wikipedia.org/wiki/Copy-on-write) you need to zeroize the memory before forking the child process, see example below.
> Also by itself it doesn't work if memory is moved or moved to swap. You can use `crypes` with `libc.mlock()` to lock the memory, max size you can lock is 4MB, at least on Linux, see example below.**
> Also by itself it doesn't work if memory is moved or moved to swap. You can use `zeroize.mlock()` and `zeroize.mlock_np()` to lock the memory, max size you can lock is 4MB, at least on Linux, see example below.**
# Examples

Expand Down
75 changes: 5 additions & 70 deletions examples/lock_and_zeroize.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,7 @@
"""By itself it doesn't work if memory is moved or moved to swap. You can use `crypes` with `libc.mlock()` to lock the memory"""

from zeroize import zeroize1, zeroize_np
from zeroize import zeroize1, zeroize_np, mlock, munlock, mlock_np, munlock_np
import numpy as np
import ctypes
import platform


os_name = platform.system()

if os_name == "Linux":
# Load the C standard library
LIBC = ctypes.CDLL("libc.so.6")
elif os_name == "Darwin":
# Load the C standard library
LIBC = ctypes.CDLL("libc.dylib")
elif os_name == "Windows":
# Load the kernel32 library
kernel32 = ctypes.windll.kernel32
else:
raise RuntimeError(f"Unsupported OS: {os_name}")

if os_name == "Linux" or os_name == "Darwin":
# Define mlock and munlock argument types
MLOCK = LIBC.mlock
MUNLOCK = LIBC.munlock

# Define mlock and munlock argument types
MLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
MUNLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
elif os_name == "Windows":
# Define the VirtualLock and VirtualUnlock functions
VirtualLock = kernel32.VirtualLock
VirtualLock.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
VirtualLock.restype = ctypes.c_int

VirtualUnlock = kernel32.VirtualUnlock
VirtualUnlock.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
VirtualUnlock.restype = ctypes.c_int
else:
raise RuntimeError(f"Unsupported OS: {os_name}")


def lock_memory(buffer):
"""Locks the memory of the given buffer."""
address = ctypes.addressof(ctypes.c_char.from_buffer(buffer))
size = len(buffer)
if os_name == "Linux" or os_name == "Darwin":
if MLOCK(address, size) != 0:
raise RuntimeError("Failed to lock memory")
elif os_name == "Windows":
if VirtualLock(address, size) == 0:
raise RuntimeError("Failed to lock memory")
else:
raise RuntimeError(f"Unsupported OS: {os_name}")


def unlock_memory(buffer):
"""Unlocks the memory of the given buffer."""
address = ctypes.addressof(ctypes.c_char.from_buffer(buffer))
size = len(buffer)
if os_name == "Linux" or os_name == "Darwin":
if MUNLOCK(address, size) != 0:
raise RuntimeError("Failed to unlock memory")
elif os_name == "Windows":
if VirtualUnlock(address, size) == 0:
raise RuntimeError("Failed to unlock memory")
else:
raise RuntimeError(f"Unsupported OS: {os_name}")


if __name__ == "__main__":
Expand All @@ -85,8 +20,8 @@ def unlock_memory(buffer):

print("locking memory")

lock_memory(arr)
lock_memory(arr_np)
mlock(arr)
mlock_np(arr_np)

print("zeroize'ing...: ")
zeroize1(arr)
Expand All @@ -101,5 +36,5 @@ def unlock_memory(buffer):
finally:
# Unlock the memory
print("unlocking memory")
unlock_memory(arr)
unlock_memory(arr_np)
munlock(arr)
munlock_np(arr_np)
71 changes: 3 additions & 68 deletions examples/zeroize_before_fork.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,14 @@
""" In the case of [Copy-on-write fork](https://en.wikipedia.org/wiki/Copy-on-write) you need to zeroize the memory before forking the child process. """

import os
from zeroize import zeroize1
import ctypes
import platform


os_name = platform.system()

if os_name == "Linux":
# Load the C standard library
LIBC = ctypes.CDLL("libc.so.6")
elif os_name == "Darwin":
# Load the C standard library
LIBC = ctypes.CDLL("libc.dylib")
elif os_name == "Windows":
# Load the kernel32 library
kernel32 = ctypes.windll.kernel32
else:
raise RuntimeError(f"Unsupported OS: {os_name}")

if os_name == "Linux" or os_name == "Darwin":
# Define mlock and munlock argument types
MLOCK = LIBC.mlock
MUNLOCK = LIBC.munlock

# Define mlock and munlock argument types
MLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
MUNLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
elif os_name == "Windows":
# Define the VirtualLock and VirtualUnlock functions
VirtualLock = kernel32.VirtualLock
VirtualLock.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
VirtualLock.restype = ctypes.c_int

VirtualUnlock = kernel32.VirtualUnlock
VirtualUnlock.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
VirtualUnlock.restype = ctypes.c_int
else:
raise RuntimeError(f"Unsupported OS: {os_name}")


def lock_memory(buffer):
"""Locks the memory of the given buffer."""
address = ctypes.addressof(ctypes.c_char.from_buffer(buffer))
size = len(buffer)
if os_name == "Linux" or os_name == "Darwin":
if MLOCK(address, size) != 0:
raise RuntimeError("Failed to lock memory")
elif os_name == "Windows":
if VirtualLock(address, size) == 0:
raise RuntimeError("Failed to lock memory")
else:
raise RuntimeError(f"Unsupported OS: {os_name}")


def unlock_memory(buffer):
"""Unlocks the memory of the given buffer."""
address = ctypes.addressof(ctypes.c_char.from_buffer(buffer))
size = len(buffer)
if os_name == "Linux" or os_name == "Darwin":
if MUNLOCK(address, size) != 0:
raise RuntimeError("Failed to unlock memory")
elif os_name == "Windows":
if VirtualUnlock(address, size) == 0:
raise RuntimeError("Failed to unlock memory")
else:
raise RuntimeError(f"Unsupported OS: {os_name}")
from zeroize import zeroize1, mlock, munlock


if __name__ == "__main__":
try:
# max size you can lock is 4MB, at least on Linux
sensitive_data = bytearray(b"Sensitive Information")
lock_memory(sensitive_data)
mlock(sensitive_data)

print("Before zeroization:", sensitive_data)

Expand All @@ -92,4 +27,4 @@ def unlock_memory(buffer):
finally:
# Unlock the memory
print("unlocking memory")
unlock_memory(sensitive_data)
munlock(sensitive_data)
110 changes: 5 additions & 105 deletions tests/test_zeroize.py
Original file line number Diff line number Diff line change
@@ -1,107 +1,6 @@
import unittest
from zeroize import zeroize1, zeroize_np, mlock, munlock, mlock_np, munlock_np
import numpy as np
import ctypes
import platform


os_name = platform.system()

if os_name == "Linux":
# Load the C standard library
LIBC = ctypes.CDLL("libc.so.6")
elif os_name == "Darwin":
# Load the C standard library
LIBC = ctypes.CDLL("libc.dylib")
elif os_name == "Windows":
# Define constants
PAGE_READWRITE = 0x04
MEM_COMMIT = 0x1000
MEM_RESERVE = 0x2000

# Load kernel32 library
kernel32 = ctypes.windll.kernel32

# Enable the SeLockMemoryPrivilege privilege
def enable_lock_memory_privilege():
TOKEN_ADJUST_PRIVILEGES = 0x0020
TOKEN_QUERY = 0x0008
SE_PRIVILEGE_ENABLED = 0x0002
privilege_name = "SeLockMemoryPrivilege"

class LUID(ctypes.Structure):
_fields_ = [("LowPart", ctypes.c_uint32), ("HighPart", ctypes.c_int32)]

class LUID_AND_ATTRIBUTES(ctypes.Structure):
_fields_ = [("Luid", LUID), ("Attributes", ctypes.c_uint32)]

class TOKEN_PRIVILEGES(ctypes.Structure):
_fields_ = [("PrivilegeCount", ctypes.c_uint32), ("Privileges", LUID_AND_ATTRIBUTES * 1)]

# Open process token
token_handle = ctypes.c_void_p()
if not kernel32.OpenProcessToken(kernel32.GetCurrentProcess(), TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY, ctypes.byref(token_handle)):
raise ctypes.WinError()

# Lookup privilege value
luid = LUID()
if not kernel32.LookupPrivilegeValueW(None, privilege_name, ctypes.byref(luid)):
raise ctypes.WinError()

# Adjust token privileges
tp = TOKEN_PRIVILEGES()
tp.PrivilegeCount = 1
tp.Privileges[0].Luid = luid
tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED

if not kernel32.AdjustTokenPrivileges(token_handle, False, ctypes.byref(tp), ctypes.sizeof(tp), None, None):
raise ctypes.WinError()

# Close the token handle
kernel32.CloseHandle(token_handle)
else:
raise RuntimeError(f"Unsupported OS: {os_name}")

if os_name == "Linux" or os_name == "Darwin":
# Define mlock and munlock argument types
MLOCK = LIBC.mlock
MUNLOCK = LIBC.munlock

# Define mlock and munlock argument types
MLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
MUNLOCK.argtypes = [ctypes.c_void_p, ctypes.c_size_t]


def lock_memory(buf):
"""Locks the memory of the given buffer."""
address = ctypes.addressof(ctypes.c_char.from_buffer(buf))
size = len(buf)
if os_name == "Linux" or os_name == "Darwin":
if MLOCK(address, size) != 0:
raise RuntimeError("Failed to lock memory")
elif os_name == "Windows":
buf_ptr = ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(buf)))
size = ctypes.sizeof(ctypes.c_char) * len(buf)
if not kernel32.VirtualLock(buf_ptr, size):
raise RuntimeError("VirtualLock failed")
else:
raise RuntimeError(f"Unsupported OS: {os_name}")


def unlock_memory(buf):
"""Unlocks the memory of the given buffer."""
address = ctypes.addressof(ctypes.c_char.from_buffer(buf))
size = len(buf)
if os_name == "Linux" or os_name == "Darwin":
if MUNLOCK(address, size) != 0:
raise RuntimeError("Failed to unlock memory")
elif os_name == "Windows":
buf_ptr = ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(buf)))
size = ctypes.sizeof(ctypes.c_char) * len(buf)
if not kernel32.VirtualUnlock(buf_ptr, size):
raise RuntimeError("VirtualUnlock failed")
else:
raise RuntimeError(f"Unsupported OS: {os_name}")


SIZES_MB = [
Expand All @@ -113,6 +12,11 @@ def unlock_memory(buf):
1,
2,
4,
8,
16,
32,
64,
128,
]


Expand Down Expand Up @@ -165,8 +69,4 @@ def test_zeroize_np_sizes(self):


if __name__ == "__main__":
# if os_name == "Windows":
# # Enable the privilege to lock memory
# enable_lock_memory_privilege()

unittest.main()

0 comments on commit 7aac3ea

Please sign in to comment.