Description
There are a few inconsistencies in Calloop's higher-level event sources, and even though they are extremely minor, I thought I'd make the suggestion since I've coded up an alternative for the event sources I've made for ZeroMQ and USB.
Take for example:
- `Ping` has `make_ping()` to construct, which returns a `(sender, source)` pair. Using it requires calling `ping()` on the sender.
- `Channel` has `channel()` (not `make_channel()`!) which returns a `(sender, source)` pair. Using it requires calling `send()` on the sender.
- `Timer` has `Timer::new()`. Using it requires getting a handle from the `Timer` itself.

Both `Ping` and `Channel` have handles that close on drop. `Timer` does not.
This all quickly becomes apparent, and at times unwieldy, if you have a composite event source that uses several of these. For example, if your composite source has both a ping and a channel for internal reasons, you need four fields to use them: a sender and a source for each.
Here is an API we stabilised on that kind of gives the best of both worlds:
/// This event source also allows you to use different event sources to publish
/// messages over the same writeable ZeroMQ socket (usually PUB or PUSH).
/// Messages should be sent over the Calloop MPSC channel sending end. This end
/// can be cloned and used by multiple senders. Common use cases are:
///
/// - One sender: just use `send()`. Useful for any socket kind except `PULL` or
/// `SUB`.
/// - Multiple senders: clone the sending end of the channel with
/// `clone_sender()`. Useful for `PUSH`, `PUB`, `DEALER`, but probably not at
/// all useful for `REQ`/`REP`.
/// - No senders: take the sending end of the channel with `take_sender()` and
/// drop it. Useful for `SUB` and `PULL`.
pub struct ZeroMQSource<T> {
    /// Sending end of channel.
    mpsc_sender: Option<calloop::channel::Sender<T>>,
    // ...
}

impl<T> ZeroMQSource<T> {
    // Send a message via the ZeroMQ socket. If the sending end has been
    // taken then this will return an error (as well as for any other error
    // condition on a still-existing channel sending end).
    pub fn send(&self, msg: T) -> Result<(), std::sync::mpsc::SendError<T>> {
        if let Some(sender) = &self.mpsc_sender {
            sender.send(msg)
        } else {
            Err(std::sync::mpsc::SendError(msg))
        }
    }

    // Clone the channel sending end from this ZeroMQ source. Returns [`None`]
    // if it's already been removed via [`take_sender()`], otherwise the sender
    // can be safely cloned more and sent between threads.
    pub fn clone_sender(&self) -> Option<calloop::channel::Sender<T>> {
        self.mpsc_sender.clone()
    }

    // Remove the channel sending end from this ZeroMQ source and return it. If
    // you do not want to use it, you can drop it and the receiving end will be
    // disposed of too. This is useful for ZeroMQ sockets that are only for
    // incoming messages.
    pub fn take_sender(&mut self) -> Option<calloop::channel::Sender<T>> {
        self.mpsc_sender.take()
    }
}
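To make the three use cases above concrete, here is a minimal runnable sketch of the same `send()`/`clone_sender()`/`take_sender()` shape. It is only an illustration: `DemoSource` and its `drain()` method are hypothetical names, and `std::sync::mpsc` stands in for `calloop::channel` so the example has no dependencies. Nothing here is Calloop's actual API.

```rust
use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError};

/// Hypothetical stand-in for an event source that owns its own sending end.
/// `std::sync::mpsc` replaces `calloop::channel` so the sketch runs alone.
pub struct DemoSource<T> {
    sender: Option<Sender<T>>,
    receiver: Receiver<T>,
}

impl<T> DemoSource<T> {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel();
        Self { sender: Some(sender), receiver }
    }

    /// Send through the built-in sender; errors if it has been taken.
    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
        match &self.sender {
            Some(sender) => sender.send(msg),
            None => Err(SendError(msg)),
        }
    }

    /// Hand out an extra sender, or `None` if the built-in one was taken.
    pub fn clone_sender(&self) -> Option<Sender<T>> {
        self.sender.clone()
    }

    /// Move the built-in sender out of the source.
    pub fn take_sender(&mut self) -> Option<Sender<T>> {
        self.sender.take()
    }

    /// Stand-in for event loop dispatch: returns the queued messages and
    /// whether the channel is closed (every sender has been dropped).
    pub fn drain(&self) -> (Vec<T>, bool) {
        let mut msgs = Vec::new();
        loop {
            match self.receiver.try_recv() {
                Ok(msg) => msgs.push(msg),
                Err(TryRecvError::Empty) => return (msgs, false),
                Err(TryRecvError::Disconnected) => return (msgs, true),
            }
        }
    }
}

fn main() {
    let mut source = DemoSource::new();

    // One sender: use the source directly.
    source.send("one").unwrap();

    // Multiple senders: clone the sending end.
    let extra = source.clone_sender().expect("sender still present");
    extra.send("two").unwrap();
    let (msgs, closed) = source.drain();
    println!("{:?}, closed: {}", msgs, closed);

    // No senders: drop every sending end, which closes the channel.
    drop(extra);
    drop(source.take_sender());
    assert!(source.send("three").is_err());
    let (_, closed) = source.drain();
    println!("closed: {}", closed);
}
```

The point of the sketch is that the owner of the source never has to store a separate sender: the single-sender case is a method call, and closing the channel is a `take_sender()` followed by a drop.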
Disadvantages:
- more complicated
- new API
- extra checks in methods
Advantages:
- if the sources are used internally, no extra senders need to be kept in fields, you can just call `self.pinger.ping()`
- if you want to close a channel you can just call `self.channel.take_sender()` instead of needing to keep (a) the sender and (b) an option wrapper
- reflects API of std type `Option`: (`send`/`clone_sender`/`take_sender`) == (`map`/`clone`/`take`)
- constructor is more familiar: `source::Source::new() -> Result<Source>` instead of `source::make_source() -> Result<(sender, source)>`
- can be made backwards compatible easily, e.g. `make_source()` just becomes `let mut source = Source::new(); (source.take_sender(), source)`
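The backwards-compatibility point can be sketched for a ping-style source as follows. This is a rough illustration under stated assumptions, not a patch against Calloop: `DemoPing`, `DemoPingSource`, `make_demo_ping()` and `pending()` are hypothetical names, and a `std::sync::mpsc` channel of `()` stands in for the real ping mechanism.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical sending handle, standing in for a ping sender.
pub struct DemoPing {
    tx: Sender<()>,
}

impl DemoPing {
    pub fn ping(&self) {
        // Ignore the error if the source has already been dropped.
        let _ = self.tx.send(());
    }
}

/// Hypothetical source that owns its sender until it is taken.
pub struct DemoPingSource {
    sender: Option<DemoPing>,
    rx: Receiver<()>,
}

impl DemoPingSource {
    /// New-style constructor: one value, no tuple.
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        Self { sender: Some(DemoPing { tx }), rx }
    }

    /// Ping through the built-in sender. Returns `false` if it was taken.
    pub fn ping(&self) -> bool {
        match &self.sender {
            Some(sender) => {
                sender.ping();
                true
            }
            None => false,
        }
    }

    pub fn take_sender(&mut self) -> Option<DemoPing> {
        self.sender.take()
    }

    /// Stand-in for event loop dispatch: counts and consumes queued pings.
    pub fn pending(&self) -> usize {
        self.rx.try_iter().count()
    }
}

/// Old-style constructor expressed through the new one.
pub fn make_demo_ping() -> (DemoPing, DemoPingSource) {
    let mut source = DemoPingSource::new();
    let sender = source
        .take_sender()
        .expect("a freshly constructed source always holds its sender");
    (sender, source)
}

fn main() {
    // Existing callers keep the (sender, source) pair.
    let (sender, source) = make_demo_ping();
    sender.ping();
    println!("pending via old API: {}", source.pending());

    // New callers keep a single value and ping through it.
    let owned = DemoPingSource::new();
    owned.ping();
    println!("pending via new API: {}", owned.pending());
}
```

Note that the binding has to be `mut` for `take_sender()` to compile, and that the wrapper unwraps the `Option` because a freshly constructed source always holds its sender.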
Let me know what you think, and if you're interested I'll code something up for the existing types that have sending handles.