Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions kiwi/chroot_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright (c) 2025 SUSE Software Solutions Germany GmbH. All rights reserved.
#
# This file is part of kiwi.
#
# kiwi is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# kiwi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with kiwi. If not, see <http://www.gnu.org/licenses/>
#
import os
import logging
from typing import (
List, Optional
)

# project
from kiwi.mount_manager import MountManager
from kiwi.command import (
Command, CommandT, MutableMapping
)
from kiwi.exceptions import (
KiwiUmountBusyError
)

log = logging.getLogger('kiwi')


class ChrootManager:
"""
**Implements methods for setting and unsetting a chroot environment**

The caller is responsible for cleaning up bind mounts if the ChrootManager
is used as is, without a context.

The class also supports to be used as a context manager, where any bind or kernel
filesystem mount is unmounted once the context manager's with block is left

* :param string root_dir: path to change the root to
* :param list binds: current root paths to bind to the chrooted path
"""
def __init__(self, root_dir: str, binds: List[str] = []):
self.root_dir = root_dir
self.mounts: List[MountManager] = []
for bind in binds:
self.mounts.append(MountManager(
device=bind, mountpoint=os.path.normpath(
os.sep.join([root_dir, bind])
)
))

def __enter__(self) -> "ChrootManager":
try:
self.mount()
except Exception as e:
try:
self.umount()
except Exception:
pass
raise e
return self

def __exit__(self, exc_type, exc_value, traceback) -> None:
self.umount()

def mount(self) -> None:
"""
Mounts binds to the chroot path
"""
for mnt in self.mounts:
mnt.bind_mount()

def umount(self) -> None:
"""
Unmounts all binds from the chroot path

If any unmount raises a KiwiUmountBusyError this is trapped
and kept until the iteration over all bind mounts is over.
"""
errors = []
for mnt in reversed(self.mounts):
try:
mnt.umount()
except KiwiUmountBusyError as e:
errors.append(e)

if errors:
raise KiwiUmountBusyError(errors)

def run(
self, command: List[str],
custom_env: Optional[MutableMapping[str, str]] = None,
raise_on_error: bool = True, stderr_to_stdout: bool = False,
raise_on_command_not_found: bool = True
) -> Optional[CommandT]:
"""
This is a wrapper for Command.run method but pre-appending the
chroot call at the command list

:param list command: command and arguments
:param dict custom_env: custom os.environ
:param bool raise_on_error: control error behaviour
:param bool stderr_to_stdout: redirects stderr to stdout

:return:
Contains call results in command type

.. code:: python

CommandT(output='string', error='string', returncode=int)

:rtype: CommandT
"""
chroot_cmd = ['chroot', self.root_dir]
chroot_cmd = chroot_cmd + command
return Command.run(
chroot_cmd, custom_env, raise_on_error, stderr_to_stdout,
raise_on_command_not_found
)
113 changes: 71 additions & 42 deletions kiwi/volume_manager/btrfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@

# project
from kiwi.command import Command
from kiwi.chroot_manager import ChrootManager
from kiwi.volume_manager.base import VolumeManagerBase
from kiwi.mount_manager import MountManager
from kiwi.storage.mapped_device import MappedDevice
from kiwi.filesystem import FileSystem
from kiwi.utils.command_capabilities import CommandCapabilities
from kiwi.utils.sync import DataSync
from kiwi.utils.block import BlockID
from kiwi.utils.sysconfig import SysConfig
Expand Down Expand Up @@ -135,24 +137,11 @@ def setup(self, name=None):
['btrfs', 'subvolume', 'create', root_volume]
)
if self.custom_args['root_is_snapper_snapshot']:
snapshot_volume = self.mountpoint + \
f'/{self.root_volume_name}/.snapshots'
Command.run(
['btrfs', 'subvolume', 'create', snapshot_volume]
)
os.chmod(snapshot_volume, 0o700)
Path.create(snapshot_volume + '/1')
snapshot = self.mountpoint + \
f'/{self.root_volume_name}/.snapshots/1/snapshot'
Command.run(
['btrfs', 'subvolume', 'create', snapshot]
)
self._set_default_volume(
f'{self.root_volume_name}/.snapshots/1/snapshot'
)
self._create_first_snapper_snapshot_as_default()

# Mount /{some-name}/.snapshots as /.snapshots inside the root
snapshot = self.mountpoint + \
f'/{self.root_volume_name}/.snapshots/1/snapshot'
# Mount /{some-name}/.snapshots as /.snapshots inside the root
snapshots_mount = MountManager(
device=self.device,
attributes={
Expand Down Expand Up @@ -413,19 +402,17 @@ def sync_data(self, exclude=None):
"""
if self.toplevel_mount:
sync_target = self.get_mountpoint()
if self.custom_args['root_is_snapper_snapshot']:
self._create_snapshot_info(
''.join(
[
self.mountpoint,
f'/{self.root_volume_name}/.snapshots/1/info.xml'
]
)
)
data = DataSync(self.root_dir, sync_target)
data.sync_data(
options=Defaults.get_sync_options(), exclude=exclude
)
if self.custom_args['root_is_snapper_snapshot']:
self._create_snapshot_info(
os.sep.join([
self.mountpoint,
f'{self.root_volume_name}/.snapshots/1'
])
)
if self.custom_args['quota_groups'] and \
self.custom_args['root_is_snapper_snapshot']:
self._create_snapper_quota_configuration()
Expand Down Expand Up @@ -528,10 +515,10 @@ def _create_snapper_quota_configuration(self):
config_file = self._set_snapper_sysconfig_file(root_path)
if not os.path.exists(config_file):
shutil.copyfile(snapper_default_conf, config_file)
Command.run([
'chroot', root_path, 'snapper', '--no-dbus', 'set-config',
'QGROUP=1/0'
])
with ChrootManager(root_path) as chroot:
chroot.run([
'snapper', '--no-dbus', 'set-config', 'QGROUP=1/0'
])

@staticmethod
def _set_snapper_sysconfig_file(root_path):
Expand All @@ -554,24 +541,66 @@ def _set_snapper_sysconfig_file(root_path):
sysconf_file['SNAPPER_CONFIGS'].strip('\"')]
)

def _create_snapshot_info(self, filename):
date_info = datetime.datetime.now()
snapshot = ElementTree.Element('snapshot')
def _create_first_snapper_snapshot_as_default(self):
if CommandCapabilities.check_version(
'snapper', (0, 12, 1), root=self.root_dir
):
with ChrootManager(
self.root_dir, binds=[self.mountpoint]
) as chroot:
chroot.run([
'/usr/lib/snapper/installation-helper', '--root-prefix',
os.sep.join([self.mountpoint, self.root_volume_name]),
'--step', 'filesystem'
])
else:
snapshot_volume = self.mountpoint + \
f'/{self.root_volume_name}/.snapshots'
Command.run(
['btrfs', 'subvolume', 'create', snapshot_volume]
)
os.chmod(snapshot_volume, 0o700)
Path.create(snapshot_volume + '/1')
snapshot = self.mountpoint + \
f'/{self.root_volume_name}/.snapshots/1/snapshot'
Command.run(
['btrfs', 'subvolume', 'create', snapshot]
)
self._set_default_volume(
f'{self.root_volume_name}/.snapshots/1/snapshot'
)

def _create_snapshot_info(self, path):
if CommandCapabilities.check_version(
'snapper', (0, 12, 1), root=self.mountpoint
):
snapshots_prefix = os.sep.join([path, '.snapshots'])
with ChrootManager(
self.root_dir, binds=[path, snapshots_prefix]
) as chroot:
chroot.run([
'/usr/lib/snapper/installation-helper', '--root-prefix',
path, '--step', 'config', '--description',
'first root filesystem'
])
else:
date_info = datetime.datetime.now()
snapshot = ElementTree.Element('snapshot')

snapshot_type = ElementTree.SubElement(snapshot, 'type')
snapshot_type.text = 'single'
snapshot_type = ElementTree.SubElement(snapshot, 'type')
snapshot_type.text = 'single'

snapshot_number = ElementTree.SubElement(snapshot, 'num')
snapshot_number.text = '1'
snapshot_number = ElementTree.SubElement(snapshot, 'num')
snapshot_number.text = '1'

snapshot_description = ElementTree.SubElement(snapshot, 'description')
snapshot_description.text = 'first root filesystem'
snapshot_description = ElementTree.SubElement(snapshot, 'description')
snapshot_description.text = 'first root filesystem'

snapshot_date = ElementTree.SubElement(snapshot, 'date')
snapshot_date.text = date_info.strftime("%Y-%m-%d %H:%M:%S")
snapshot_date = ElementTree.SubElement(snapshot, 'date')
snapshot_date.text = date_info.strftime("%Y-%m-%d %H:%M:%S")

with open(filename, 'w') as snapshot_info_file:
snapshot_info_file.write(self._xml_pretty(snapshot))
with open(os.path.join(path, 'info.xml'), 'w') as snapshot_info_file:
snapshot_info_file.write(self._xml_pretty(snapshot))

def __exit__(self, exc_type, exc_value, traceback):
if self.toplevel_mount:
Expand Down
Loading