Skip to content

Commit cc00de5

Browse files
committed
reorganize functions
Signed-off-by: Zen <[email protected]>
1 parent bcd3b48 commit cc00de5

File tree

1 file changed

+90
-89
lines changed

1 file changed

+90
-89
lines changed

src/ugrd/fs/mounts.py

Lines changed: 90 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,94 @@
2222
]
2323

2424

25+
def _get_device_id(device: str) -> str:
26+
"""Gets the device id from the device path."""
27+
return Path(device).stat().st_rdev >> 8, Path(device).stat().st_rdev & 0xFF
28+
29+
30+
def _resolve_dev(self, device_path) -> str:
31+
"""Resolves a device to one indexed in blkid.
32+
33+
Takes the device path, such as /dev/root, and resolves it to a device indexed in blkid.
34+
If the device is an overlayfs, resolves the lowerdir device.
35+
"""
36+
if str(device_path) in self["_blkid_info"]:
37+
self.logger.debug("Device already resolved to blkid indexed device: %s" % device_path)
38+
return device_path
39+
40+
self.logger.debug("Resolving device: %s" % device_path)
41+
mountpoint = _resolve_device_mountpoint(self, device_path)
42+
device_path = _resolve_overlay_lower_device(self, mountpoint)
43+
mountpoint = _resolve_device_mountpoint(self, device_path) # May have changed if it was an overlayfs
44+
45+
major, minor = _get_device_id(self["_mounts"][mountpoint]["device"])
46+
for device in self["_blkid_info"]:
47+
check_major, check_minor = _get_device_id(device)
48+
if (major, minor) == (check_major, check_minor):
49+
self.logger.info("Resolved device: %s -> %s" % (device_path, colorize(device, "cyan")))
50+
return device
51+
self.logger.critical("Failed to resolve device: %s" % colorize(device_path, "red", bold=True))
52+
self.logger.error("Blkid info: %s" % pretty_print(self["_blkid_info"]))
53+
self.logger.error("Mount info: %s" % pretty_print(self["_mounts"]))
54+
return device_path
55+
56+
57+
def _resolve_device_mountpoint(self, device) -> str:
58+
"""Gets the mountpoint of a device based on the device path."""
59+
for mountpoint, mount_info in self["_mounts"].items():
60+
if str(device) == mount_info["device"]:
61+
return mountpoint
62+
raise AutodetectError("Device mountpoint not found: %s" % device)
63+
64+
65+
def _resolve_overlay_lower_dir(self, mountpoint) -> str:
66+
for option in self["_mounts"][mountpoint]["options"]:
67+
if option.startswith("lowerdir="):
68+
return option.removeprefix("lowerdir=")
69+
raise AutodetectError(
70+
"[%s] No lower overlayfs mountpoint found: %s" % mountpoint, self["_mounts"][mountpoint]["options"]
71+
)
72+
73+
74+
def _resolve_overlay_lower_device(self, mountpoint) -> dict:
75+
"""Returns device for the lower overlayfs mountpoint.
76+
If it's not an overlayfs, returns the device for the mountpoint.
77+
78+
If it is, iterate through the lowerdir devices until a non-overlayfs mount is found.
79+
"""
80+
if self["_mounts"][mountpoint]["fstype"] != "overlay":
81+
return self["_mounts"][mountpoint]["device"]
82+
83+
while self["_mounts"][mountpoint]["fstype"] == "overlay":
84+
lowerdir = _resolve_overlay_lower_dir(self, mountpoint)
85+
lower_path = Path(lowerdir)
86+
while str(lower_path) not in self["_mounts"]:
87+
lower_path = lower_path.parent
88+
if lower_path == Path("/"):
89+
raise AutodetectError("Lowerdir mount not found: %s" % lowerdir)
90+
mountpoint = str(lower_path)
91+
92+
return self["_mounts"][mountpoint]["device"]
93+
94+
95+
def _merge_mounts(self, mount_name: str, mount_config, mount_class) -> None:
96+
"""Merges the passed mount config with the existing mount."""
97+
if mount_name not in self[mount_class]:
98+
self.logger.debug("[%s] Skipping mount merge, mount not found: %s" % (mount_class, mount_name))
99+
return mount_config
100+
101+
self.logger.info("[%s] Updating mount: %s" % (mount_class, mount_name))
102+
self.logger.debug("[%s] Updating mount with: %s" % (mount_name, mount_config))
103+
if "options" in self[mount_class][mount_name] and "options" in mount_config:
104+
self.logger.debug("Merging options: %s" % mount_config["options"])
105+
self[mount_class][mount_name]["options"] = self[mount_class][mount_name]["options"] | set(
106+
mount_config["options"]
107+
)
108+
mount_config.pop("options")
109+
110+
return dict(self[mount_class][mount_name], **mount_config)
111+
112+
25113
@contains("validate", "Skipping mount validation, validation is disabled.", log_level=30)
26114
def _validate_mount_config(self, mount_name: str, mount_config) -> None:
27115
"""Validate the mount config."""
@@ -51,24 +139,6 @@ def _validate_mount_config(self, mount_name: str, mount_config) -> None:
51139
raise ValueError("Invalid parameter in mount: %s" % parameter)
52140

53141

54-
def _merge_mounts(self, mount_name: str, mount_config, mount_class) -> None:
55-
"""Merges the passed mount config with the existing mount."""
56-
if mount_name not in self[mount_class]:
57-
self.logger.debug("[%s] Skipping mount merge, mount not found: %s" % (mount_class, mount_name))
58-
return mount_config
59-
60-
self.logger.info("[%s] Updating mount: %s" % (mount_class, mount_name))
61-
self.logger.debug("[%s] Updating mount with: %s" % (mount_name, mount_config))
62-
if "options" in self[mount_class][mount_name] and "options" in mount_config:
63-
self.logger.debug("Merging options: %s" % mount_config["options"])
64-
self[mount_class][mount_name]["options"] = self[mount_class][mount_name]["options"] | set(
65-
mount_config["options"]
66-
)
67-
mount_config.pop("options")
68-
69-
return dict(self[mount_class][mount_name], **mount_config)
70-
71-
72142
def _process_mount(self, mount_name: str, mount_config, mount_class="mounts") -> None:
73143
"""Processes the passed mount config."""
74144
mount_config = _merge_mounts(self, mount_name, mount_config, mount_class)
@@ -336,11 +406,6 @@ def get_virtual_block_info(self) -> dict:
336406
self.logger.debug("No virtual block devices found.")
337407

338408

339-
def _get_device_id(device: str) -> str:
340-
"""Gets the device id from the device path."""
341-
return Path(device).stat().st_rdev >> 8, Path(device).stat().st_rdev & 0xFF
342-
343-
344409
@contains("hostonly", "Skipping device mapper autodetection, hostonly mode is disabled.", log_level=30)
345410
def _autodetect_dm(self, mountpoint, device=None) -> None:
346411
"""Autodetects device mapper config given a mountpoint.
@@ -516,71 +581,6 @@ def autodetect_luks(self, mount_loc, dm_num, blkid_info) -> None:
516581
)
517582

518583

519-
def _resolve_dev(self, device_path) -> str:
520-
"""Resolves a device to one indexed in blkid.
521-
522-
Takes the device path, such as /dev/root, and resolves it to a device indexed in blkid.
523-
If the device is an overlayfs, resolves the lowerdir device.
524-
"""
525-
if str(device_path) in self["_blkid_info"]:
526-
self.logger.debug("Device already resolved to blkid indexed device: %s" % device_path)
527-
return device_path
528-
529-
self.logger.debug("Resolving device: %s" % device_path)
530-
mountpoint = _resolve_device_mountpoint(self, device_path)
531-
device_path = _resolve_overlay_lower_device(self, mountpoint)
532-
mountpoint = _resolve_device_mountpoint(self, device_path) # May have changed if it was an overlayfs
533-
534-
major, minor = _get_device_id(self["_mounts"][mountpoint]["device"])
535-
for device in self["_blkid_info"]:
536-
check_major, check_minor = _get_device_id(device)
537-
if (major, minor) == (check_major, check_minor):
538-
self.logger.info("Resolved device: %s -> %s" % (device_path, colorize(device, "cyan")))
539-
return device
540-
self.logger.critical("Failed to resolve device: %s" % colorize(device_path, "red", bold=True))
541-
self.logger.error("Blkid info: %s" % pretty_print(self["_blkid_info"]))
542-
self.logger.error("Mount info: %s" % pretty_print(self["_mounts"]))
543-
return device_path
544-
545-
546-
def _resolve_device_mountpoint(self, device) -> str:
547-
"""Gets the mountpoint of a device based on the device path."""
548-
for mountpoint, mount_info in self["_mounts"].items():
549-
if str(device) == mount_info["device"]:
550-
return mountpoint
551-
raise AutodetectError("Device mountpoint not found: %s" % device)
552-
553-
554-
def _resolve_overlay_lower_dir(self, mountpoint) -> str:
555-
for option in self["_mounts"][mountpoint]["options"]:
556-
if option.startswith("lowerdir="):
557-
return option.removeprefix("lowerdir=")
558-
raise AutodetectError(
559-
"[%s] No lower overlayfs mountpoint found: %s" % mountpoint, self["_mounts"][mountpoint]["options"]
560-
)
561-
562-
563-
def _resolve_overlay_lower_device(self, mountpoint) -> dict:
564-
"""Returns device for the lower overlayfs mountpoint.
565-
If it's not an overlayfs, returns the device for the mountpoint.
566-
567-
If it is, iterate through the lowerdir devices until a non-overlayfs mount is found.
568-
"""
569-
if self["_mounts"][mountpoint]["fstype"] != "overlay":
570-
return self["_mounts"][mountpoint]["device"]
571-
572-
while self["_mounts"][mountpoint]["fstype"] == "overlay":
573-
lowerdir = _resolve_overlay_lower_dir(self, mountpoint)
574-
lower_path = Path(lowerdir)
575-
while str(lower_path) not in self["_mounts"]:
576-
lower_path = lower_path.parent
577-
if lower_path == Path("/"):
578-
raise AutodetectError("Lowerdir mount not found: %s" % lowerdir)
579-
mountpoint = str(lower_path)
580-
581-
return self["_mounts"][mountpoint]["device"]
582-
583-
584584
@contains("autodetect_root", "Skipping root autodetection, autodetect_root is disabled.", log_level=30)
585585
@contains("hostonly", "Skipping root autodetection, hostonly mode is disabled.", log_level=30)
586586
def autodetect_root(self) -> None:
@@ -825,7 +825,8 @@ def resolve_blkdev_kmod(self, device) -> list[str]:
825825
if "/usb" in sys_dev:
826826
if "ugrd.kmod.usb" not in self["modules"]:
827827
self.logger.info(
828-
"Auto-enabling %s for USB device: %s" % (colorize("ugrd.kmod.usb", bold=True), colorize(device, "cyan"))
828+
"Auto-enabling %s for USB device: %s"
829+
% (colorize("ugrd.kmod.usb", bold=True), colorize(device, "cyan"))
829830
)
830831
self["modules"] = "ugrd.kmod.usb"
831832
device_name = dev.name

0 commit comments

Comments
 (0)