
feat: return token when request is made, resolve token when request handling completes #916


Open
wants to merge 31 commits into base: main

Conversation

de-sh
Contributor

@de-sh de-sh commented Sep 29, 2024

Solves #805

Type of change

New feature that notifies the requester when their requests are acknowledged by the broker (in the case of QoS 1/2 publishes and subscribe/unsubscribe) or when they are written to the TCP buffer (for all other requests).

BREAKING: pending definition changes from VecDeque<Request> to VecDeque<(Request, Option<PromiseTx>)>.
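As a rough illustration of the promise mechanism this PR introduces: the caller keeps a token (receiver) while the event loop holds a `PromiseTx` (sender) that it resolves once the request is handled. The sketch below is hypothetical and synchronous, using std channels; the real implementation is async and its types only loosely resemble these.

```rust
use std::sync::mpsc;

/// Resolver half, held by the event loop; resolved with the packet id
/// once the broker acknowledges (or the packet is written, for QoS 0).
pub struct PromiseTx(mpsc::SyncSender<u16>);

/// Awaitable half, returned to the caller.
pub struct Token(mpsc::Receiver<u16>);

pub fn promise() -> (PromiseTx, Token) {
    let (tx, rx) = mpsc::sync_channel(1);
    (PromiseTx(tx), Token(rx))
}

impl PromiseTx {
    pub fn resolve(self, pkid: u16) {
        // Ignore the error: the caller may have dropped the token.
        let _ = self.0.send(pkid);
    }
}

impl Token {
    /// Blocks until resolved; a RecvError means the sender was dropped
    /// without resolving, analogous to an unresolved/disconnected token.
    pub fn wait(self) -> Result<u16, mpsc::RecvError> {
        self.0.recv()
    }
}

fn main() {
    // Happy path: event loop resolves the token with the acked pkid.
    let (tx, token) = promise();
    tx.resolve(4);
    assert_eq!(token.wait(), Ok(4));

    // Dropping the resolver without resolving surfaces as an error,
    // like the RecvError seen in the test output further down.
    let (tx2, token2) = promise();
    drop(tx2);
    assert!(token2.wait().is_err());
}
```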

Checklist:

  • Formatted with cargo fmt
  • Make an entry to CHANGELOG.md if it's relevant to the users of the library. If it's not relevant mention why.

@de-sh de-sh changed the title feat: acknowledge notification feat: notify when publish/subscribe/unsubscribe packets are acked Sep 29, 2024
@coveralls

coveralls commented Sep 29, 2024

Pull Request Test Coverage Report for Build 13582365646

Details

  • 322 of 696 (46.26%) changed or added relevant lines in 7 files are covered.
  • 34 unchanged lines in 4 files lost coverage.
  • Overall coverage increased (+1.7%) to 37.769%

Changes Missing Coverage       Covered Lines  Changed/Added Lines  %
rumqttc/src/eventloop.rs                   1                    2  50.0%
rumqttc/src/v5/eventloop.rs                0                    1  0.0%
rumqttc/src/tokens.rs                     21                   33  63.64%
rumqttc/src/state.rs                     147                  177  83.05%
rumqttc/src/v5/state.rs                  120                  166  72.29%
rumqttc/src/client.rs                     28                  149  18.79%
rumqttc/src/v5/client.rs                   5                  168  2.98%

Files with Coverage Reduction  New Missed Lines  %
rumqttc/src/state.rs                          1  84.42%
rumqttc/src/v5/state.rs                       3  67.23%
rumqttc/src/client.rs                        15  32.47%
rumqttc/src/v5/client.rs                     15  12.64%
Totals Coverage Status
Change from base Build 13581587485: 1.7%
Covered Lines: 6363
Relevant Lines: 16847

💛 - Coveralls

@de-sh de-sh marked this pull request as ready for review September 29, 2024 20:08
@xiaocq2001
Contributor

xiaocq2001 commented Sep 30, 2024

Glad to see the progress here. Some of the things to discuss:

  1. Is there any example of how the feature is used?
    I tried the following code to test it:
    println!("--- Publishing messages and wait for ack ---");
    let mut set = JoinSet::new();

    let ack_promise = client
        .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });

    let ack_promise = client
        .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });

    let ack_promise = client
        .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });

    while let Some(res) = set.join_next().await {
        println!("Acknoledged = {:?}", res?);
    }

The output shows "RecvError"

--- Publishing messages and wait for ack ---
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 0, Payload Size = 1
Event = Outgoing(Publish(0))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 4, Payload Size = 2
Event = Outgoing(Publish(4))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 5, Payload Size = 3
Event = Outgoing(Publish(5))
Acknoledged = Err(RecvError(()))
Acknoledged = Err(RecvError(()))
Acknoledged = Err(RecvError(()))
Event = Incoming(Publish(Publish { dup: false, qos: AtMostOnce, retain: false, topic: b"hello/world", pkid: 0, payload: b"\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 4, reason: Success, properties: None }))
Event = Incoming(PubRec(PubRec { pkid: 5, reason: Success, properties: None }))
Event = Outgoing(PubRel(5))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 1, payload: b"\x01\x01", properties: None }))
Event = Outgoing(PubAck(1))
Event = Incoming(Publish(Publish { dup: false, qos: ExactlyOnce, retain: false, topic: b"hello/world", pkid: 2, payload: b"\x01\x01\x01", properties: None }))
Event = Outgoing(PubRec(2))
Event = Incoming(PubComp(PubComp { pkid: 5, reason: Success, properties: None }))
Event = Incoming(PubRel(PubRel { pkid: 2, reason: Success, properties: None }))
Event = Outgoing(PubComp(2))

In outgoing_publish, the tx is not saved to ack_waiter; it's dropped!
Maybe you can add (along with the QoS 0 notification from point 2)

        if publish.qos != QoS::AtMostOnce {
            self.ack_waiter[pkid as usize] = tx;
        } else if let Some(tx) = tx {
            tx.resolve();
        }

after

        let event = Event::Outgoing(Outgoing::Publish(pkid));
        self.events.push_back(event);
  2. It seems there is no notification for QoS 0; is it worthwhile to notify when a QoS 0 packet is sent (in outgoing_publish)?

@de-sh
Copy link
Contributor Author

de-sh commented Sep 30, 2024

Thanks for the review, two more things I have been thinking about with respect to the interface are as follows:

  1. Returning pkid instead of ().
  2. Errors with more context about why a request was refused by the broker; both suback and unsuback carry reason codes in response. Should this just return the acknowledgement packets received from the broker, or repackage them into a more presentable type?

@@ -75,11 +75,11 @@ pub struct EventLoop {
/// Current state of the connection
pub state: MqttState,
/// Request stream
requests_rx: Receiver<Request>,
requests_rx: Receiver<(Request, Option<PromiseTx>)>,
Contributor Author

Suggested change
requests_rx: Receiver<(Request, Option<PromiseTx>)>,
requests_rx: Receiver<PendingRequest>,

Contributor Author

Resolved in #917

/// Pending packets from last session
pub pending: VecDeque<Request>,
pub pending: VecDeque<(Request, Option<PromiseTx>)>,
Contributor Author

This is a breaking change, note on release

@de-sh de-sh mentioned this pull request Oct 1, 2024
2 tasks
@de-sh de-sh changed the title feat: notify when publish/subscribe/unsubscribe packets are acked feat: return token when request is made, resolve token when request handling completes Nov 15, 2024
@izolyomi

Thank you for the efforts, I'm also interested in using this once it's merged.

@Leandros

Leandros commented Dec 2, 2024

Any way that one can help to get this merged?

Thank you!

@swanandx
Contributor

swanandx commented Dec 3, 2024

just a heads up: this will be merged and released with the upcoming rumqttc version in January!

thank you so much for your patience and contribution 💯

@izolyomi

Do you maybe have any updates like a current schedule or planned deadline for a merge and release of this feature? Thank you for the efforts.

@tekjar
Contributor

tekjar commented Jan 31, 2025

Hi @izolyomi . We are unfortunately delayed on this. We'll try to focus on these issues in Feb and cut a new release

@cartertinney

I checked this out and it works quite well, aside from a handful of issues:

  1. The reasons for rejection by the broker that are reported via the Token cannot be matched on, or interacted with in any meaningful way. It would be nice if I could write conditional logic to respond to particular rejections in different ways. Perhaps I missed something?

  2. If a subscribe or unsubscribe is done when the client thinks it is connected, but is not and has just not timed out on the ping/keep alive yet, the Token will hang indefinitely. I assume this is because, per the MQTT spec, unacknowledged SUB and UNSUB packets are discarded upon connection loss/reconnect, but the discard does not seem to get reported to the Token, so the token.await hangs forever.

  3. When using QoS 2 and manual ack for a received message (perhaps this also applies with auto ack), I'm seeing Token.await return after the PUBREC is sent, without waiting for the rest of the PUBREL/PUBCOMP handshake.

  4. When using QoS 1 or QoS 2 and manual ack for a received message, if there is a disconnect between receiving the message and sending the manual ack with .ack(), the PUBACK will queue up and be redelivered upon reconnect. This is an MQTT spec violation, as it can lead to acking the wrong message, duplicate acks in edge cases, or even broker disconnects. I assume this also applies to automatic acking, but the condition is very hard to reproduce without manual ack.

I see that the release of this feature has been delayed - is there some issue with the implementation that I'm missing? Aside from the above issues I found, everything worked fantastic, and it would be extremely helpful to get this integrated into my codebase ASAP. Are there any updates?

@de-sh
Contributor Author

de-sh commented Feb 21, 2025

I checked this out and it works quite well, aside from a handful of issues:

Thanks for pointing these out @cartertinney, will figure out how to solve this over the weekend!

@de-sh de-sh mentioned this pull request Feb 22, 2025
2 tasks
@cartertinney

cartertinney commented Feb 25, 2025

#946 appears to have solved issues 1 and 3 that I raised above. Thanks much, I continue to look forward to fixes for 2 and 4.

To expand on issue 2, further testing exposed that this same hanging issue affecting .subscribe() and .unsubscribe() also appears when using manual acks with the .ack() method on QoS2 messages, which makes sense and follows from the original observations, but I just thought I should clarify.

@swanandx
Contributor

#946 appears to have solved issues 1 and 3 that I raised above. Thanks much, I continue to look forward to fixes for 2 and 4.

To expand on issue 2, further testing exposed that this same hanging issue affecting .subscribe() and .unsubscribe() also appears when using manual acks with the .ack() method on QoS2 messages, which makes sense and follows from the original observations, but I just thought I should clarify.

for 2, if client is timed out on keep alive, the connection is closed and so token should resolve to Disconnected error! you are mentioning that this isn't working right?

for 3, we tested and it was working fine.

note that when using manual acks with qos 2 you just need to ack the incoming publish ( rest of the qos 2 flow is handled internally, i.e. pubrel/pubcomp handshake is done internally ) and for outgoing publishes, token is resolved only after getting pubcomp!

[ shall we have manual acks for PubRec &/ PubRel as well? cc: @tekjar @de-sh ]

can you please provide a minimal poc so that we can reproduce the issue to fix it?

thanks!

@cartertinney

cartertinney commented Feb 26, 2025

#946 appears to have solved issues 1 and 3 that I raised above. Thanks much, I continue to look forward to fixes for 2 and 4.
To expand on issue 2, further testing exposed that this same hanging issue affecting .subscribe() and .unsubscribe() also appears when using manual acks with the .ack() method on QoS2 messages, which makes sense and follows from the original observations, but I just thought I should clarify.

for 2, if client is timed out on keep alive, the connection is closed and so token should resolve to Disconnected error! you are mentioning that this isn't working right?

for 3, we tested and it was working fine.

note that when using manual acks with qos 2 you just need to ack the incoming publish ( rest of the qos 2 flow is handled internally, i.e. pubrel/pubcomp handshake is done internally ) and for outgoing publishes, token is resolved only after getting pubcomp!

[ shall we have manual acks for PubRec &/ PubRel as well? cc: @tekjar @de-sh ]

can you please provide a minimal poc so that we can reproduce the issue to fix it?

thanks!

Yes, like I said, issues 1 and 3 appear to be solved. I'm happy with the way QoS2 handles the PUBREC/REL/COMP handshake, it is much improved from prior iterations - I personally don't think manual PUBCOMP has much value from a user perspective, I like the current solution.

As for 2, I can run the test again, perhaps I missed something, but you are correct - I'm not seeing the Token resolve to a Disconnected error, I'm seeing the same hanging behavior as before on Tokens created by SUB and UNSUB operations. Is it possible there's something I'm missing?

@swanandx
Contributor

I'm not seeing the Token resolve to a Disconnected error, I'm seeing the same hanging behavior as before on Tokens created by SUB and UNSUB operations

my observations:
TokenError::Disconnected is returned when the sender (say tx) is dropped. At a given time, tx can be in one of two places:

  • in the channel between client and eventloop
    this is when we call the method (publish/subscribe/etc.) and the eventloop is yet to process that request.

  • in the state of the eventloop
    once the Request is processed and we are waiting for the Ack.

The issue is that tx is never dropped if we don't have a network connection!

Minimal PoC
use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mqttoptions = MqttOptions::new("poc", "never-connect-here", 1883);

    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);

    // spawn eventloop poll in separate task
    tokio::task::spawn(async move {
        loop {
            let event = eventloop.poll().await;
            match &event {
                Ok(v) => {
                    println!("Event = {v:?}");
                }
                Err(e) => {
                    println!("Error = {e:?}");
                    tokio::time::sleep(Duration::from_secs(60)).await;
                }
            }
        }
    });

    let token = client
        .subscribe("hello/world", QoS::AtMostOnce)
        .await
        .unwrap();

    // as we never connect to network, we never process any requests in eventloop
    // that are pending in channel. Hence sender is never dropped and token is
    // never resolved!
    match token.await {
        Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
        Err(e) => println!("Subscription failed: {e:?}"),
    };
}

For keep alive timeouts / disconnects due to other errors:

packets that were already sent, i.e. those with handles in state, should be cleared; pushed commit c7ec9b4 to fix this!

But for the ones still in the channel (yet to reach the eventloop): we don't intend to drop requests that are sitting in the channel, which leads to the hanging behaviour shown in the PoC above.

please comment on the above behaviour for suggestions!

thanks!

@swanandx
Contributor

swanandx commented Feb 28, 2025

  1. When using QoS1 or QoS2 and manual ack for a received message if there is a disconnect between receiving the message and sending the manual ack with .ack(), the PUBACK will queue up and be redelivered upon reconnect, which is a MQTT spec violation as it can lead to acking the wrong message, or duplicate acks in edge cases, or even broker disconnects. I assume this also applies to automatic acking, but it's very hard to reproduce this condition without manual ack.

we do not retransmit the pubacks though:

Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack

can you verify again if i'm missing something or maybe something related to qos2? @cartertinney
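The retransmission filter quoted above can be sketched in isolation (with a hypothetical Request enum, not rumqttc's actual types): on reconnect, pending publishes are resent, but stale PubAcks are dropped so the broker never sees an ack from a previous session.

```rust
/// Hypothetical, simplified request type for illustration only.
#[derive(Debug)]
enum Request {
    Publish(u16),
    PubAck(u16),
    Subscribe(u16),
}

/// Sketch of the filter: everything pending is retransmitted on
/// reconnect except PubAcks, which wait for publish redelivery.
fn should_retransmit(req: &Request) -> bool {
    match req {
        // Wait for publish retransmission, else the broker could be
        // confused by an unexpected ack.
        Request::PubAck(_) => false,
        _ => true,
    }
}

fn main() {
    let pending = vec![
        Request::Publish(1),
        Request::PubAck(2),
        Request::Subscribe(3),
    ];
    // Only the publish and subscribe survive the reconnect filter.
    let kept: Vec<&Request> = pending.iter().filter(|r| should_retransmit(r)).collect();
    assert_eq!(kept.len(), 2);
}
```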

@cartertinney

cartertinney commented Mar 4, 2025

@de-sh @swanandx

Trying out the latest version, I now see the expected behavior for SUB and UNSUB in the case of connection drops: I get a TokenError indicating the "sender side of channel was dropped". This is the right behavior, although the error text is a little confusing (the connection was lost, but the underlying channel/EventLoop wasn't dropped from memory and still exists, and if the connection were regained things would begin working normally again). That quirk of semantics aside, this is a huge improvement for correctness, and I am thrilled with this change.

Now, regarding the acknowledgement process:

I understand that the ACK requests that queue up while disconnected are in the channel (yet to reach the EventLoop/MQTT session), but the issue is that allowing them to queue up like this violates the MQTT specification. Acking a message from a previous connection upon reconnect is dangerous because we can't guarantee we are acknowledging the packet we think we are. Consider this case:

A publish is sent to the client from the broker. After receiving the publish, but before sending the PUBACK, the client loses connection, and the PUBACK is queued up in the channel. Eventually the client reconnects, and the broker redelivers the original publish, now with the DUP flag set to true. The queued PUBACK is also delivered upon reconnect. It was intended for the original copy of the message on the prior connection, but from the broker's perspective it now acks the redelivery. The broker sees no outstanding PKID that needs to be acked, while the client thinks it still needs to ack the duplicate (treated as a new message per spec), resulting in a second ACK being sent for the same PKID. At this point:

  1. The broker may choose to end the session with the client for sending an unexpected ACK (in my experience this is most common on QoS2 with unsolicited PUBREC/PUBREL errors in the broker/client depending on the timing) because of protocol violations that threaten the integrity of message delivery guarantees.
  2. If it does not kick the client off, and the broker had reassigned the PKID to a new message (because the PKID was acked means it's available for reassignment), and delivers it to the client before the client sends the second ACK for the duplicate message, the client could incorrectly ack a completely different message, resulting in QoS guarantees falling apart.

This is admittedly a fairly unlikely scenario when using QoS1 and automatic acking, but when using a high amount of message throughput, or when using QoS2, or when using manual ack (and ESPECIALLY when using QoS2 with manual ack and high throughput), this scenario actually becomes relatively easy to trigger.

The only correct behavior is that you must wait for the broker to redeliver the publish, and then ack the duplicate.

I realize, from an implementation standpoint, that this is difficult due to the way you use a single channel for all requests, but ultimately to be correct I think you probably would need to handle PUBACK/PUBREC/PUBCOMP differently from the rest.

I hope this explanation is clear - it's a relatively easy scenario to reproduce, but I'm happy to provide a sample application that will reliably create this issue if necessary.

@swanandx
Contributor

swanandx commented Mar 5, 2025

fair points, but this case is already handled (at least for PubAck) here, as mentioned previously:

Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack

iirc, we need to retransmit PubRels as well, right? So, as you pointed out, ignoring PubRecs and PubComps here would be much better behaviour, right?

@cartertinney

cartertinney commented Mar 25, 2025

@swanandx My apologies for the delay in response. Got caught up in some other items.

Even on QoS1 when I invoke .ack() while disconnected, I see the Outgoing(PubAck) appear on the EventLoop after reconnect. Furthermore, the AckOfAck I get back after the reconnect resolves to AckOfAck::None which I believe should indicate successful send of the PubAck for QoS 1.

So, unless I'm missing something, I believe my previous point stands - I'm seeing PubAcks queue up while offline, although the problem is significantly worse on QoS2.

The repro for this is rather simple:

  • Create a client that receives messages using manual ack on QoS1 with clean_start = false.
  • After receiving a message, but before invoking .ack() drop the network
  • After the EventLoop has detected the disconnect, allow the invocation of .ack() on the message previously received.
  • Reconnect the network
  • EventLoop will reconnect to the broker
  • You will see the Outgoing(PubAck) corresponding to the PubAck issued while disconnected on the EventLoop in the logs
  • The await of the Token<AckOfAck> that was returned by the invocation to .ack() will resolve to Ok(AckOfAck::None) indicating QoS1 completion.

Unless I'm missing something, this means that a PubAck for a publish from a previous connection was queued offline and delivered upon reconnect, which is invalid.

@swanandx
Contributor

swanandx commented Apr 2, 2025

hey @cartertinney , thanks for your patience.

PubAck for a publish from a previous connection was queued offline and delivered upon reconnect, which is invalid.

we do the cleanup when the network is dropped (or there is some other error).

once that cleanup is done, anything pushed to the channel between client and eventloop doesn't have anything to do with the old session.

In the case you mentioned above, you are calling .ack() after the cleanup is done, so it gets buffered in the channel and sent when we are able to. Just like calling .publish(): if you call .publish(), it will put that request in the channel and send it on the new session.

I believe it should be the user's responsibility to call .ack() only when it is supposed to be called.

@cartertinney

@swanandx

I see where you're coming from, and if that's your API philosophy, I recognize it as consistent with other choices you've made. But in practice, expecting the user to behave correctly is a bit impractical, given that connection state is only known to the event loop polling thread, whereas the part of a hypothetical application doing manual acking, or publishing, or anything else is in a different thread. You're basically forcing every application that does manual acking to build constructs to track and share connection state, which is doable, but seems a bit of an imposition on application design when it would seemingly be easier to implement internally.

Similar things could be said of manual ack ordering, though, and again, I recognize that you are making ideologically consistent API design choices, so at least there is consistency.

Let me try implementing such a scheme to prevent inappropriate calls to .ack() and see how much work is required to do it.

@swanandx
Contributor

Let me try implementing such a scheme to prevent inappropriate calls to .ack() and see how much work is required to do it.

cool, thanks!

You're basically forcing every application that does manual acking to build constructs to track and share data regarding connection state

Only if that is required by the application. Generally you won't need to build any construct; it can be achieved by setting the capacity of the channel to 0 (we do it in uplink as well).

Having it set to 0 ensures that packets don't get accepted when the network is down (because of how poll works). Then in your application you can opt to block on publish, subscribe, ack, etc. (or do it without blocking via try_publish, try_subscribe, try_ack, etc.).
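To illustrate the idea, a zero-capacity channel is a rendezvous: sends are only accepted while the consumer is actively receiving. A std-only sketch of that property (the rumqttc request channel is async, so this is an analogy, not its actual code):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Returns (rejected_while_idle, accepted_while_receiving) for a
/// zero-capacity (rendezvous) channel, mimicking try_publish/try_ack
/// against an event loop that is not / is currently polling.
fn rendezvous_demo() -> (bool, bool) {
    let (tx, rx) = mpsc::sync_channel::<&str>(0);

    // Nobody is receiving yet: a non-blocking send is rejected
    // immediately, like try_publish() erroring while the loop is down.
    let rejected_while_idle = tx.try_send("req").is_err();

    // Once a receiver is actively waiting, the same send goes through.
    let receiver = thread::spawn(move || rx.recv());
    let mut accepted_while_receiving = false;
    for _ in 0..200 {
        if tx.try_send("req").is_ok() {
            accepted_while_receiving = true;
            break;
        }
        thread::sleep(Duration::from_millis(5));
    }
    drop(tx); // unblock the receiver if the send never landed
    let _ = receiver.join();
    (rejected_while_idle, accepted_while_receiving)
}

fn main() {
    let (rejected, accepted) = rendezvous_demo();
    assert!(rejected && accepted);
}
```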

That is the behavior you are expecting right?

Thanks for understanding my pov and for the feedback!

@izolyomi

Hi @izolyomi . We are unfortunately delayed on this. We'll try to focus on these issues in Feb and cut a new release

Does anyone have an updated rough estimated time of arrival for a next release including this change?
