Description
When a request is sent via a comm, it's important that the comm eventually gets an error or a result; otherwise the sender waits indefinitely for a response (response times are variable, so timeouts are not always sound). For this reason, RPC mechanisms can't work reliably if messages are silently dropped.
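To make the requirement concrete, here is a minimal sketch of a pending-request table in which every registered request ends with either a reply or an error. The names `PendingRequests`, `RpcOutcome`, `register`, `resolve`, and `fail_all` are hypothetical and not taken from ark; the sketch assumes outcomes are delivered over a standard-library `mpsc` channel.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Outcome delivered to whoever waits on a request (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum RpcOutcome {
    Reply(String),
    Error(String),
}

/// Requests that were sent but not yet answered (hypothetical type).
#[derive(Default)]
pub struct PendingRequests {
    waiting: HashMap<String, Sender<RpcOutcome>>,
}

impl PendingRequests {
    /// Register a request and return the receiver the sender blocks on.
    pub fn register(&mut self, id: &str) -> Receiver<RpcOutcome> {
        let (tx, rx) = channel();
        self.waiting.insert(id.to_string(), tx);
        rx
    }

    /// Deliver a reply to the matching request.
    /// Returns `false` if the id is unknown or the waiter is gone.
    pub fn resolve(&mut self, id: &str, reply: String) -> bool {
        match self.waiting.remove(id) {
            Some(tx) => tx.send(RpcOutcome::Reply(reply)).is_ok(),
            None => false,
        }
    }

    /// Fail every outstanding request, for instance once a delivery failure
    /// has been detected. Returns how many requests were failed.
    pub fn fail_all(&mut self, reason: &str) -> usize {
        let count = self.waiting.len();
        for (_, tx) in self.waiting.drain() {
            // The waiter may already be gone; a failed send is harmless here.
            let _ = tx.send(RpcOutcome::Error(reason.to_string()));
        }
        count
    }
}
```

If a message is dropped silently, nothing ever calls `resolve` or `fail_all` for that request, and the receiver blocks forever. That is the situation the rest of this issue tries to avoid.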
For regular Jupyter messages initiated by the kernel we at least get an error of type EHOSTUNREACH because we have set ZMQ_ROUTER_MANDATORY on our ROUTER sockets (otherwise messages are silently dropped, see http://api.zeromq.org/3-3:zmq-setsockopt). This allows our infrastructure to detect delivery failures and take appropriate actions to recover (fail an StdIn request made to the client for instance). We set the router to mandatory here:
ark/crates/amalthea/src/socket/socket.rs
Lines 83 to 93 in d838eef
    // If this is a debug build, set `ZMQ_ROUTER_MANDATORY` on all `ROUTER`
    // sockets, so that we get errors instead of silent message drops for
    // unroutable messages.
    #[cfg(debug_assertions)]
    {
        if kind == zmq::ROUTER {
            if let Err(err) = socket.set_router_mandatory(true) {
                return Err(Error::SocketBindError(name, endpoint, err));
            }
        }
    }
However, we don't have this guarantee for the OpenRPC mechanism of our custom comms because comm messages originating from the kernel are sent over IOPub. With an (X)PUB socket, messages are silently dropped if no one is listening.
To work around this, we could listen for "unsubscribe" events on our XPUB socket (see below). The ability to detect disconnections is one of the perks of having switched to XPUB when we implemented JEP65 (https://github.com/posit-dev/ark/pull/577/files). Whereas our ROUTER sockets aren't notified of disconnections, XPUB sockets are. From https://rfc.zeromq.org/spec/29:
SHALL receive subscribe and unsubscribe requests from subscribers depending on the transport protocol used.
SHALL, if the subscriber peer disconnects prematurely, generate a suitable unsubscribe request for the calling application.
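As a rough illustration of what these notifications look like on the wire, the sketch below decodes a frame read from an XPUB socket. In ZeroMQ, the first byte of such a frame is 1 for a subscribe and 0 for an unsubscribe, and the remaining bytes are the subscription prefix. The type and function names (`SubscriptionKind`, `SubscriptionEvent`, `parse_subscription`) are stand-ins chosen for this example and are not claimed to match ark's own definitions.

```rust
/// Kind of notification read from an XPUB socket (illustrative type).
#[derive(Debug, PartialEq)]
pub enum SubscriptionKind {
    Subscribe,
    Unsubscribe,
}

/// A decoded subscription notification (illustrative type).
#[derive(Debug, PartialEq)]
pub struct SubscriptionEvent {
    pub kind: SubscriptionKind,
    /// Subscription prefix; empty means "everything".
    pub topic: Vec<u8>,
}

/// Decode a raw XPUB frame: one flag byte followed by the subscription prefix.
/// Returns `None` for an empty frame or an unknown flag byte.
pub fn parse_subscription(frame: &[u8]) -> Option<SubscriptionEvent> {
    let (&flag, topic) = frame.split_first()?;
    let kind = match flag {
        1 => SubscriptionKind::Subscribe,
        0 => SubscriptionKind::Unsubscribe,
        _ => return None,
    };
    Some(SubscriptionEvent {
        kind,
        topic: topic.to_vec(),
    })
}
```

Because the RFC requires an unsubscribe to be generated when a subscriber disconnects prematurely, an `Unsubscribe` event can serve as a disconnection signal for the IOPub side.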
We actually already handle (with a no-op handler) the unsubscribe notification here:
ark/crates/amalthea/src/socket/iopub.rs
Lines 270 to 275 in d838eef
    SubscriptionKind::Unsubscribe => {
        log::info!(
            "Received unsubscribe message on IOPub with subscription '{subscription}'."
        );
        // We don't do anything on unsubscribes
        return Ok(());
How we handle the disconnection depends on the comm type:
- For persistent comms like plots, which hold state for the frontend, we just invalidate pending requests. There is a slight race condition here: we might invalidate requests for incoming responses that were emitted before the disconnection.
- For all other comms (the default), we just close them and call a cleanup handler. This is the safest option and gets us ahead as we should destroy existing comms on reconnect anyway (see positron#1126, "Jupyter: Refreshing the UI starts new comms instead of reusing existing ones").
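The two policies above could be sketched as follows. This is only an illustration under assumptions: `CommPolicy`, `Comm`, `DisconnectAction`, and `handle_disconnect` are hypothetical names, open comms are assumed to live in a map keyed by comm id, and pending requests are represented by their ids only.

```rust
use std::collections::BTreeMap;

/// How a comm reacts to a frontend disconnection (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CommPolicy {
    /// Holds state for the frontend (e.g. plots): keep the comm open.
    Persistent,
    /// The default: close the comm and run its cleanup handler.
    CloseOnDisconnect,
}

/// Minimal stand-in for an open comm (hypothetical type).
pub struct Comm {
    pub policy: CommPolicy,
    /// Ids of requests still waiting for a reply from the frontend.
    pub pending: Vec<String>,
    /// Optional cleanup handler, run once when the comm is closed.
    pub cleanup: Option<Box<dyn FnOnce()>>,
}

/// What was done for each comm, mainly useful for logging and testing.
#[derive(Debug, PartialEq)]
pub enum DisconnectAction {
    InvalidatedRequests { comm_id: String, request_ids: Vec<String> },
    Closed { comm_id: String },
}

/// Apply the per-policy handling to every open comm after an unsubscribe.
pub fn handle_disconnect(comms: &mut BTreeMap<String, Comm>) -> Vec<DisconnectAction> {
    let mut actions = Vec::new();

    // Persistent comms stay open; only their pending requests are invalidated.
    for (id, comm) in comms.iter_mut() {
        if comm.policy == CommPolicy::Persistent {
            let request_ids = std::mem::take(&mut comm.pending);
            actions.push(DisconnectAction::InvalidatedRequests {
                comm_id: id.clone(),
                request_ids,
            });
        }
    }

    // All other comms are removed and their cleanup handler is called.
    let to_close: Vec<String> = comms
        .iter()
        .filter(|(_, comm)| comm.policy == CommPolicy::CloseOnDisconnect)
        .map(|(id, _)| id.clone())
        .collect();
    for id in to_close {
        if let Some(comm) = comms.remove(&id) {
            if let Some(cleanup) = comm.cleanup {
                cleanup();
            }
            actions.push(DisconnectAction::Closed { comm_id: id });
        }
    }

    actions
}
```

The sketch does not address the race condition mentioned for persistent comms: a response emitted by the frontend just before the disconnection would find its request already invalidated.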