Skip to content

Commit a520565

Browse files
committed
Document and improve PubAck handling
This improves the doc around how to back-pressure the publication of Publish commands to avoid buffer overflow in QoS 1+ cases. Implements the ask pattern for this purpose.
1 parent 3a7f0ae commit a520565

File tree

4 files changed

+71
-11
lines changed

4 files changed

+71
-11
lines changed

mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/javadsl/MqttSession.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
package akka.stream.alpakka.mqtt.streaming
66
package javadsl
77

8+
import java.util.concurrent.CompletionStage
9+
810
import akka.NotUsed
911
import akka.actor.ActorSystem
1012
import akka.stream.Materializer
@@ -16,6 +18,8 @@ import akka.stream.alpakka.mqtt.streaming.scaladsl.{
1618
}
1719
import akka.stream.javadsl.Source
1820

21+
import scala.compat.java8.FutureConverters._
22+
1923
/**
2024
* Represents MQTT session state for both clients or servers. Session
2125
* state can survive across connections i.e. their lifetime is
@@ -32,6 +36,18 @@ abstract class MqttSession {
3236
*/
3337
def tell[A](cp: Command[A]): Unit
3438

39+
/**
40+
* Ask the session to perform a command regardless of the state it is
41+
* in. This is important for sending Publish messages in particular,
42+
* as a connection may not have been established with a session.
43+
* @param cp The command to perform
44+
* @tparam A The type of any carry for the command.
45+
* @return A future indicating when the command has completed. Completion
46+
* is defined as when it has been acknowledged by the recipient
47+
* endpoint.
48+
*/
49+
def ask[A](cp: Command[A]): CompletionStage[A]
50+
3551
/**
3652
* Shutdown the session gracefully
3753
*/
@@ -47,6 +63,9 @@ abstract class MqttClientSession extends MqttSession {
4763
override def tell[A](cp: Command[A]): Unit =
4864
underlying ! cp
4965

66+
override def ask[A](cp: Command[A]): CompletionStage[A] =
67+
(underlying ? cp).toJava
68+
5069
override def shutdown(): Unit =
5170
underlying.shutdown()
5271
}
@@ -91,6 +110,9 @@ abstract class MqttServerSession extends MqttSession {
91110
override def tell[A](cp: Command[A]): Unit =
92111
underlying ! cp
93112

113+
override def ask[A](cp: Command[A]): CompletionStage[A] =
114+
(underlying ? cp).toJava
115+
94116
override def shutdown(): Unit =
95117
underlying.shutdown()
96118
}

mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/MqttSession.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,31 @@ abstract class MqttSession {
5555
*/
5656
def ![A](cp: Command[A]): Unit
5757

58+
/**
59+
* Ask the session to perform a command regardless of the state it is
60+
* in. This is important for sending Publish messages in particular,
61+
* as a connection may not have been established with a session.
62+
* @param cp The command to perform
63+
* @tparam A The type of any carry for the command.
64+
* @return A future indicating when the command has completed. Completion
65+
* is defined as when it has been acknowledged by the recipient
66+
* endpoint.
67+
*/
68+
final def ask[A](cp: Command[A]): Future[A] =
69+
this ? cp
70+
71+
/**
72+
* Ask the session to perform a command regardless of the state it is
73+
* in. This is important for sending Publish messages in particular,
74+
* as a connection may not have been established with a session.
75+
* @param cp The command to perform
76+
* @tparam A The type of any carry for the command.
77+
* @return A future indicating when the command has completed. Completion
78+
* is defined as when it has been acknowledged by the recipient
79+
* endpoint.
80+
*/
81+
def ?[A](cp: Command[A]): Future[A]
82+
5883
/**
5984
* Shutdown the session gracefully
6085
*/
@@ -171,6 +196,9 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit mat:
171196
case c: Command[A] => throw new IllegalStateException(c + " is not a client command that can be sent directly")
172197
}
173198

199+
override def ?[A](cp: Command[A]): Future[A] =
200+
???
201+
174202
override def shutdown(): Unit = {
175203
system.stop(clientConnector.toClassic)
176204
system.stop(consumerPacketRouter.toClassic)
@@ -513,6 +541,9 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit mat:
513541
case c: Command[A] => throw new IllegalStateException(c + " is not a server command that can be sent directly")
514542
}
515543

544+
override def ?[A](cp: Command[A]): Future[A] =
545+
???
546+
516547
override def shutdown(): Unit = {
517548
system.stop(serverConnector.toClassic)
518549
system.stop(consumerPacketRouter.toClassic)

mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,20 @@ public Publish apply(DecodeErrorOrEvent<Object> x, boolean isCheck) {
129129
SourceQueueWithComplete<Command<Object>> commands = run.first();
130130
commands.offer(new Command<>(new Connect(clientId, ConnectFlags.CleanSession())));
131131
commands.offer(new Command<>(new Subscribe(topic)));
132-
session.tell(
133-
new Command<>(
134-
new Publish(
135-
ControlPacketFlags.RETAIN() | ControlPacketFlags.QoSAtLeastOnceDelivery(),
136-
topic,
137-
ByteString.fromString("ohi"))));
132+
CompletionStage<Done> publishDone =
133+
session.ask(
134+
new Command<>(
135+
new Publish(
136+
ControlPacketFlags.RETAIN() | ControlPacketFlags.QoSAtLeastOnceDelivery(),
137+
topic,
138+
ByteString.fromString("ohi")),
139+
Done.getInstance()));
138140
// #run-streaming-flow
139141

140-
CompletionStage<Publish> event = run.second();
141-
Publish publishEvent = event.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
142+
publishDone.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
143+
144+
CompletionStage<Publish> events = run.second();
145+
Publish publishEvent = events.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
142146
assertEquals(publishEvent.topicName(), topic);
143147
assertEquals(publishEvent.payload(), ByteString.fromString("ohi"));
144148

mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,14 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit
7070

7171
commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession)))
7272
commands.offer(Command(Subscribe(topic)))
73-
session ! Command(
74-
Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi"))
75-
)
73+
val publishDone = session ? Command(
74+
Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")),
75+
Done
76+
)
77+
7678
//#run-streaming-flow
7779

80+
publishDone.futureValue shouldBe Done
7881
events.futureValue match {
7982
case Publish(_, `topic`, _, bytes) => bytes shouldBe ByteString("ohi")
8083
case e => fail("Unexpected event: " + e)

0 commit comments

Comments
 (0)