Skip to content

Commit 897f522

Browse files
committed
Implement the ask pattern
1 parent 588b54d commit 897f522

File tree

2 files changed

+39
-18
lines changed

2 files changed

+39
-18
lines changed

mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/MqttSession.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,31 @@ abstract class MqttSession {
5555
*/
5656
def ![A](cp: Command[A]): Unit
5757

58+
/**
59+
* Ask the session to perform a command regardless of the state it is
60+
* in. This is important for sending Publish messages in particular,
61+
* as a connection may not have been established with a session.
62+
* @param cp The command to perform
63+
* @tparam A The type of any carry for the command.
64+
* @return A future indicating when the command has completed. Completion
65+
* is defined as when it has been acknowledged by the recipient
66+
* endpoint.
67+
*/
68+
final def ask[A](cp: Command[A]): Future[A] =
69+
this ? cp
70+
71+
/**
72+
* Ask the session to perform a command regardless of the state it is
73+
* in. This is important for sending Publish messages in particular,
74+
* as a connection may not have been established with a session.
75+
* @param cp The command to perform
76+
* @tparam A The type of any carry for the command.
77+
* @return A future indicating when the command has completed. Completion
78+
* is defined as when it has been acknowledged by the recipient
79+
* endpoint.
80+
*/
81+
def ?[A](cp: Command[A]): Future[A]
82+
5883
/**
5984
* Shutdown the session gracefully
6085
*/
@@ -171,6 +196,9 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit mat:
171196
case c: Command[A] => throw new IllegalStateException(c + " is not a client command that can be sent directly")
172197
}
173198

199+
override def ?[A](cp: Command[A]): Future[A] =
200+
???
201+
174202
override def shutdown(): Unit = {
175203
system.stop(clientConnector.toUntyped)
176204
system.stop(consumerPacketRouter.toUntyped)
@@ -513,6 +541,9 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit mat:
513541
case c: Command[A] => throw new IllegalStateException(c + " is not a server command that can be sent directly")
514542
}
515543

544+
override def ?[A](cp: Command[A]): Future[A] =
545+
???
546+
516547
override def shutdown(): Unit = {
517548
system.stop(serverConnector.toUntyped)
518549
system.stop(consumerPacketRouter.toUntyped)

mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,43 +51,33 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit
5151

5252
val connection = Tcp().outgoingConnection("localhost", 1883)
5353

54-
val mqttFlow: Flow[Command[Promise[Done]], Either[MqttCodec.DecodeError, Event[Promise[Done]]], NotUsed] =
54+
val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
5555
Mqtt
5656
.clientSessionFlow(session, ByteString("1"))
5757
.join(connection)
5858
//#create-streaming-flow
5959

6060
//#run-streaming-flow
61-
val publishDone = Promise[Done]
62-
val (commands: SourceQueueWithComplete[Command[Promise[Done]]], events: Future[Publish]) =
61+
val (commands: SourceQueueWithComplete[Command[Nothing]], events: Future[Publish]) =
6362
Source
6463
.queue(2, OverflowStrategy.fail)
6564
.via(mqttFlow)
66-
.scan((Option.empty[Publish], false)) {
67-
case ((maybePublish, false), Right(Event(_: PubAck, carry))) =>
68-
carry.foreach(_.success(Done))
69-
(maybePublish, true)
70-
case ((None, publishAcked), Right(Event(p: Publish, _))) =>
71-
(Some(p), publishAcked)
72-
case ((maybePublish, publishAcked), _) =>
73-
(maybePublish, publishAcked)
74-
}
7565
.collect {
76-
case (Some(p: Publish), true) => p
66+
case Right(Event(p: Publish, _)) => p
7767
}
7868
.toMat(Sink.head)(Keep.both)
7969
.run()
8070

8171
commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession)))
8272
commands.offer(Command(Subscribe(topic)))
83-
session ! Command(
84-
Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")),
85-
publishDone
86-
)
73+
val publishDone = session ? Command(
74+
Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")),
75+
Done
76+
)
8777

8878
//#run-streaming-flow
8979

90-
publishDone.future.futureValue shouldBe Done
80+
publishDone.futureValue shouldBe Done
9181
events.futureValue match {
9282
case Publish(_, `topic`, _, bytes) => bytes shouldBe ByteString("ohi")
9383
case e => fail("Unexpected event: " + e)

0 commit comments

Comments
 (0)