Skip to content

Commit

Permalink
added exponential delay for poisoned message handlers (#175)
Browse files Browse the repository at this point in the history
* added exponential delay for poisoned message handlers
  • Loading branch information
Peppi-Ressler authored Aug 26, 2022
1 parent b88bfe4 commit 4408887
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.avast.clients.rabbitmq

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{Duration, FiniteDuration}

class ExponentialDelay(val initialDelay: Duration, val period: Duration, val factor: Double, val maxLength: Duration) {
private val maxMillis = maxLength.toMillis

def getExponentialDelay(attempt: Int): FiniteDuration = {
if (attempt == 0) FiniteDuration(initialDelay._1, initialDelay._2)
else {
val millis = math.min(
maxMillis,
(period.toMillis * math.pow(factor, attempt - 1)).toLong
)
FiniteDuration(millis, TimeUnit.MILLISECONDS)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.avast.clients.rabbitmq

import cats.Applicative
import cats.effect.{Resource, Sync}
import cats.effect.{Resource, Sync, Timer}
import cats.implicits.{catsSyntaxApplicativeError, catsSyntaxFlatMapOps, toFunctorOps}
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.PoisonedMessageHandler.{defaultHandlePoisonedMessage, DiscardedTimeHeaderName}
Expand All @@ -19,7 +19,8 @@ sealed trait PoisonedMessageHandler[F[_], A] {
implicit dctx: DeliveryContext): F[DeliveryResult]
}

class LoggingPoisonedMessageHandler[F[_]: Sync, A](maxAttempts: Int) extends PoisonedMessageHandler[F, A] {
class LoggingPoisonedMessageHandler[F[_]: Sync: Timer, A](maxAttempts: Int, republishDelay: Option[ExponentialDelay])
extends PoisonedMessageHandler[F, A] {
private val logger = ImplicitContextLogger.createLogger[F, LoggingPoisonedMessageHandler[F, A]]

override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)(
Expand All @@ -28,6 +29,7 @@ class LoggingPoisonedMessageHandler[F[_]: Sync, A](maxAttempts: Int) extends Poi
messageId,
maxAttempts,
logger,
republishDelay,
(d: Delivery[A], _) => defaultHandlePoisonedMessage[F, A](maxAttempts, logger)(d))(result)
}
}
Expand All @@ -37,14 +39,19 @@ class NoOpPoisonedMessageHandler[F[_]: Sync, A] extends PoisonedMessageHandler[F
implicit dctx: DeliveryContext): F[DeliveryResult] = Sync[F].pure(result)
}

class DeadQueuePoisonedMessageHandler[F[_]: Sync, A](maxAttempts: Int)(moveToDeadQueue: (Delivery[A], Bytes, DeliveryContext) => F[Unit])
class DeadQueuePoisonedMessageHandler[F[_]: Sync: Timer, A](maxAttempts: Int, republishDelay: Option[ExponentialDelay])(
moveToDeadQueue: (Delivery[A], Bytes, DeliveryContext) => F[Unit])
extends PoisonedMessageHandler[F, A] {
private val logger = ImplicitContextLogger.createLogger[F, DeadQueuePoisonedMessageHandler[F, A]]

override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)(
implicit dctx: DeliveryContext): F[DeliveryResult] = {
PoisonedMessageHandler.handleResult(delivery, messageId, maxAttempts, logger, (d, _) => handlePoisonedMessage(d, messageId, rawBody))(
result)
PoisonedMessageHandler.handleResult(delivery,
messageId,
maxAttempts,
logger,
republishDelay,
(d, _) => handlePoisonedMessage(d, messageId, rawBody))(result)
}

private def handlePoisonedMessage(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(
Expand All @@ -59,9 +66,9 @@ class DeadQueuePoisonedMessageHandler[F[_]: Sync, A](maxAttempts: Int)(moveToDea
}

object DeadQueuePoisonedMessageHandler {
def make[F[_]: Sync, A](c: DeadQueuePoisonedMessageHandling,
connection: RabbitMQConnection[F],
monitor: Monitor[F]): Resource[F, DeadQueuePoisonedMessageHandler[F, A]] = {
def make[F[_]: Sync: Timer, A](c: DeadQueuePoisonedMessageHandling,
connection: RabbitMQConnection[F],
monitor: Monitor[F]): Resource[F, DeadQueuePoisonedMessageHandler[F, A]] = {
val dqpc = c.deadQueueProducer
val pc = ProducerConfig(
name = dqpc.name,
Expand All @@ -73,7 +80,7 @@ object DeadQueuePoisonedMessageHandler {
)

connection.newProducer[Bytes](pc, monitor.named("deadQueueProducer")).map { producer =>
new DeadQueuePoisonedMessageHandler[F, A](c.maxAttempts)((d: Delivery[A], rawBody: Bytes, dctx: DeliveryContext) => {
new DeadQueuePoisonedMessageHandler[F, A](c.maxAttempts, c.republishDelay)((d: Delivery[A], rawBody: Bytes, dctx: DeliveryContext) => {
val cidStrategy = dctx.correlationId match {
case Some(value) => CorrelationIdStrategy.Fixed(value.value)
case None => CorrelationIdStrategy.RandomNew
Expand All @@ -93,11 +100,12 @@ object PoisonedMessageHandler {
final val RepublishCountHeaderName: String = "X-Republish-Count"
final val DiscardedTimeHeaderName: String = "X-Discarded-Time"

private[rabbitmq] def make[F[_]: Sync, A](config: Option[PoisonedMessageHandlingConfig],
connection: RabbitMQConnection[F],
monitor: Monitor[F]): Resource[F, PoisonedMessageHandler[F, A]] = {
private[rabbitmq] def make[F[_]: Sync: Timer, A](config: Option[PoisonedMessageHandlingConfig],
connection: RabbitMQConnection[F],
monitor: Monitor[F]): Resource[F, PoisonedMessageHandler[F, A]] = {
config match {
case Some(LoggingPoisonedMessageHandling(maxAttempts)) => Resource.pure(new LoggingPoisonedMessageHandler[F, A](maxAttempts))
case Some(LoggingPoisonedMessageHandling(maxAttempts, republishDelay)) =>
Resource.pure(new LoggingPoisonedMessageHandler[F, A](maxAttempts, republishDelay))
case Some(c: DeadQueuePoisonedMessageHandling) => DeadQueuePoisonedMessageHandler.make(c, connection, monitor)
case Some(NoOpPoisonedMessageHandling) | None =>
Resource.eval {
Expand All @@ -114,26 +122,30 @@ object PoisonedMessageHandler {
logger.warn(s"Message failures reached the limit $maxAttempts attempts, throwing away: $delivery")
}

private[rabbitmq] def handleResult[F[_]: Sync, A](
private[rabbitmq] def handleResult[F[_]: Sync: Timer, A](
delivery: Delivery[A],
messageId: MessageId,
maxAttempts: Int,
logger: ImplicitContextLogger[F],
republishDelay: Option[ExponentialDelay],
handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(r: DeliveryResult)(implicit dctx: DeliveryContext): F[DeliveryResult] = {
r match {
case Republish(isPoisoned, newHeaders) if isPoisoned =>
adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, logger, handlePoisonedMessage)
adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, logger, republishDelay, handlePoisonedMessage)
case r => Applicative[F].pure(r) // keep other results as they are
}
}

private def adjustDeliveryResult[F[_]: Sync, A](
private def adjustDeliveryResult[F[_]: Sync: Timer, A](
delivery: Delivery[A],
messageId: MessageId,
maxAttempts: Int,
newHeaders: Map[String, AnyRef],
logger: ImplicitContextLogger[F],
republishDelay: Option[ExponentialDelay],
handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(implicit dctx: DeliveryContext): F[DeliveryResult] = {
import cats.syntax.traverse._

// get current attempt no. from passed headers with fallback to original (incoming) headers - the fallback will most likely happen
// but we're giving the programmer chance to programmatically _pretend_ lower attempt number
val attempt = (delivery.properties.headers ++ newHeaders)
Expand All @@ -143,8 +155,12 @@ object PoisonedMessageHandler {

logger.debug(s"Attempt $attempt/$maxAttempts for $messageId") >> {
if (attempt < maxAttempts) {
Applicative[F].pure(
Republish(countAsPoisoned = true, newHeaders = newHeaders + (RepublishCountHeaderName -> attempt.asInstanceOf[AnyRef])))
for {
_ <- republishDelay.traverse { d =>
val delay = d.getExponentialDelay(attempt)
logger.debug(s"Will republish the message in $delay") >> Timer[F].sleep(delay)
}
} yield Republish(countAsPoisoned = true, newHeaders = newHeaders + (RepublishCountHeaderName -> attempt.asInstanceOf[AnyRef]))
} else {
val now = Instant.now()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,13 @@ object RecoveryDelayHandlers {
}
}

case class Exponential(initialDelay: Duration = 5.second,
period: Duration = 5.seconds,
factor: Double = 2.0,
maxLength: Duration = 32.seconds)
extends RecoveryDelayHandler {
private val maxMillis = maxLength.toMillis

case class Exponential(override val initialDelay: Duration = 2.second,
override val period: Duration = 2.seconds,
override val factor: Double = 2.0,
override val maxLength: Duration = 32.seconds)
extends ExponentialDelay(initialDelay, period, factor, maxLength) with RecoveryDelayHandler {
override def getDelay(recoveryAttempts: Int): Long = {
if (recoveryAttempts == 0) initialDelay.toMillis
else {
math.min(
maxMillis,
(period.toMillis * math.pow(factor, recoveryAttempts - 1)).toLong
)
}
getExponentialDelay(recoveryAttempts).toMillis
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ final case class DeadQueueProducerConfig(name: String,
properties: ProducerPropertiesConfig = ProducerPropertiesConfig())

case object NoOpPoisonedMessageHandling extends PoisonedMessageHandlingConfig
final case class LoggingPoisonedMessageHandling(maxAttempts: Int) extends PoisonedMessageHandlingConfig
final case class DeadQueuePoisonedMessageHandling(maxAttempts: Int, deadQueueProducer: DeadQueueProducerConfig)
final case class LoggingPoisonedMessageHandling(maxAttempts: Int, republishDelay: Option[ExponentialDelay] = None) extends PoisonedMessageHandlingConfig
final case class DeadQueuePoisonedMessageHandling(maxAttempts: Int, deadQueueProducer: DeadQueueProducerConfig, republishDelay: Option[ExponentialDelay] = None)
extends PoisonedMessageHandlingConfig

sealed trait AddressResolverType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,5 +466,5 @@ class DefaultRabbitMQConsumerTest extends TestBase {
)(userAction)
}

object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3)
object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3, None)
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,5 +287,5 @@ class DefaultRabbitMQPullConsumerTest extends TestBase {
new DefaultRabbitMQPullConsumer[Task, A](base, channelOps)
}

class PMH[A] extends LoggingPoisonedMessageHandler[Task, A](3)
class PMH[A] extends LoggingPoisonedMessageHandler[Task, A](3, None)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class PoisonedMessageHandlerTest extends TestBase {
val movedCount = new AtomicInteger(0)

PoisonedMessageHandler
.handleResult[Task, Bytes](Delivery.Ok(Bytes.empty(), MessageProperties(), ""), MessageId("msg-id"), 1, ilogger, (_, _) => {
.handleResult[Task, Bytes](Delivery.Ok(Bytes.empty(), MessageProperties(), ""), MessageId("msg-id"), 1, ilogger, None, (_, _) => {
Task.delay { movedCount.incrementAndGet() }
})(Republish(countAsPoisoned = false))
.await
Expand All @@ -35,7 +35,7 @@ class PoisonedMessageHandlerTest extends TestBase {
movedCount.set(0)

PoisonedMessageHandler
.handleResult[Task, Bytes](Delivery.Ok(Bytes.empty(), MessageProperties(), ""), MessageId("msg-id"), 1, ilogger, (_, _) => {
.handleResult[Task, Bytes](Delivery.Ok(Bytes.empty(), MessageProperties(), ""), MessageId("msg-id"), 1, ilogger, None, (_, _) => {
Task.delay { movedCount.incrementAndGet() }
})(Republish())
.await
Expand All @@ -48,7 +48,7 @@ class PoisonedMessageHandlerTest extends TestBase {
Task.now(Republish())
}

val handler = new LoggingPoisonedMessageHandler[Task, Bytes](5)
val handler = new LoggingPoisonedMessageHandler[Task, Bytes](5, None)

val properties = (1 to 4).foldLeft(MessageProperties.empty) {
case (p, _) =>
Expand All @@ -65,6 +65,32 @@ class PoisonedMessageHandlerTest extends TestBase {
assertResult(DeliveryResult.Reject)(run(handler, readAction, properties))
}

test("LoggingPoisonedMessageHandler exponential delay") {
import scala.concurrent.duration._

def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = {
Task.now(Republish())
}

val handler = new LoggingPoisonedMessageHandler[Task, Bytes](5, Some(new ExponentialDelay(1.seconds, 1.seconds, 2, 2.seconds)))
val timeBeforeExecution = Instant.now()
val properties = (1 to 4).foldLeft(MessageProperties.empty) {
case (p, _) =>
run(handler, readAction, p) match {
case Republish(_, h) => MessageProperties(headers = h)
case _ => MessageProperties.empty
}
}

val now = Instant.now()
assert(now.minusSeconds(7).isAfter(timeBeforeExecution) && now.minusSeconds(8).isBefore(timeBeforeExecution))
// check it increases the header with count
assertResult(MessageProperties(headers = Map(RepublishCountHeaderName -> 4.asInstanceOf[AnyRef])))(properties)

// check it will Reject the message on 5th attempt
assertResult(DeliveryResult.Reject)(run(handler, readAction, properties))
}

test("NoOpPoisonedMessageHandler basic") {
def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = {
Task.now(Republish())
Expand All @@ -91,7 +117,7 @@ class PoisonedMessageHandlerTest extends TestBase {

val movedCount = new AtomicInteger(0)

val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](5)({ (_, _, _) =>
val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](5, None)({ (_, _, _) =>
Task.delay(movedCount.incrementAndGet())
})

Expand Down Expand Up @@ -119,7 +145,7 @@ class PoisonedMessageHandlerTest extends TestBase {

val movedCount = new AtomicInteger(0)

val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](3)({ (d, _, _) =>
val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](3, None)({ (d, _, _) =>
// test it's there and it can be parsed
assert(Instant.parse(d.properties.headers(DiscardedTimeHeaderName).asInstanceOf[String]).toEpochMilli > 0)

Expand Down Expand Up @@ -147,7 +173,7 @@ class PoisonedMessageHandlerTest extends TestBase {

val movedCount = new AtomicInteger(0)

val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](5)({ (_, _, _) =>
val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](5, None)({ (_, _, _) =>
Task.delay(movedCount.incrementAndGet())
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,5 @@ class RepublishStrategyTest extends TestBase {
)(userAction)
}

object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3)
object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3, None)
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class PureconfigImplicits(implicit namingConvention: NamingConvention = CamelCas
implicit val autoBindQueueConfigReader: ConfigReader[AutoBindQueueConfig] = deriveReader
implicit val autoBindExchangeConfigReader: ConfigReader[AutoBindExchangeConfig] = deriveReader
implicit val producerPropertiesConfigReader: ConfigReader[ProducerPropertiesConfig] = deriveReader
implicit val exponentialDelayConfigReader: ConfigReader[ExponentialDelay] = deriveReader

implicit val logLevelReader: ConfigReader[Level] = ConfigReader.stringConfigReader.map(Level.valueOf)
implicit val recoveryDelayHandlerReader: ConfigReader[RecoveryDelayHandler] = RecoveryDelayHandlerReader
Expand Down

0 comments on commit 4408887

Please sign in to comment.