Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion pkg/base1/test-dbus-common.js
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,15 @@ export function common_dbus_tests(channel_options, bus_name) { // eslint-disable

QUnit.test("signal unsubscribe", async assert => {
let received = false;
let signals = 0;
const dbus = cockpit.dbus(bus_name, channel_options);

function on_signal() {
received = true;
signals++;
}

const subscription = dbus.subscribe({
let subscription = dbus.subscribe({
interface: "com.redhat.Cockpit.DBusTests.Frobber",
path: "/otree/frobber"
}, on_signal);
Expand All @@ -302,6 +304,19 @@ export function common_dbus_tests(channel_options, bus_name) { // eslint-disable
received = false;
await dbus.call("/otree/frobber", "com.redhat.Cockpit.DBusTests.Frobber", "RequestSignalEmission", [0]);
assert.equal(received, false, "signal not received");
assert.equal(signals, 1, "received exactly one signal");

subscription = dbus.subscribe({
interface: "com.redhat.Cockpit.DBusTests.Frobber",
path: "/otree/frobber"
}, on_signal);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I twould be good to subscribe twice with the same rule but two different signal handlers, and then check that both handlers are called, and that after removing one of them, the other is still called.

await dbus.call("/otree/frobber", "com.redhat.Cockpit.DBusTests.Frobber", "RequestSignalEmission", [0]);
assert.equal(received, true, "signal received");
assert.equal(signals, 2, "received exactly two signals, watch was removed");
subscription.remove();
await dbus.call("/otree/frobber", "com.redhat.Cockpit.DBusTests.Frobber", "RequestSignalEmission", [0]);
assert.equal(signals, 2, "received exactly three signals, ");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"two signals"?

dbus.close();
});

QUnit.test("with types", assert => {
Expand Down
32 changes: 28 additions & 4 deletions src/cockpit/channels/dbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ async def get_ready() -> None:
else:
self.ready()

def add_signal_handler(self, handler, **kwargs):
def get_r(self, **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"r"? rule?

also: please add types as you go

r = dict(**kwargs)
r['type'] = 'signal'
if 'sender' not in r and self.name is not None:
Expand All @@ -276,6 +276,14 @@ def add_signal_handler(self, handler, **kwargs):
if r.get('path_namespace') == "/":
del r['path_namespace']

return r

def get_r_string(self, r):
return ','.join(f"{key}='{value}'" for key, value in r.items())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order matters here. I think we need to explicitly sort the keys alphabetically or something.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.


def add_signal_handler(self, handler, **kwargs):
r = self.get_r(**kwargs)

def filter_owner(message):
if self.owner is not None and self.owner == message.get_sender():
handler(message)
Expand All @@ -285,6 +293,7 @@ def filter_owner(message):
else:
func = handler
r_string = ','.join(f"{key}='{value}'" for key, value in r.items())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woh deja vue

logger.debug("r_string %s", r_string)
if not self.is_closing():
# this gets an EINTR very often especially on RHEL 8
while True:
Expand All @@ -294,7 +303,7 @@ def filter_owner(message):
except InterruptedError:
pass

self.matches.append(match)
self.matches.append((r_string, match))

def add_async_signal_handler(self, handler, **kwargs):
def sync_handler(message):
Expand Down Expand Up @@ -363,7 +372,7 @@ async def do_add_match(self, message):
logger.debug('adding match %s', add_match)

async def match_hit(message):
logger.debug('got match')
logger.debug('got match %s', message)
async with self.watch_processing_lock:
self.send_json(signal=[
message.get_path(),
Expand All @@ -374,6 +383,19 @@ async def match_hit(message):

self.add_async_signal_handler(match_hit, **add_match)

async def do_remove_match(self, message):
remove_match = message['remove-match']
logger.debug('remove match %s', remove_match)
r = self.get_r(**remove_match)
r_string = self.get_r_string(r)
for index, (r_key, slot) in enumerate(self.matches):
if r_key == r_string:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This starts to look like a multidict...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better data structure is indeed welcome :)

logger.debug('got match %s %d', self.matches, index)
slot.cancel()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...and systemd will send the RemoveMatch for us?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, the test passes :) But how would I verify this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I have tested this with a simple systemd_ctypes example :)

import asyncio

from systemd_ctypes import Bus, EventLoopPolicy, introspection


def property_changed(message):
    print('Property changed:', message.get_body())
    return 0


async def main():
    system = Bus.default_system()
    slot1 = system.add_match("interface='org.freedesktop.DBus.Properties'", property_changed)
    slot2 = system.add_match("interface='org.freedesktop.DBus.Properties'", property_changed)
    await asyncio.sleep(10)
    print("canelling slot1")
    slot1.cancel()
    print("canelled slot1")
    await asyncio.sleep(1000)
    print(slot2)


asyncio.set_event_loop_policy(EventLoopPolicy())
asyncio.run(main())

This combined with busctl monitor:

‣ Type=method_call  Endian=l  Flags=0  Version=1 Cookie=2  Timestamp="Tue 2025-12-02 15:34:29.972283 UTC"
  Sender=:1.535  Destination=org.freedesktop.DBus  Path=/org/freedesktop/DBus  Interface=org.freedesktop.DBus  Member=AddMatch
  UniqueName=:1.535
  MESSAGE "s" {
          STRING "interface='org.freedesktop.DBus.Properties'";
  };

‣ Type=method_return  Endian=l  Flags=1  Version=1 Cookie=-1  ReplyCookie=2  Timestamp="Tue 2025-12-02 15:34:29.972293 UTC"
  Sender=org.freedesktop.DBus  Destination=:1.535
  MESSAGE "" {
  };

‣ Type=method_call  Endian=l  Flags=0  Version=1 Cookie=3  Timestamp="Tue 2025-12-02 15:34:29.972303 UTC"
  Sender=:1.535  Destination=org.freedesktop.DBus  Path=/org/freedesktop/DBus  Interface=org.freedesktop.DBus  Member=AddMatch
  UniqueName=:1.535
  MESSAGE "s" {
          STRING "interface='org.freedesktop.DBus.Properties'";
  };

‣ Type=method_return  Endian=l  Flags=1  Version=1 Cookie=-1  ReplyCookie=3  Timestamp="Tue 2025-12-02 15:34:29.972314 UTC"
  Sender=org.freedesktop.DBus  Destination=:1.535
  MESSAGE "" {
  };

So two matches added! Then one removed, so systemd does not combine them.

‣ Type=method_call  Endian=l  Flags=1  Version=1 Cookie=4  Timestamp="Tue 2025-12-02 15:35:39.333566 UTC"
  Sender=:1.536  Destination=org.freedesktop.DBus  Path=/org/freedesktop/DBus  Interface=org.freedesktop.DBus  Member=RemoveMatch
  UniqueName=:1.536
  MESSAGE "s" {
          STRING "interface='org.freedesktop.DBus.Properties'";
  };

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to do "use counting" for matches. If "add-match" has been received 5 times, there need to be 5 matching "remove-match"es before the slot can be cancelled. See protocol.md.

Is this taken care of properly? I guess so, because the 5 "add-match"es have made five slots and five entries in self.matches, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does, the old C code seems to do this:

add-match

  peer = ensure_peer (self, name);
  if (cockpit_dbus_rules_add (peer->rules,
                              path ? path : path_namespace,
                              path_namespace ? TRUE : FALSE,
                              interface, signal, arg0))
cockpit_dbus_rules_add (CockpitDBusRules *rules,
                        const gchar *path,
                        gboolean is_namespace,
                        const gchar *interface,
                        const gchar *member,
                        const gchar *arg0)
{
  RuleData *rule = NULL;

<snip>

  if (rule == NULL)
    {
      rule = g_slice_new0 (RuleData);
      rule->refs = 1;
      rule->path = g_strdup (path);
      rule->is_namespace = is_namespace;
      rule->interface = g_strdup (interface);
      rule->member = g_strdup (member);
      rule->arg0 = g_strdup (arg0);
      g_hash_table_add (rules->all, rule);
      recompile_rules (rules);
      return TRUE;
    }
  else
    {
      rule->refs++;
      return FALSE;
    }

So it is actually more efficient! By keeping one slot for the same match rule.

Removing:

  rule = rule_lookup (rules, path, is_namespace, interface, member, arg0);
  if (rule == NULL)
    return FALSE;

  rule->refs--;
  if (rule->refs == 0)
    {
      g_hash_table_remove (rules->all, rule);
      recompile_rules (rules);
      return TRUE;
    }

  return FALSE;

del self.matches[index]
logger.debug('removed match %s %d', self.matches, index)
break

async def setup_objectmanager_watch(self, path, interface_name, meta, notify):
# Watch the objects managed by the ObjectManager at "path".
# Properties are not watched, that is done by setup_path_watch
Expand Down Expand Up @@ -505,6 +527,8 @@ def do_data(self, data):
self.create_task(self.do_call(message))
elif 'add-match' in message:
self.create_task(self.do_add_match(message))
elif 'remove-match' in message:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huh. just flat out unimplemented. fascinating.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is more:

# - removing matches
# - removing watches
# - emitting of signals
# - publishing of objects
# - failing more gracefully in some cases (during open, etc)

self.create_task(self.do_remove_match(message))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel slightly like we should try to handle this synchronously, at least the part about removing the entry from the list...

On the other hand, the entire wire protocol is wildly async, so maybe it's easier to keep with the prevailing style in this function of sending everything to tasks...

elif 'watch' in message:
self.create_task(self.do_watch(message))
elif 'meta' in message:
Expand All @@ -514,7 +538,7 @@ def do_data(self, data):
return

def do_close(self):
for slot in self.matches:
for (_, slot) in self.matches:
slot.cancel()
self.matches = []
self.close()