[RFC] Add support for C++26 std::execution #655


Draft
wants to merge 1 commit into master

Conversation

@SidneyCogdill commented Apr 16, 2025

Alternative to #653.

Note:

  • This implementation is thread safe, but multipart message operations are not. In my local development project I experimented with several designs to fix or mitigate the hazard (a separate send/receive socket pair; a ring-buffer queue; a socket-wide critical section; a functional-style interface), but the conclusion was basically to do nothing (as currently implemented in this PR): the mitigations come with trade-offs that make them largely unviable in real-world scenarios, and the "do nothing" approach significantly simplifies the implementation.
  • Some comments are outdated (partly related to those experiments).
  • The event loop depends on Boost.Asio.
  • It currently depends on stdexec for the std::execution implementation; most of the primitives used will be available in C++26.
  • CPM is added.
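For context, here is a minimal sketch of how a sender/coroutine-based socket interface could be used with stdexec. The wrapper and member names (zmq::v1::async_socket, async_receive, async_send) are hypothetical placeholders, not necessarily the identifiers used in this PR.

```cpp
// Hypothetical usage sketch: zmq::v1::async_socket, async_receive and async_send
// are placeholder names, not necessarily the identifiers used in this PR.
#include <exec/task.hpp>   // exec::task coroutine type from stdexec
#include <zmq.hpp>

exec::task<void> echo_once(zmq::v1::async_socket& socket)
{
    // Await a sender that completes once a message has been received.
    zmq::message_t request = co_await socket.async_receive();

    // Await a sender that completes once the reply has been handed to libzmq.
    co_await socket.async_send(std::move(request));
}
```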

@gummif (Member) commented May 3, 2025

Thanks. I think some of these can be separate PRs, like the CPM and vcpkg changes.

I have never used senders so it's a bit hard for me to review. But about the thread-safety: can multiple tasks send/receive messages/multipart messages at the same time (executing in a single-threaded context)?

@SidneyCogdill (Author)

> can multiple tasks send/receive messages/multipart messages at the same time (executing in a single-threaded context)?

No, because multiple wake-ups (from the stream_descriptor wait_read / wait_write) are scheduled at the same time.

For example, suppose both A and B are sending a multipart message from the same ROUTER socket (where the first part is the routing_id). There are two pending multipart messages to be sent:

  • message 1 from A: ["0x0001" (SNDMORE), "Hello" (SNDMORE), "World"]
  • message 2 from B: ["0x0002" (SNDMORE), "Sent" (SNDMORE), "From" (SNDMORE), "ZeroMQ"]

Assume A has queued "Hello" (SNDMORE) into the send queue and the queue is now full: A will be suspended and wait for the file descriptor's ready-to-write signal.
When the signal comes in, both A and B are notified to continue, and there is no guarantee whether "0x0002" (SNDMORE) or "World" will be submitted to the queue next.

This should be rare, but it is still possible (for example, if the server sends messages faster than the clients can handle). A rough sketch of the interleaving is shown below.
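The sketch again uses hypothetical names (async_socket, async_send, make_msg); zmq::send_flags::sndmore is the cppzmq flag corresponding to ZMQ_SNDMORE.

```cpp
// Sketch of the race described above: two tasks writing multipart messages to the
// same ROUTER socket on a single-threaded context. async_socket / async_send and
// make_msg are placeholder names, not the actual interface in this PR.
exec::task<void> send_a(zmq::v1::async_socket& router)
{
    co_await router.async_send(make_msg("0x0001"), zmq::send_flags::sndmore);
    co_await router.async_send(make_msg("Hello"), zmq::send_flags::sndmore);
    // If the send queue is full here, A suspends until the fd signals "ready to write"...
    co_await router.async_send(make_msg("World"), zmq::send_flags::none);
}

exec::task<void> send_b(zmq::v1::async_socket& router)
{
    // ...and when that signal arrives, both A and B are resumed, so this frame may be
    // queued between A's "Hello" and "World", interleaving the two multipart messages.
    co_await router.async_send(make_msg("0x0002"), zmq::send_flags::sndmore);
    co_await router.async_send(make_msg("Sent"), zmq::send_flags::sndmore);
    co_await router.async_send(make_msg("From"), zmq::send_flags::sndmore);
    co_await router.async_send(make_msg("ZeroMQ"), zmq::send_flags::none);
}

// Both tasks started concurrently on the same single-threaded context, e.g.:
// stdexec::sync_wait(stdexec::when_all(send_a(router), send_b(router)));
```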

I think the comment can be updated to use the term "concurrency-unsafe" rather than "thread-unsafe".

I looked at the native Rust implementation (which has async support), and it has the same restriction at the exposed interfaces: the read/write interfaces take mutable references, which means the same socket cannot be read from or written to concurrently (Rust only allows a single mutable reference to exist at a time).


Other than the "do nothing" approach, there are two sane ways to address it:

  • Use an async_mutex: https://github.com/facebookexperimental/libunifex/blob/main/include/unifex/async_mutex.hpp (this implementation is from libunifex, which uses a slightly outdated dialect of P2300, but it's not hard to adapt it to C++26/stdexec). It internally uses a linked list (each waiter stores the next continuation). Although the library is marked as experimental, the technique isn't new; the implementation is similar to the coro Mutex in Folly. A rough sketch is included after this list.
  • Use a ring buffer (a fixed-size, wait-free queue) plus a linked list, similar to io_uring in the Linux kernel. This offers better performance than the previous option, but it's also harder to implement correctly (io_uring is known to be much less reliable than the good old epoll and has caused issues in major event-loop frameworks, e.g. libuv).

Both will slow down all read/write operations, as more synchronization effort is required (a single-message send also must not interleave with an in-progress multipart send).
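For illustration, here is a rough sketch of the async_mutex option (the first bullet above). It assumes unifex::async_mutex (async_lock() returns a sender, unlock() resumes the next waiter) has been adapted to work with stdexec tasks; the socket API names remain hypothetical.

```cpp
// Sketch of option 1: serialize whole multipart sends with an async mutex.
// Assumes unifex::async_mutex (async_lock() -> sender, unlock()) adapted to stdexec;
// async_socket / async_send remain placeholder names.
#include <unifex/async_mutex.hpp>
#include <vector>

unifex::async_mutex send_mutex;

exec::task<void> send_multipart(zmq::v1::async_socket& socket,
                                std::vector<zmq::message_t> parts)
{
    co_await send_mutex.async_lock();   // only one multipart send proceeds at a time
    for (std::size_t i = 0; i < parts.size(); ++i) {
        auto flags = (i + 1 < parts.size()) ? zmq::send_flags::sndmore
                                            : zmq::send_flags::none;
        co_await socket.async_send(std::move(parts[i]), flags);
    }
    send_mutex.unlock();                // wake the next waiting task
    // (real code would need a scope guard so the mutex is released on exceptions)
}
```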


> I think some of these can be separate PRs, like the CPM and vcpkg changes.

I agree. I will remove them later.

By the way, the CI is failing because it doesn't have the Boost library. I'm still hesitant about whether Boost.Asio should be used here (hence the PR is marked as a draft and the interface is placed inside the v1 namespace). Currently cppzmq is a header-only library with no external dependencies other than libzmq. I'm really just pushing the implementation I use personally (hopefully it will be useful), but not everyone likes Boost or Asio.

On the other hand, the alternatives would be to ask libzmq to expose asynchronous interfaces as a C API like nng does (API ref), to implement my own in-house event loop (which I sadly can't do better than the existing, battle-tested Asio), or to not have coroutine/async support in cppzmq at all (which I think is also bad).

@SidneyCogdill (Author) commented May 3, 2025

Wishlist (from libzmq): provide asynchronous APIs that allow me to:

  • schedule a multipart send/receive operation [1], accepting a callback that is triggered on completion.
  • cancel the pending operation.

If these are possible, then cppzmq can gain async/coroutine support using only libzmq; the entire abstraction of coroutines is built on top of callbacks anyway. A rough sketch of what such an API could look like is included after the footnotes below.


[1] A multipart message containing only a single message is also a multipart message.

*I'm basically asking for Windows IOCP-style APIs.
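To make the wishlist concrete, here is a rough sketch of what such a callback-based libzmq C API could look like. None of these functions exist in libzmq today; all names and signatures here are hypothetical.

```cpp
// Hypothetical sketch only: libzmq does not provide these functions today.
typedef struct zmq_async_op zmq_async_op_t;   // opaque handle to a pending operation

// Completion callback: err is 0 on success, an errno value on failure/cancellation.
typedef void (*zmq_async_fn)(void *hint, int err);

// Schedule an asynchronous multipart send; the callback fires once all parts are
// queued (a single-part message is simply a one-element multipart message).
int zmq_async_send(void *socket, zmq_msg_t *parts, size_t count,
                   zmq_async_fn cb, void *hint, zmq_async_op_t **op);

// Schedule an asynchronous multipart receive.
int zmq_async_recv(void *socket, zmq_async_fn cb, void *hint, zmq_async_op_t **op);

// Cancel a pending operation; its callback still fires, with err = ECANCELED.
int zmq_async_cancel(zmq_async_op_t *op);
```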
