Skip to content
Merged
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "ugrd"
version = "2.0.2"
version = "2.1.0"
authors = [
{ name="Desultory", email="[email protected]" },
]
Expand All @@ -19,7 +19,7 @@ classifiers = [

dependencies = [
"zenlib >= 3.0.2",
"pycpio >= 1.5.2"
"pycpio >= 1.5.5"
]

[project.optional-dependencies]
Expand Down
26 changes: 23 additions & 3 deletions src/ugrd/crypto/cryptsetup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
__author__ = "desultory"
__version__ = "4.1.3"
__version__ = "4.2.0"

from json import loads
from pathlib import Path
Expand Down Expand Up @@ -28,6 +28,7 @@
"include_header",
"validate_key",
"validate_header",
"_dm-integrity", # Internal parameter for when dm-integrity was detected. mostly used for test automation
]


Expand Down Expand Up @@ -147,6 +148,9 @@ def _get_dm_info(self, mapped_name: str) -> dict:
def _get_dm_slave_info(self, device_info: dict) -> (str, dict):
"""Gets the device mapper slave information for a particular device."""
slave_source = device_info["slaves"][0]
# For integrity backed devices, get the slave's slave
if self["_vblk_info"].get(slave_source, {}).get("uuid", "").startswith("CRYPT-SUBDEV"):
slave_source = self["_vblk_info"][slave_source]["slaves"][0]
slave_name = self["_vblk_info"].get(slave_source, {}).get("name")
search_paths = ["/dev/", "/dev/mapper/"]

Expand Down Expand Up @@ -207,7 +211,7 @@ def _detect_luks_aes_module(self, luks_cipher_name: str) -> None:
self["_kmod_auto"] = crypto_config["module"]


def _detect_luks_header_aes(self, luks_info: dict) -> dict:
def _detect_luks_header_aes(self, luks_info: dict) -> None:
"""Checks the cipher type in the LUKS header, reads /proc/crypto to find the
corresponding driver. If it's not builtin, adds the module to the kernel modules."""
for keyslot in luks_info.get("keyslots", {}).values():
Expand All @@ -218,7 +222,7 @@ def _detect_luks_header_aes(self, luks_info: dict) -> dict:
_detect_luks_aes_module(self, segment["encryption"])


def _detect_luks_header_sha(self, luks_info: dict) -> dict:
def _detect_luks_header_sha(self, luks_info: dict) -> None:
"""Reads the hash algorithm from the LUKS header,
enables the corresponding kernel module using _crypto_ciphers"""
for keyslot in luks_info.get("keyslots", {}).values():
Expand All @@ -229,6 +233,21 @@ def _detect_luks_header_sha(self, luks_info: dict) -> dict:
self["kernel_modules"] = self._crypto_ciphers[digest["hash"]]["driver"]


def _detect_luks_header_integrity(self, luks_info: dict, mapped_name: str) -> None:
"""Reads the integrity algorithm from the LUKS header,
Enables the dm-integrity module, and returns the integrity type."""
for segment in luks_info.get("segments", {}).values():
if integrity_type := segment.get("integrity", {}).get("type"):
integrity_kmods = ["dm_integrity", "authenc"]
if integrity_type.startswith("hmac"):
integrity_kmods.append("hmac")
self.logger.info(
f"[{c_(mapped_name, 'blue')}]({c_(integrity_type, 'cyan')}) Enabling kernel modules for dm-integrity: {c_(', '.join(integrity_kmods), 'magenta', bright=True)}"
)
self["cryptsetup"][mapped_name]["_dm-integrity"] = integrity_type
return


@contains("cryptsetup_header_validation", "Skipping cryptsetup header validation.", log_level=30)
def _validate_cryptsetup_header(self, mapped_name: str) -> None:
"""Validates configured cryptsetup volumes against the LUKS header."""
Expand Down Expand Up @@ -265,6 +284,7 @@ def _validate_cryptsetup_header(self, mapped_name: str) -> None:

_detect_luks_header_aes(self, luks_info)
_detect_luks_header_sha(self, luks_info)
_detect_luks_header_integrity(self, luks_info, mapped_name)

if not self["argon2"]: # if argon support was not detected, check if the header wants it
for keyslot in luks_info.get("keyslots", {}).values():
Expand Down
44 changes: 39 additions & 5 deletions src/ugrd/fs/mounts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
__author__ = "desultory"
__version__ = "7.1.4"
__version__ = "7.2.1"

from pathlib import Path
from re import search
Expand Down Expand Up @@ -504,20 +504,26 @@ def _autodetect_dm(self, mountpoint, device=None) -> None:
Ensures it's a device mapper mount, then autodetects the mount type.
Adds kmods to the autodetect list based on the mount source.
"""
# If a device is explicity passed, set it as the source device
if device:
self.logger.debug("[%s] Using provided device for mount autodetection: %s" % (mountpoint, device))
source_device = device
# If it's a mountpoint, try to resolve underlying devices
elif mountpoint:
source_device = _resolve_overlay_lower_device(self, mountpoint)
else:
raise AutodetectError("Mountpoint not found in host mounts: %s" % mountpoint)

# Get the name of the device from the path
device_name = source_device.split("/")[-1]
# Check that it's a device mapper mount (by prefix or path)
# If it's not, skip this autodetection by returning early
if not any(device_name.startswith(prefix) for prefix in ["dm-", "md"]):
if not source_device.startswith("/dev/mapper/"):
self.logger.debug("Mount is not a device mapper mount: %s" % source_device)
return

# Get the source device using blkid info/virtual block device info
if source_device not in self["_blkid_info"]:
if device_name in self["_vblk_info"]:
source_name = self["_vblk_info"][device_name]["name"]
Expand All @@ -535,6 +541,7 @@ def _autodetect_dm(self, mountpoint, device=None) -> None:
major, minor = _get_device_id(source_device)
self.logger.debug("[%s] Major: %s, Minor: %s" % (source_device, major, minor))

# Get the virtual block device name using the major/minor
for name, info in self["_vblk_info"].items():
if info["major"] == str(major) and info["minor"] == str(minor):
dev_name = name
Expand All @@ -544,27 +551,43 @@ def _autodetect_dm(self, mountpoint, device=None) -> None:
"[%s] Unable to find device mapper device with maj: %s min: %s" % (source_device, major, minor)
)

# Check that the virtual block device has slaves defined
if len(self["_vblk_info"][dev_name]["slaves"]) == 0:
raise AutodetectError("No slaves found for device mapper device, unknown type: %s" % source_device.name)
# Treat the first slave as the source for autodetection
slave_source = self["_vblk_info"][dev_name]["slaves"][0]
# If the slave source is a CRYPT-SUBDEV device, use its slave instead
if self["_vblk_info"].get(slave_source, {}).get("uuid", "").startswith("CRYPT-SUBDEV"):
slave_source = self["_vblk_info"][slave_source]["slaves"][0]
self.logger.info(f"[{c_(dev_name, 'blue')}] Slave is a CRYPT-SUBDEV, using its slave instead: {c_(slave_source, 'cyan')}")
# Add the kmod for it
self.logger.info(f"[{c_(dev_name, 'blue')}] Adding kmod for CRYPT-SUBDEV: {c_('dm-crypt', 'magenta')}")
self["_kmod_auto"] = ["dm_integrity", "authenc"]

autodetect_mount_kmods(self, slave_source)

# Check that the source device name matches the devie mapper name
if source_device.name != self["_vblk_info"][dev_name]["name"] and source_device.name != dev_name:
raise ValidationError(
"Device mapper device name mismatch: %s != %s" % (source_device.name, self["_vblk_info"][dev_name]["name"])
)

# Get block info using the slave source device
try:
blkid_info = self["_blkid_info"][f"/dev/{slave_source}"]
except KeyError:
if slave_source in self["_vblk_info"]:
# If the slave source isn't used in blkid, use the /dev/mapper path with the slave name
blkid_info = self["_blkid_info"][f"/dev/mapper/{self['_vblk_info'][slave_source]['name']}"]
else:
return self.logger.warning(f"No blkid info found for device mapper slave: {c_(slave_source, 'yellow')}")
if source_device.name != self["_vblk_info"][dev_name]["name"] and source_device.name != dev_name:
raise ValidationError(
"Device mapper device name mismatch: %s != %s" % (source_device.name, self["_vblk_info"][dev_name]["name"])
)

self.logger.debug(
"[%s] Device mapper info: %s\nDevice config: %s"
% (source_device.name, self["_vblk_info"][dev_name], blkid_info)
)

# With the blkid info, run the appropriate autodetect function based on the type
if blkid_info.get("type") == "crypto_LUKS" or source_device.name in self.get("cryptsetup", {}):
autodetect_luks(self, source_device, dev_name, blkid_info)
elif blkid_info.get("type") == "LVM2_member":
Expand All @@ -579,8 +602,19 @@ def _autodetect_dm(self, mountpoint, device=None) -> None:
raise AutodetectError("[%s] No type found for device mapper device: %s" % (dev_name, source_device))
raise ValidationError("Unknown device mapper device type: %s" % blkid_info.get("type"))

# Run autodetect on all slaves, in case of nested device mapper devices
for slave in self["_vblk_info"][dev_name]["slaves"]:
try:
# If the slave is a CRYPT-SUBDEV, iterate over its slaves instead
if self["_vblk_info"][slave]["uuid"].startswith("CRYPT-SUBDEV"):
for crypt_slave in self["_vblk_info"][slave]["slaves"]:
_autodetect_dm(self, mountpoint, crypt_slave)
self.logger.info(
"[%s] Autodetected device mapper container: %s"
% (c_(source_device.name, "blue", bright=True), c_(crypt_slave, "cyan"))
)
continue
# Otherwise, just autodetect the slave device
_autodetect_dm(self, mountpoint, slave) # Just pass the slave device name, as it will be re-detected
self.logger.info(
"[%s] Autodetected device mapper container: %s"
Expand Down
23 changes: 18 additions & 5 deletions src/ugrd/fs/test_image.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
__version__ = "2.0.0"
__version__ = "2.1.0"

from re import match
from tempfile import TemporaryDirectory

from zenlib.util import colorize as c_
from zenlib.util import contains


MIN_FS_SIZES = {"btrfs": 110, "f2fs": 50}


@contains("test_flag", "A test flag must be set to create a test image", raise_exception=True)
def init_banner(self):
"""Initialize the test image banner, set a random flag if not set."""
Expand All @@ -33,12 +34,13 @@ def _allocate_image(self, image_path, padding=0):

with open(image_path, "wb") as f:
total_size = (self.test_image_size + padding) * (2**20) # Convert MB to bytes
self.logger.info(f"Allocating {self.test_image_size + padding}MB test image file: { c_(f.name, 'green')}")
self.logger.debug(f"[{f.name}] Total bytes: { c_(total_size, 'green')}")
self.logger.info(f"Allocating {self.test_image_size + padding}MB test image file: {c_(f.name, 'green')}")
self.logger.debug(f"[{f.name}] Total bytes: {c_(total_size, 'green')}")
f.write(b"\0" * total_size)


def _copy_fs_contents(self, image_path, build_dir):
""" Mount and copy the filesystem contents into the image,
"""Mount and copy the filesystem contents into the image,
for filesystems which cannot be created directly from a directory"""
try:
with TemporaryDirectory() as tmp_dir:
Expand Down Expand Up @@ -84,6 +86,14 @@ def make_test_luks_image(self, image_path):
keyfile_path = _get_luks_keyfile(self)
self.logger.info("Using LUKS keyfile: %s" % c_(keyfile_path, "green"))
self.logger.info("Creating LUKS image: %s" % c_(image_path, "green"))
extra_args = []
if integrity_type := _get_luks_config(self).get("_dm-integrity"):
# If it's the type reported in the header like <type>(<algo>), turn it into <type>-<algo> for arg usage
if m := match(r"^(?P<type>\w+)\((?P<algo>[\w-]+)\)$", integrity_type):
integrity_type = f"{m['type']}-{m['algo']}"
self.logger.info(f"[{c_(image_path, 'green')}] LUKS integrity type: {c_(integrity_type, 'cyan')}")
extra_args.extend(["--integrity", integrity_type])

self._run(
[
"cryptsetup",
Expand All @@ -94,6 +104,9 @@ def make_test_luks_image(self, image_path):
"--batch-mode",
"--key-file",
keyfile_path,
"--pbkdf-memory",
"8192", # Only use 8MB of memory for PBKDF to speed up test image creation and avoid high memory usage
*extra_args,
]
)
self.logger.info("Opening LUKS image: %s" % c_(image_path, "magenta"))
Expand Down
10 changes: 10 additions & 0 deletions tests/test_cryptsetup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ def test_cryptsetup_included_key(self):
generator = InitramfsGenerator(logger=self.logger, config="tests/cryptsetup_included_key.toml")
generator.build()

def test_cryptsetup_integrity(self):
"""Tests LUKS based roots using a keyfile included in the initramfs with integrity protection"""
generator = InitramfsGenerator(
logger=self.logger,
config="tests/cryptsetup_included_key.toml",
_kmod_auto=["dm-integrity", "authenc"], # Specify this because its usually auto-detected during header validation
cryptsetup={"root": {"_dm-integrity": "hmac(sha256)"}}, # Use the type like defined in the header, not the proper args to test processing
)
generator.build()


if __name__ == "__main__":
main()