Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion qubes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1621,7 +1621,12 @@ def _domain_event_callback(self, _conn, domain, event, _detail, _opaque):
)
elif event == libvirt.VIR_DOMAIN_EVENT_RESUMED:
try:
vm.fire_event("domain-unpaused")
if getattr(vm, "skip_unpause_event", False):
vm.skip_unpause_event = False
else:
asyncio.ensure_future(
vm.fire_event_async("domain-unpaused")
)
except Exception: # pylint: disable=broad-except
self.log.exception(
"Uncaught exception from domain-unpaused handler "
Expand Down
25 changes: 24 additions & 1 deletion qubes/ext/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
from qubes.device_protocol import DeviceInterface


PROHIBITED_FEATURES = [
"deferred-netvm-original",
"preload-dispvm",
"preload-dispvm-completed",
"preload-dispvm-in-progress",
]


class JustEvaluateAskResolution(parser.AskResolution):
async def execute(self):
pass
Expand All @@ -39,6 +47,8 @@ async def execute(self):


class AdminExtension(qubes.ext.Extension):
# pylint: disable=too-few-public-methods

def __init__(self):
super().__init__()
# during tests, __init__() of the extension can be called multiple
Expand All @@ -47,7 +57,20 @@ def __init__(self):
self.policy_cache = utils.PolicyCache(lazy_load=True)
self.policy_cache.initialize_watcher()

# pylint: disable=too-few-public-methods
@qubes.ext.handler(
"admin-permission:admin.vm.feature.Set",
"admin-permission:admin.vm.feature.Remove",
)
def on_feature_set_or_remove(self, vm, event, arg, **kwargs):
"""Forbid changing specific features"""
# pylint: disable=unused-argument
if arg in PROHIBITED_FEATURES:
raise qubes.exc.PermissionDenied(
"changing this feature is prohibited by {}.{}".format(
__name__, type(self).__name__
)
)

@qubes.ext.handler(
"admin-permission:admin.vm.tag.Set",
"admin-permission:admin.vm.tag.Remove",
Expand Down
20 changes: 20 additions & 0 deletions qubes/tests/api_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import qubes.tests
import qubes.storage


from qubes.device_protocol import (
DeviceInfo,
VirtualDevice,
Expand Down Expand Up @@ -1667,6 +1668,25 @@ def test_301_feature_remove_none(self):
)
self.assertFalse(self.app.save.called)

def test_303_feature_prohibited(self):
del self.app.domains[0].fire_event
feature = qubes.ext.admin.PROHIBITED_FEATURES[0]
with self.assertRaises(qubes.exc.PermissionDenied):
self.call_mgmt_func(
b"admin.vm.feature.Set",
b"test-vm1",
str(feature).encode(),
b"some-value",
)

self.vm.features[feature] = False
with self.assertRaises(qubes.exc.PermissionDenied):
self.call_mgmt_func(
b"admin.vm.feature.Remove",
b"test-vm1",
str(feature).encode(),
)

def test_310_feature_checkwithtemplate(self):
self.vm.features["test-feature"] = "some-value"
value = self.call_mgmt_func(
Expand Down
31 changes: 31 additions & 0 deletions qubes/tests/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import unittest.mock

import qubes.ext.admin
import qubes.ext.core_features
import qubes.ext.custom_persist
import qubes.ext.services
Expand Down Expand Up @@ -2579,3 +2580,33 @@ def test_015_feature_set_path_with_colon_without_options(self):
self.vm.untrusted_qdb.write.assert_called_with(
"/persist/test", "/var/test:dir:with:colon"
)


class TC_50_Admin(qubes.tests.QubesTestCase):
maxDiff = None

def setUp(self):
super().setUp()
self.ext = qubes.ext.admin.AdminExtension()

def tearDown(self):
self.ext.on_qubes_close("app", "qubes-close")
super().tearDown()

def test_000_features_permission(self):
feature = qubes.ext.admin.PROHIBITED_FEATURES[0]
with self.assertRaises(qubes.exc.PermissionDenied):
self.ext.on_feature_set_or_remove(
"test-vm1",
"admin-permission:admin.vm.feature.Set",
feature,
)

def test_000_tags_permission(self):
tag = "created-by-test"
with self.assertRaises(qubes.exc.PermissionDenied):
self.ext.on_tag_set_or_remove(
"test-vm1",
"admin-permission:admin.vm.tag.Set",
tag,
)
Loading