Skip to content

Commit 3a2edf9

Browse files
authored
Merge pull request #177 from desultory/dev
add generic findmount function, improve mount detetction
2 parents f2b533d + 7f1d397 commit 3a2edf9

File tree

1 file changed

+125
-113
lines changed

1 file changed

+125
-113
lines changed

src/ugrd/fs/mounts.py

Lines changed: 125 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
__author__ = "desultory"
2-
__version__ = "6.3.0"
2+
__version__ = "6.5.0"
33

44
from pathlib import Path
55
from typing import Union
@@ -22,6 +22,103 @@
2222
]
2323

2424

25+
def _get_device_id(device: str) -> str:
26+
"""Gets the device id from the device path."""
27+
return Path(device).stat().st_rdev >> 8, Path(device).stat().st_rdev & 0xFF
28+
29+
30+
def _resolve_dev(self, device_path) -> str:
31+
"""Resolves a device to one indexed in blkid.
32+
33+
Takes the device path, such as /dev/root, and resolves it to a device indexed in blkid.
34+
If the device is an overlayfs, resolves the lowerdir device.
35+
"""
36+
if str(device_path) in self["_blkid_info"]:
37+
self.logger.debug("Device already resolved to blkid indexed device: %s" % device_path)
38+
return device_path
39+
40+
self.logger.debug("Resolving device: %s" % device_path)
41+
mountpoint = _resolve_device_mountpoint(self, device_path)
42+
device_path = _resolve_overlay_lower_device(self, mountpoint)
43+
mountpoint = _resolve_device_mountpoint(self, device_path) # May have changed if it was an overlayfs
44+
45+
major, minor = _get_device_id(self["_mounts"][mountpoint]["device"])
46+
for device in self["_blkid_info"]:
47+
check_major, check_minor = _get_device_id(device)
48+
if (major, minor) == (check_major, check_minor):
49+
self.logger.info("Resolved device: %s -> %s" % (device_path, colorize(device, "cyan")))
50+
return device
51+
self.logger.critical("Failed to resolve device: %s" % colorize(device_path, "red", bold=True))
52+
self.logger.error("Blkid info: %s" % pretty_print(self["_blkid_info"]))
53+
self.logger.error("Mount info: %s" % pretty_print(self["_mounts"]))
54+
return device_path
55+
56+
57+
def _find_mountpoint(self, path: str) -> str:
58+
"""Finds the mountpoint of a file or directory,
59+
Checks if the parent dir is a mountpoint, if not, recursively checks the parent dir."""
60+
check_path = Path(path).resolve()
61+
parent = check_path.parent if not check_path.is_dir() else check_path
62+
if str(parent) in self["_mounts"]:
63+
return str(parent)
64+
elif parent == Path("/"): # The root mount SHOULD always be found...
65+
raise AutodetectError("Mountpoint not found for: %s" % path)
66+
return _find_mountpoint(self, parent.parent)
67+
68+
69+
def _resolve_device_mountpoint(self, device) -> str:
70+
"""Gets the mountpoint of a device based on the device path."""
71+
for mountpoint, mount_info in self["_mounts"].items():
72+
if str(device) == mount_info["device"]:
73+
return mountpoint
74+
raise AutodetectError("Device mountpoint not found: %s" % device)
75+
76+
77+
def _resolve_overlay_lower_dir(self, mountpoint) -> str:
78+
for option in self["_mounts"][mountpoint]["options"]:
79+
if option.startswith("lowerdir="):
80+
return option.removeprefix("lowerdir=")
81+
raise AutodetectError(
82+
"[%s] No lower overlayfs mountpoint found: %s" % mountpoint, self["_mounts"][mountpoint]["options"]
83+
)
84+
85+
86+
def _resolve_overlay_lower_device(self, mountpoint) -> dict:
87+
"""Returns device for the lower overlayfs mountpoint.
88+
If it's not an overlayfs, returns the device for the mountpoint.
89+
90+
If it is, iterate through the lowerdir devices until a non-overlayfs mount is found.
91+
"""
92+
if self["_mounts"][mountpoint]["fstype"] != "overlay":
93+
return self["_mounts"][mountpoint]["device"]
94+
95+
while self["_mounts"][mountpoint]["fstype"] == "overlay":
96+
lowerdir = _resolve_overlay_lower_dir(self, mountpoint)
97+
mountpoint = _find_mountpoint(self, lowerdir)
98+
if mountpoint == "/": # The lowerdir mount should never be the root mount
99+
raise AutodetectError("Lowerdir mount not found: %s" % lowerdir)
100+
101+
return self["_mounts"][mountpoint]["device"]
102+
103+
104+
def _merge_mounts(self, mount_name: str, mount_config, mount_class) -> None:
105+
"""Merges the passed mount config with the existing mount."""
106+
if mount_name not in self[mount_class]:
107+
self.logger.debug("[%s] Skipping mount merge, mount not found: %s" % (mount_class, mount_name))
108+
return mount_config
109+
110+
self.logger.info("[%s] Updating mount: %s" % (mount_class, mount_name))
111+
self.logger.debug("[%s] Updating mount with: %s" % (mount_name, mount_config))
112+
if "options" in self[mount_class][mount_name] and "options" in mount_config:
113+
self.logger.debug("Merging options: %s" % mount_config["options"])
114+
self[mount_class][mount_name]["options"] = self[mount_class][mount_name]["options"] | set(
115+
mount_config["options"]
116+
)
117+
mount_config.pop("options")
118+
119+
return dict(self[mount_class][mount_name], **mount_config)
120+
121+
25122
@contains("validate", "Skipping mount validation, validation is disabled.", log_level=30)
26123
def _validate_mount_config(self, mount_name: str, mount_config) -> None:
27124
"""Validate the mount config."""
@@ -51,24 +148,6 @@ def _validate_mount_config(self, mount_name: str, mount_config) -> None:
51148
raise ValueError("Invalid parameter in mount: %s" % parameter)
52149

53150

54-
def _merge_mounts(self, mount_name: str, mount_config, mount_class) -> None:
55-
"""Merges the passed mount config with the existing mount."""
56-
if mount_name not in self[mount_class]:
57-
self.logger.debug("[%s] Skipping mount merge, mount not found: %s" % (mount_class, mount_name))
58-
return mount_config
59-
60-
self.logger.info("[%s] Updating mount: %s" % (mount_class, mount_name))
61-
self.logger.debug("[%s] Updating mount with: %s" % (mount_name, mount_config))
62-
if "options" in self[mount_class][mount_name] and "options" in mount_config:
63-
self.logger.debug("Merging options: %s" % mount_config["options"])
64-
self[mount_class][mount_name]["options"] = self[mount_class][mount_name]["options"] | set(
65-
mount_config["options"]
66-
)
67-
mount_config.pop("options")
68-
69-
return dict(self[mount_class][mount_name], **mount_config)
70-
71-
72151
def _process_mount(self, mount_name: str, mount_config, mount_class="mounts") -> None:
73152
"""Processes the passed mount config."""
74153
mount_config = _merge_mounts(self, mount_name, mount_config, mount_class)
@@ -262,32 +341,34 @@ def get_blkid_info(self, device=None) -> dict:
262341

263342

264343
@contains("init_target", "init_target must be set", raise_exception=True)
265-
@contains(
266-
"autodetect_init_mount", "Skipping init mount autodetection, autodetect_init_mount is disabled.", log_level=30
267-
)
344+
@contains("autodetect_init_mount", "Init mount autodetection disabled, skipping.", log_level=30)
268345
@contains("hostonly", "Skipping init mount autodetection, hostonly mode is disabled.", log_level=30)
269-
def autodetect_init_mount(self, parent=None) -> None:
346+
def autodetect_init_mount(self) -> None:
270347
"""Checks the parent directories of init_target, if the path is a mountpoint, add it to late_mounts."""
271-
if not parent:
272-
parent = self["init_target"].parent
273-
if parent == Path("/"):
348+
init_mount = _find_mountpoint(self, self["init_target"])
349+
if init_mount == "/":
274350
return
275-
if str(parent) in self["_mounts"]:
276-
self.logger.info("Detected init mount: %s" % colorize(parent, "cyan"))
277-
mount_name = str(parent).removeprefix("/")
278-
mount_dest = str(parent)
279-
mount_device = self["_mounts"][str(parent)]["device"]
280-
mount_type = self["_mounts"][str(parent)]["fstype"]
281-
mount_options = self["_mounts"][str(parent)]["options"]
282-
blkid_info = self["_blkid_info"][mount_device]
283-
mount_source_type, mount_source = _get_mount_source_type(self, blkid_info, with_val=True)
284-
self["late_mounts"][mount_name] = {
285-
"destination": mount_dest,
286-
mount_source_type: mount_source,
287-
"type": mount_type,
288-
"options": mount_options,
289-
}
290-
autodetect_init_mount(self, parent.parent)
351+
352+
if init_mount in self["late_mounts"]:
353+
return self.logger.debug("Init mount already detected: %s" % init_mount)
354+
355+
if init_mount not in self["_mounts"]:
356+
raise AutodetectError("Init mount not found in host mounts: %s" % init_mount)
357+
358+
self.logger.info("Detected init mount: %s" % colorize(init_mount, "cyan"))
359+
mount_name = init_mount.removeprefix("/")
360+
mount_dest = init_mount
361+
mount_device = self["_mounts"][init_mount]["device"]
362+
mount_type = self["_mounts"][init_mount]["fstype"]
363+
mount_options = self["_mounts"][init_mount]["options"]
364+
blkid_info = self["_blkid_info"][mount_device]
365+
mount_source_type, mount_source = _get_mount_source_type(self, blkid_info, with_val=True)
366+
self["late_mounts"][mount_name] = {
367+
"destination": mount_dest,
368+
mount_source_type: mount_source,
369+
"type": mount_type,
370+
"options": mount_options,
371+
}
291372

292373

293374
@contains("hostonly", "Skipping virtual block device enumeration, hostonly mode is disabled.", log_level=30)
@@ -336,11 +417,6 @@ def get_virtual_block_info(self) -> dict:
336417
self.logger.debug("No virtual block devices found.")
337418

338419

339-
def _get_device_id(device: str) -> str:
340-
"""Gets the device id from the device path."""
341-
return Path(device).stat().st_rdev >> 8, Path(device).stat().st_rdev & 0xFF
342-
343-
344420
@contains("hostonly", "Skipping device mapper autodetection, hostonly mode is disabled.", log_level=30)
345421
def _autodetect_dm(self, mountpoint, device=None) -> None:
346422
"""Autodetects device mapper config given a mountpoint.
@@ -516,71 +592,6 @@ def autodetect_luks(self, mount_loc, dm_num, blkid_info) -> None:
516592
)
517593

518594

519-
def _resolve_dev(self, device_path) -> str:
520-
"""Resolves a device to one indexed in blkid.
521-
522-
Takes the device path, such as /dev/root, and resolves it to a device indexed in blkid.
523-
If the device is an overlayfs, resolves the lowerdir device.
524-
"""
525-
if str(device_path) in self["_blkid_info"]:
526-
self.logger.debug("Device already resolved to blkid indexed device: %s" % device_path)
527-
return device_path
528-
529-
self.logger.debug("Resolving device: %s" % device_path)
530-
mountpoint = _resolve_device_mountpoint(self, device_path)
531-
device_path = _resolve_overlay_lower_device(self, mountpoint)
532-
mountpoint = _resolve_device_mountpoint(self, device_path) # May have changed if it was an overlayfs
533-
534-
major, minor = _get_device_id(self["_mounts"][mountpoint]["device"])
535-
for device in self["_blkid_info"]:
536-
check_major, check_minor = _get_device_id(device)
537-
if (major, minor) == (check_major, check_minor):
538-
self.logger.info("Resolved device: %s -> %s" % (device_path, colorize(device, "cyan")))
539-
return device
540-
self.logger.critical("Failed to resolve device: %s" % colorize(device_path, "red", bold=True))
541-
self.logger.error("Blkid info: %s" % pretty_print(self["_blkid_info"]))
542-
self.logger.error("Mount info: %s" % pretty_print(self["_mounts"]))
543-
return device_path
544-
545-
546-
def _resolve_device_mountpoint(self, device) -> str:
547-
"""Gets the mountpoint of a device based on the device path."""
548-
for mountpoint, mount_info in self["_mounts"].items():
549-
if str(device) == mount_info["device"]:
550-
return mountpoint
551-
raise AutodetectError("Device mountpoint not found: %s" % device)
552-
553-
554-
def _resolve_overlay_lower_dir(self, mountpoint) -> str:
555-
for option in self["_mounts"][mountpoint]["options"]:
556-
if option.startswith("lowerdir="):
557-
return option.removeprefix("lowerdir=")
558-
raise AutodetectError(
559-
"[%s] No lower overlayfs mountpoint found: %s" % mountpoint, self["_mounts"][mountpoint]["options"]
560-
)
561-
562-
563-
def _resolve_overlay_lower_device(self, mountpoint) -> dict:
564-
"""Returns device for the lower overlayfs mountpoint.
565-
If it's not an overlayfs, returns the device for the mountpoint.
566-
567-
If it is, iterate through the lowerdir devices until a non-overlayfs mount is found.
568-
"""
569-
if self["_mounts"][mountpoint]["fstype"] != "overlay":
570-
return self["_mounts"][mountpoint]["device"]
571-
572-
while self["_mounts"][mountpoint]["fstype"] == "overlay":
573-
lowerdir = _resolve_overlay_lower_dir(self, mountpoint)
574-
lower_path = Path(lowerdir)
575-
while str(lower_path) not in self["_mounts"]:
576-
lower_path = lower_path.parent
577-
if lower_path == Path("/"):
578-
raise AutodetectError("Lowerdir mount not found: %s" % lowerdir)
579-
mountpoint = str(lower_path)
580-
581-
return self["_mounts"][mountpoint]["device"]
582-
583-
584595
@contains("autodetect_root", "Skipping root autodetection, autodetect_root is disabled.", log_level=30)
585596
@contains("hostonly", "Skipping root autodetection, hostonly mode is disabled.", log_level=30)
586597
def autodetect_root(self) -> None:
@@ -825,7 +836,8 @@ def resolve_blkdev_kmod(self, device) -> list[str]:
825836
if "/usb" in sys_dev:
826837
if "ugrd.kmod.usb" not in self["modules"]:
827838
self.logger.info(
828-
"Auto-enabling %s for USB device: %s" % (colorize("ugrd.kmod.usb", bold=True), colorize(device, "cyan"))
839+
"Auto-enabling %s for USB device: %s"
840+
% (colorize("ugrd.kmod.usb", bold=True), colorize(device, "cyan"))
829841
)
830842
self["modules"] = "ugrd.kmod.usb"
831843
device_name = dev.name

0 commit comments

Comments
 (0)