11package com .avast .clients .rabbitmq
22
3- import cats .effect .concurrent .{Deferred , Ref }
4- import cats .syntax .parallel ._
3+ import cats .implicits .catsSyntaxParallelAp
54import com .avast .bytes .Bytes
65import com .avast .clients .rabbitmq .api .{MessageProperties , NotAcknowledgedPublish }
76import com .avast .clients .rabbitmq .logging .ImplicitContextLogger
87import com .avast .clients .rabbitmq .publisher .PublishConfirmsRabbitMQProducer
9- import com .avast .clients .rabbitmq .publisher .PublishConfirmsRabbitMQProducer .SentMessages
108import com .avast .metrics .scalaeffectapi .Monitor
119import com .rabbitmq .client .impl .recovery .AutorecoveringChannel
1210import monix .eval .Task
1311import monix .execution .Scheduler .Implicits .global
12+ import org .junit .runner .manipulation .InvalidOrderingException
1413import org .mockito .Matchers
1514import org .mockito .Matchers .any
1615import org .mockito .Mockito .{times , verify , when }
1716
17+ import scala .concurrent .Await
18+ import scala .concurrent .duration .DurationInt
1819import scala .util .Random
1920
2021class PublisherConfirmsRabbitMQProducerTest extends TestBase {
21- test(" message is acked after one retry" ) {
22+
23+ test(" Message is acked after one retry" ) {
2224 val exchangeName = Random .nextString(10 )
2325 val routingKey = Random .nextString(10 )
2426 val seqNumber = 1L
2527 val seqNumber2 = 2L
2628
2729 val channel = mock[AutorecoveringChannel ]
28- val ref = Ref .of[Task , Map [Long , Deferred [Task , Either [NotAcknowledgedPublish , Unit ]]]](Map .empty).await
29- val updatedState1 = updateMessageState(ref, seqNumber)(Left (NotAcknowledgedPublish (" abcd" , messageId = seqNumber)))
30- val updatedState2 = updateMessageState(ref, seqNumber2)(Right ())
30+ when(channel.getNextPublishSeqNo).thenReturn(seqNumber, seqNumber2)
3131
3232 val producer = new PublishConfirmsRabbitMQProducer [Task , Bytes ](
3333 name = " test" ,
@@ -39,15 +39,21 @@ class PublisherConfirmsRabbitMQProducerTest extends TestBase {
3939 sizeLimitBytes = None ,
4040 blocker = TestBase .testBlocker,
4141 logger = ImplicitContextLogger .createLogger,
42- sentMessages = ref,
4342 sendAttempts = 2
4443 )
45- when(channel.getNextPublishSeqNo).thenReturn(seqNumber, seqNumber2)
4644
47- producer.send(routingKey, Bytes .copyFrom(Array .fill(499 )(32 .toByte))).parProduct(updatedState1.parProduct(updatedState2)).await
45+ val body = Bytes .copyFrom(Array .fill(499 )(32 .toByte))
46+
47+ val publishTask = producer.send(routingKey, body).runToFuture
48+
49+ updateMessageState(producer, seqNumber)(Left (NotAcknowledgedPublish (" abcd" , messageId = seqNumber))).parProduct {
50+ updateMessageState(producer, seqNumber2)(Right ())
51+ }.await
52+
53+ Await .result(publishTask, 10 .seconds)
4854
4955 verify(channel, times(2 ))
50- .basicPublish(Matchers .eq(exchangeName), Matchers .eq(routingKey), any(), Matchers .eq(Bytes .copyFrom( Array .fill( 499 )( 32 .toByte)) .toByteArray))
56+ .basicPublish(Matchers .eq(exchangeName), Matchers .eq(routingKey), any(), Matchers .eq(body .toByteArray))
5157 }
5258
5359 test(" Message not acked returned if number of attempts exhausted" ) {
@@ -56,8 +62,7 @@ class PublisherConfirmsRabbitMQProducerTest extends TestBase {
5662 val seqNumber = 1L
5763
5864 val channel = mock[AutorecoveringChannel ]
59- val ref = Ref .of[Task , Map [Long , Deferred [Task , Either [NotAcknowledgedPublish , Unit ]]]](Map .empty).await
60- val updatedState = updateMessageState(ref, seqNumber)(Left (NotAcknowledgedPublish (" abcd" , messageId = seqNumber)))
65+ when(channel.getNextPublishSeqNo).thenReturn(seqNumber)
6166
6267 val producer = new PublishConfirmsRabbitMQProducer [Task , Bytes ](
6368 name = " test" ,
@@ -69,22 +74,76 @@ class PublisherConfirmsRabbitMQProducerTest extends TestBase {
6974 sizeLimitBytes = None ,
7075 blocker = TestBase .testBlocker,
7176 logger = ImplicitContextLogger .createLogger,
72- sentMessages = ref,
7377 sendAttempts = 1
7478 )
75- when(channel.getNextPublishSeqNo).thenReturn(seqNumber)
79+
80+ val body = Bytes .copyFrom(Array .fill(499 )(32 .toByte))
81+
82+ val publishTask = producer.send(routingKey, body).runToFuture
7683
7784 assertThrows[NotAcknowledgedPublish ] {
78- producer.send(routingKey, Bytes .copyFrom(Array .fill(499 )(32 .toByte))).parProduct(updatedState).await
85+ updateMessageState(producer, seqNumber)(Left (NotAcknowledgedPublish (" abcd" , messageId = seqNumber))).await
86+ Await .result(publishTask, 10 .seconds)
7987 }
8088
81- verify(channel).basicPublish(Matchers .eq(exchangeName), Matchers .eq(routingKey), any(), Matchers .eq(Bytes .copyFrom(Array .fill(499 )(32 .toByte)).toByteArray))
89+ verify(channel).basicPublish(Matchers .eq(exchangeName), Matchers .eq(routingKey), any(), Matchers .eq(body.toByteArray))
90+ }
91+
92+ test(" Multiple messages are fully acked" ) {
93+ val exchangeName = Random .nextString(10 )
94+ val routingKey = Random .nextString(10 )
95+
96+ val channel = mock[AutorecoveringChannel ]
97+
98+ val seqNumbers = 1 to 500
99+ val iterator = seqNumbers.iterator
100+ when(channel.getNextPublishSeqNo).thenAnswer(_ => { iterator.next() })
101+
102+ val producer = new PublishConfirmsRabbitMQProducer [Task , Bytes ](
103+ name = " test" ,
104+ exchangeName = exchangeName,
105+ channel = channel,
106+ monitor = Monitor .noOp(),
107+ defaultProperties = MessageProperties .empty,
108+ reportUnroutable = false ,
109+ sizeLimitBytes = None ,
110+ blocker = TestBase .testBlocker,
111+ logger = ImplicitContextLogger .createLogger,
112+ sendAttempts = 2
113+ )
114+
115+ val body = Bytes .copyFrom(Array .fill(499 )(32 .toByte))
116+
117+ val publishTasks = Task .parSequenceUnordered {
118+ seqNumbers.map { _ =>
119+ producer.send(routingKey, body)
120+ }
121+ }.runToFuture
122+
123+ Task
124+ .parSequenceUnordered(seqNumbers.map { seqNumber =>
125+ updateMessageState(producer, seqNumber)(Right ())
126+ })
127+ .await(15 .seconds)
128+
129+ Await .result(publishTasks, 15 .seconds)
130+
131+ verify(channel, times(seqNumbers.length))
132+ .basicPublish(Matchers .eq(exchangeName), Matchers .eq(routingKey), any(), Matchers .eq(body.toByteArray))
82133 }
83134
84- private def updateMessageState (ref : SentMessages [Task ], messageId : Long )(result : Either [NotAcknowledgedPublish , Unit ]): Task [Unit ] = {
85- ref.get.flatMap(map => map.get(messageId) match {
86- case Some (value) => value.complete(result)
87- case None => updateMessageState(ref, messageId)(result)
88- })
135+ private def updateMessageState (producer : PublishConfirmsRabbitMQProducer [Task , Bytes ], messageId : Long , attempt : Int = 1 )(
136+ result : Either [NotAcknowledgedPublish , Unit ]): Task [Unit ] = {
137+ Task
138+ .delay(producer.confirmationCallbacks.get(messageId))
139+ .flatMap {
140+ case Some (value) => value.complete(result)
141+ case None =>
142+ if (attempt < 90 ) {
143+ Task .sleep(100 .millis) >> updateMessageState(producer, messageId, attempt + 1 )(result)
144+ } else {
145+ throw new InvalidOrderingException (s " The message ID $messageId is not present in the list of callbacks " )
146+ }
147+ }
89148 }
90149}
0 commit comments