Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/ugrd/base/test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
__version__ = "1.0.0"
__version__ = "1.1.0"

from zenlib.util import unset

COPY_CONFIG = ["mounts", "tmpdir", "clean", "test_image_size", "test_flag"]
COPY_CONFIG = ["mounts", "tmpdir", "clean", "test_image_size", "test_flag", "cryptsetup"]


@unset("test_kernel")
Expand Down Expand Up @@ -75,7 +75,7 @@ def make_test_image(self):
"modules": "ugrd.fs.test_image",
"out_file": self["test_rootfs_name"],
"build_dir": self["test_rootfs_build_dir"],
**{key: self[key] for key in COPY_CONFIG},
**{key: self.get(key) for key in COPY_CONFIG if self.get(key) is not None},
}

target_fs = InitramfsGenerator(**kwargs)
Expand Down
63 changes: 52 additions & 11 deletions src/ugrd/fs/test_image.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.8.0"
__version__ = "1.0.0"

from zenlib.util import contains

Expand All @@ -9,7 +9,7 @@ def init_banner(self):
self["banner"] = f"echo {self['test_flag']}"


def _allocate_image(self, image_path):
def _allocate_image(self, image_path, padding=0):
"""Allocate the test image size"""
if image_path.exists():
if self.clean:
Expand All @@ -20,33 +20,70 @@ def _allocate_image(self, image_path):

with open(image_path, "wb") as f:
self.logger.info("Allocating test image file: %s" % f.name)
f.write(b"\0" * self.test_image_size * 2**20)
f.write(b"\0" * (self.test_image_size + padding) * 2**20)


def _get_luks_config(self):
""" Gets the LUKS configuration from the passed cryptsetup config using _cryptsetup_root as the key,
if not found, uses the first defined luks device if there is only one, otherwise raises an exception """
if dev := self["cryptsetup"].get(self["_cryptsetup_root"]):
return dev
if len(self["cryptsetup"]) == 1:
return next(iter(self["cryptsetup"].values()))
raise ValueError("Could not find a LUKS configuration")

def _get_luks_uuid(self):
""" Gets the uuid from the cryptsetup root config """
return _get_luks_config(self).get("uuid")


def _get_luks_keyfile(self):
""" Gets the luks keyfile the cryptsetup root config,
if not defined, generates a keyfile using the banner """
config = _get_luks_config(self)
if keyfile := config.get("key_file"):
return keyfile
raise ValueError("No LUKS key_file is set.")

def make_test_luks_image(self, image_path):
""" Creates a LUKS image to hold the test image """
try:
self._run(["cryptsetup", "status", "test_image"]) # Check if the LUKS device is already open
self.logger.warning("LUKS device 'test_image' is already open, closing it")
self._run(["cryptsetup", "luksClose", "test_image"])
except RuntimeError:
pass
_allocate_image(self, image_path, padding=32) # First allocate the image file, adding padding for the LUKS header
keyfile_path = _get_luks_keyfile(self)
self.logger.info("Using LUKS keyfile: %s" % keyfile_path)
self.logger.info("Creating LUKS image: %s" % image_path)
self._run(["cryptsetup", "luksFormat", image_path, "--uuid", _get_luks_uuid(self), "--batch-mode", "--key-file", keyfile_path])
self.logger.info("Opening LUKS image: %s" % image_path)
self._run(["cryptsetup", "luksOpen", image_path, "test_image", "--key-file", keyfile_path])

def make_test_image(self):
"""Creates a test image from the build dir"""
build_dir = self._get_build_path("/").resolve()
self.logger.info("Creating test image from: %s" % build_dir)

rootfs_uuid = self["mounts"]["root"]["uuid"]
rootfs_type = self["mounts"]["root"]["type"]

image_path = self._get_out_path(self["out_file"])
if rootfs_type == "ext4":
# Create the test image file, flll with 0s

if self.get("cryptsetup"): # If there is cryptsetup config, create a LUKS image
make_test_luks_image(self, image_path)
image_path = "/dev/mapper/test_image"
else:
_allocate_image(self, image_path)

if rootfs_type == "ext4":
self._run(["mkfs", "-t", rootfs_type, "-d", build_dir, "-U", rootfs_uuid, "-F", image_path])
elif rootfs_type == "btrfs":
if self["clean"] and image_path.exists():
self.logger.warning("Removing existing test image file: %s" % image_path)
image_path.unlink()
self._run(["mkfs", "-t", rootfs_type, "-f", "--rootdir", build_dir, "-U", rootfs_uuid, image_path])
elif rootfs_type == "xfs":
_allocate_image(self, image_path)
self._run(["mkfs", "-t", rootfs_type, "-m", "uuid=%s" % rootfs_uuid, image_path])
try: # XFS doesn't support importing a directory as a filesystem, it must be mounted
from tempfile import TemporaryDirectory

with TemporaryDirectory() as tmp_dir:
self._run(["mount", image_path, tmp_dir])
self._run(["cp", "-a", f"{build_dir}/.", tmp_dir])
Expand All @@ -55,3 +92,7 @@ def make_test_image(self):
raise RuntimeError("Could not mount the XFS test image: %s", e)
else:
raise NotImplementedError("Unsupported test rootfs type: %s" % rootfs_type)

if self.get("cryptsetup"): # Leave it open in the event of failure, close it before executing tests
self.logger.info("Closing LUKS image: %s" % image_path)
self._run(["cryptsetup", "luksClose", "test_image"])
4 changes: 4 additions & 0 deletions src/ugrd/fs/test_image.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ paths = ['dev', 'sys', 'proc']

test_image_size = 16
shebang = "#!/bin/bash"
_cryptsetup_root = "root"


[imports.build_pre]
"ugrd.fs.test_image" = ["init_banner"]
Expand All @@ -13,6 +15,8 @@ shebang = "#!/bin/bash"
[custom_parameters]
shebang = "str" # Add the shebang property, because the test mode disables the base module
mounts = "dict" # We only need the mounts dict, not the whole module
cryptsetup = "dict" # Same as above
_cryptsetup_root = "str" # Define the root device for cryptsetup
test_image_size = "int" # Define the size of the test image in MiB
test_flag = "str" # Define the success flag used to determine if the test was successful

25 changes: 25 additions & 0 deletions tests/cryptsetup_included_key.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Test included keyfile

modules = [
"ugrd.crypto.cryptsetup",
"ugrd.base.test",
]

# The initramfs will be built in /tmp/initramfs if "build_dir" is not specified not specified
out_dir = "initramfs_test"
test_memory = '2G'

#kernel_version = "6.6.35-gentoo-dist"
#test_kernel = "/boot/vmlinuz-6.6.35-gentoo-dist"
cpio_compression = false
hostonly = false

[cryptsetup.root]
uuid = "abcd1234-abcd-1234-abcd-1234abcd1234"
key_file = "/tmp/ugrd_test_key" # Currently hardcoded for this test
include_key = true

[mounts.root]
uuid = "10101010-abcd-1234-abcd-101010101010"
type = "ext4"

35 changes: 35 additions & 0 deletions tests/test_cryptsetup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from unittest import TestCase, main

from ugrd.initramfs_generator import InitramfsGenerator
from zenlib.logging import loggify


@loggify
class TestCryptsetup(TestCase):
def setUp(self):
"""Create the test keyfile"""
from pathlib import Path
from uuid import uuid4

keyfile = Path("/tmp/ugrd_test_key")
if keyfile.exists():
raise FileExistsError("Test Keyfile already exists!: %s" % keyfile)

with open("/tmp/ugrd_test_key", "wb") as f:
f.write(uuid4().bytes)

def tearDown(self):
"""Remove the test keyfile"""
from pathlib import Path

keyfile = Path("/tmp/ugrd_test_key")
if keyfile.exists():
keyfile.unlink()

def test_cryptsetup_included_key(self):
generator = InitramfsGenerator(logger=self.logger, config="tests/cryptsetup_included_key.toml")
generator.build()


if __name__ == "__main__":
main()
22 changes: 10 additions & 12 deletions tests/test_output_paths.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,41 @@
from unittest import TestCase, main
from pathlib import Path
from unittest import TestCase, main
from uuid import uuid4

from ugrd.initramfs_generator import InitramfsGenerator

from zenlib.logging import loggify


@loggify
class TestOutFile(TestCase):
def test_absolute_out_file(self):
out_file = Path(f'/tmp/{uuid4()}.cpio')
out_file = Path(f"/tmp/{uuid4()}.cpio")
if out_file.exists():
self.fail(f'File {out_file} already exists')
generator = InitramfsGenerator(logger=self.logger, config='tests/fullauto.toml', out_file=out_file)
self.fail(f"File {out_file} already exists")
generator = InitramfsGenerator(logger=self.logger, config="tests/fullauto.toml", out_file=out_file)
generator.build()
self.assertTrue(out_file.exists())
out_file.unlink()

def test_named_out_file(self):
out_file = Path(f"{uuid4()}.cpio")
generator = InitramfsGenerator(logger=self.logger, config='tests/fullauto.toml', out_file=out_file)
generator = InitramfsGenerator(logger=self.logger, config="tests/fullauto.toml", out_file=out_file)
out_path = generator._get_out_path(out_file)
if out_path.exists():
self.fail(f'File {out_path} already exists')
self.fail(f"File {out_path} already exists")
generator.build()
self.assertTrue(out_path.exists())


def test_relative_out_file(self):
out_file = f'./{uuid4()}.cpio'
generator = InitramfsGenerator(logger=self.logger, config='tests/fullauto.toml', out_file=out_file)
out_file = f"./{uuid4()}.cpio"
generator = InitramfsGenerator(logger=self.logger, config="tests/fullauto.toml", out_file=out_file)
out_path = Path(out_file)
if out_path.exists():
self.fail(f'File {out_file} already exists')
self.fail(f"File {out_file} already exists")
generator.build()
self.assertTrue(out_path.exists())
out_path.unlink()


if __name__ == '__main__':
if __name__ == "__main__":
main()
13 changes: 6 additions & 7 deletions tests/test_ugrd.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
from unittest import TestCase, main, expectedFailure
from unittest import TestCase, expectedFailure, main

from ugrd.initramfs_generator import InitramfsGenerator

from zenlib.logging import loggify


@loggify
class TestCpio(TestCase):
class TestUGRD(TestCase):
def test_fullauto(self):
generator = InitramfsGenerator(logger=self.logger, config='tests/fullauto.toml')
generator = InitramfsGenerator(logger=self.logger, config="tests/fullauto.toml")
generator.build()

def test_xz(self):
generator = InitramfsGenerator(logger=self.logger, config='tests/fullauto.toml', cpio_compression='xz')
generator = InitramfsGenerator(logger=self.logger, config="tests/fullauto.toml", cpio_compression="xz")
generator.build()

@expectedFailure
def test_bad_config(self):
generator = InitramfsGenerator(logger=self.logger, config='tests/bad_config.toml')
generator = InitramfsGenerator(logger=self.logger, config="tests/bad_config.toml")
generator.build()


if __name__ == '__main__':
if __name__ == "__main__":
main()