1- __version__ = "0.8 .0"
1+ __version__ = "1.0 .0"
22
33from zenlib .util import contains
44
@@ -9,7 +9,7 @@ def init_banner(self):
99 self ["banner" ] = f"echo { self ['test_flag' ]} "
1010
1111
12- def _allocate_image (self , image_path ):
12+ def _allocate_image (self , image_path , padding = 0 ):
1313 """Allocate the test image size"""
1414 if image_path .exists ():
1515 if self .clean :
@@ -20,33 +20,70 @@ def _allocate_image(self, image_path):
2020
2121 with open (image_path , "wb" ) as f :
2222 self .logger .info ("Allocating test image file: %s" % f .name )
23- f .write (b"\0 " * self .test_image_size * 2 ** 20 )
23+ f .write (b"\0 " * ( self .test_image_size + padding ) * 2 ** 20 )
2424
2525
26+ def _get_luks_config (self ):
27+ """ Gets the LUKS configuration from the passed cryptsetup config using _cryptsetup_root as the key,
28+ if not found, uses the first defined luks device if there is only one, otherwise raises an exception """
29+ if dev := self ["cryptsetup" ].get (self ["_cryptsetup_root" ]):
30+ return dev
31+ if len (self ["cryptsetup" ]) == 1 :
32+ return next (iter (self ["cryptsetup" ].values ()))
33+ raise ValueError ("Could not find a LUKS configuration" )
34+
35+ def _get_luks_uuid (self ):
36+ """ Gets the uuid from the cryptsetup root config """
37+ return _get_luks_config (self ).get ("uuid" )
38+
39+
40+ def _get_luks_keyfile (self ):
41+ """ Gets the luks keyfile the cryptsetup root config,
42+ if not defined, generates a keyfile using the banner """
43+ config = _get_luks_config (self )
44+ if keyfile := config .get ("key_file" ):
45+ return keyfile
46+ raise ValueError ("No LUKS key_file is set." )
47+
48+ def make_test_luks_image (self , image_path ):
49+ """ Creates a LUKS image to hold the test image """
50+ try :
51+ self ._run (["cryptsetup" , "status" , "test_image" ]) # Check if the LUKS device is already open
52+ self .logger .warning ("LUKS device 'test_image' is already open, closing it" )
53+ self ._run (["cryptsetup" , "luksClose" , "test_image" ])
54+ except RuntimeError :
55+ pass
56+ _allocate_image (self , image_path , padding = 32 ) # First allocate the image file, adding padding for the LUKS header
57+ keyfile_path = _get_luks_keyfile (self )
58+ self .logger .info ("Using LUKS keyfile: %s" % keyfile_path )
59+ self .logger .info ("Creating LUKS image: %s" % image_path )
60+ self ._run (["cryptsetup" , "luksFormat" , image_path , "--uuid" , _get_luks_uuid (self ), "--batch-mode" , "--key-file" , keyfile_path ])
61+ self .logger .info ("Opening LUKS image: %s" % image_path )
62+ self ._run (["cryptsetup" , "luksOpen" , image_path , "test_image" , "--key-file" , keyfile_path ])
63+
2664def make_test_image (self ):
2765 """Creates a test image from the build dir"""
2866 build_dir = self ._get_build_path ("/" ).resolve ()
2967 self .logger .info ("Creating test image from: %s" % build_dir )
3068
3169 rootfs_uuid = self ["mounts" ]["root" ]["uuid" ]
3270 rootfs_type = self ["mounts" ]["root" ]["type" ]
33-
3471 image_path = self ._get_out_path (self ["out_file" ])
35- if rootfs_type == "ext4" :
36- # Create the test image file, flll with 0s
72+
73+ if self .get ("cryptsetup" ): # If there is cryptsetup config, create a LUKS image
74+ make_test_luks_image (self , image_path )
75+ image_path = "/dev/mapper/test_image"
76+ else :
3777 _allocate_image (self , image_path )
78+
79+ if rootfs_type == "ext4" :
3880 self ._run (["mkfs" , "-t" , rootfs_type , "-d" , build_dir , "-U" , rootfs_uuid , "-F" , image_path ])
3981 elif rootfs_type == "btrfs" :
40- if self ["clean" ] and image_path .exists ():
41- self .logger .warning ("Removing existing test image file: %s" % image_path )
42- image_path .unlink ()
4382 self ._run (["mkfs" , "-t" , rootfs_type , "-f" , "--rootdir" , build_dir , "-U" , rootfs_uuid , image_path ])
4483 elif rootfs_type == "xfs" :
45- _allocate_image (self , image_path )
4684 self ._run (["mkfs" , "-t" , rootfs_type , "-m" , "uuid=%s" % rootfs_uuid , image_path ])
4785 try : # XFS doesn't support importing a directory as a filesystem, it must be mounted
4886 from tempfile import TemporaryDirectory
49-
5087 with TemporaryDirectory () as tmp_dir :
5188 self ._run (["mount" , image_path , tmp_dir ])
5289 self ._run (["cp" , "-a" , f"{ build_dir } /." , tmp_dir ])
@@ -55,3 +92,7 @@ def make_test_image(self):
5592 raise RuntimeError ("Could not mount the XFS test image: %s" , e )
5693 else :
5794 raise NotImplementedError ("Unsupported test rootfs type: %s" % rootfs_type )
95+
96+ if self .get ("cryptsetup" ): # Leave it open in the event of failure, close it before executing tests
97+ self .logger .info ("Closing LUKS image: %s" % image_path )
98+ self ._run (["cryptsetup" , "luksClose" , "test_image" ])
0 commit comments