Skip to content

Commit

Permalink
Merge pull request #45 from avast/StreamingConsumerTimeouts
Browse files Browse the repository at this point in the history
Streaming consumer improvements - timeouts and poisoned message handler
  • Loading branch information
jendakol authored Feb 13, 2020
2 parents 01d407e + bb73511 commit da12332
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,89 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

}

object Consumer {

def create[F[_]: ConcurrentEffect, A: DeliveryConverter](
consumerConfig: ConsumerConfig,
channel: ServerChannel,
connectionInfo: RabbitMQConnectionInfo,
republishStrategy: RepublishStrategyConfig,
blocker: Blocker,
monitor: Monitor,
consumerListener: ConsumerListener,
readAction: DeliveryReadAction[F, A])(implicit timer: Timer[F], cs: ContextShift[F]): DefaultRabbitMQConsumer[F] = {

prepareConsumer(consumerConfig, readAction, connectionInfo, republishStrategy, channel, consumerListener, blocker, monitor)
}
}

object PullConsumer {

def create[F[_]: ConcurrentEffect, A: DeliveryConverter](
consumerConfig: PullConsumerConfig,
channel: ServerChannel,
connectionInfo: RabbitMQConnectionInfo,
republishStrategy: RepublishStrategyConfig,
blocker: Blocker,
monitor: Monitor)(implicit cs: ContextShift[F]): DefaultRabbitMQPullConsumer[F, A] = {

preparePullConsumer(consumerConfig, connectionInfo, republishStrategy, channel, blocker, monitor)
}
}

object StreamingConsumer {

def create[F[_]: ConcurrentEffect, A: DeliveryConverter](consumerConfig: StreamingConsumerConfig,
channel: ServerChannel,
newChannel: F[ServerChannel],
connectionInfo: RabbitMQConnectionInfo,
republishStrategy: RepublishStrategyConfig,
blocker: Blocker,
monitor: Monitor,
consumerListener: ConsumerListener)(
implicit timer: Timer[F],
cs: ContextShift[F]): Resource[F, DefaultRabbitMQStreamingConsumer[F, A]] = {

prepareStreamingConsumer(consumerConfig, connectionInfo, republishStrategy, channel, newChannel, consumerListener, blocker, monitor)
}
}

object Declarations {
def declareExchange[F[_]: Sync](config: DeclareExchangeConfig,
channel: ServerChannel,
connectionInfo: RabbitMQConnectionInfo): F[Unit] =
Sync[F].delay {
import config._

DefaultRabbitMQClientFactory.this.declareExchange(name, `type`, durable, autoDelete, arguments, channel, connectionInfo)
}

def declareQueue[F[_]: Sync](config: DeclareQueueConfig, channel: ServerChannel, connectionInfo: RabbitMQConnectionInfo): F[Unit] =
Sync[F].delay {
import config._

DefaultRabbitMQClientFactory.this.declareQueue(channel, name, durable, exclusive, autoDelete, arguments)
()
}

def bindQueue[F[_]: Sync](config: BindQueueConfig, channel: ServerChannel, connectionInfo: RabbitMQConnectionInfo): F[Unit] =
Sync[F].delay {
import config._

DefaultRabbitMQClientFactory.bindQueue(channel, queueName, exchangeName, routingKeys, arguments, connectionInfo)
}

def bindExchange[F[_]: Sync](config: BindExchangeConfig, channel: ServerChannel, connectionInfo: RabbitMQConnectionInfo): F[Unit] =
Sync[F].delay {
import config._

routingKeys.foreach {
DefaultRabbitMQClientFactory.this
.bindExchange(connectionInfo)(channel, sourceExchangeName, destExchangeName, arguments.value)
}
}
}

private def prepareStreamingConsumer[F[_]: ConcurrentEffect, A: DeliveryConverter](
consumerConfig: StreamingConsumerConfig,
connectionInfo: RabbitMQConnectionInfo,
Expand All @@ -48,6 +131,8 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
monitor: Monitor)(implicit timer: Timer[F], cs: ContextShift[F]): Resource[F, DefaultRabbitMQStreamingConsumer[F, A]] = {
import consumerConfig._

val timeoutsMeter = monitor.meter("timeouts")

// auto declare exchanges
declareExchangesFromBindings(connectionInfo, channel, consumerConfig.bindings)

Expand All @@ -61,7 +146,9 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

bindQueueForRepublishing(connectionInfo, channel, consumerConfig.queueName, republishStrategy)

DefaultRabbitMQStreamingConsumer(
val timeoutAction = (e: TimeoutException) => doTimeoutAction(name, consumerConfig.timeoutAction, timeoutLogLevel, timeoutsMeter, e)

DefaultRabbitMQStreamingConsumer.make(
name,
newChannel.flatTap(ch => Sync[F].delay(ch.basicQos(consumerConfig.prefetchCount))),
consumerTag,
Expand All @@ -71,6 +158,8 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
queueBufferSize,
monitor,
republishStrategy.toRepublishStrategy,
processTimeout,
timeoutAction,
blocker
)
}
Expand Down Expand Up @@ -305,36 +394,6 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
channel.queueDeclare(queueName, durable, exclusive, autoDelete, arguments.value)
}

object Consumer {

def create[F[_]: ConcurrentEffect, A: DeliveryConverter](
consumerConfig: ConsumerConfig,
channel: ServerChannel,
connectionInfo: RabbitMQConnectionInfo,
republishStrategy: RepublishStrategyConfig,
blocker: Blocker,
monitor: Monitor,
consumerListener: ConsumerListener,
readAction: DeliveryReadAction[F, A])(implicit timer: Timer[F], cs: ContextShift[F]): DefaultRabbitMQConsumer[F] = {

prepareConsumer(consumerConfig, readAction, connectionInfo, republishStrategy, channel, consumerListener, blocker, monitor)
}
}

object PullConsumer {

def create[F[_]: ConcurrentEffect, A: DeliveryConverter](
consumerConfig: PullConsumerConfig,
channel: ServerChannel,
connectionInfo: RabbitMQConnectionInfo,
republishStrategy: RepublishStrategyConfig,
blocker: Blocker,
monitor: Monitor)(implicit cs: ContextShift[F]): DefaultRabbitMQPullConsumer[F, A] = {

preparePullConsumer(consumerConfig, connectionInfo, republishStrategy, channel, blocker, monitor)
}
}

private[rabbitmq] def bindQueue(connectionInfo: RabbitMQConnectionInfo)(
channel: ServerChannel,
queueName: String)(exchangeName: String, routingKey: String, arguments: ArgumentsMap): AMQP.Queue.BindOk = {
Expand Down Expand Up @@ -368,42 +427,6 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
}
}

object Declarations {
def declareExchange[F[_]: Sync](config: DeclareExchangeConfig,
channel: ServerChannel,
connectionInfo: RabbitMQConnectionInfo): F[Unit] =
Sync[F].delay {
import config._

DefaultRabbitMQClientFactory.this.declareExchange(name, `type`, durable, autoDelete, arguments, channel, connectionInfo)
}

def declareQueue[F[_]: Sync](config: DeclareQueueConfig, channel: ServerChannel, connectionInfo: RabbitMQConnectionInfo): F[Unit] =
Sync[F].delay {
import config._

DefaultRabbitMQClientFactory.this.declareQueue(channel, name, durable, exclusive, autoDelete, arguments)
()
}

def bindQueue[F[_]: Sync](config: BindQueueConfig, channel: ServerChannel, connectionInfo: RabbitMQConnectionInfo): F[Unit] =
Sync[F].delay {
import config._

DefaultRabbitMQClientFactory.bindQueue(channel, queueName, exchangeName, routingKeys, arguments, connectionInfo)
}

def bindExchange[F[_]: Sync](config: BindExchangeConfig, channel: ServerChannel, connectionInfo: RabbitMQConnectionInfo): F[Unit] =
Sync[F].delay {
import config._

routingKeys.foreach {
DefaultRabbitMQClientFactory.this
.bindExchange(connectionInfo)(channel, sourceExchangeName, destExchangeName, arguments.value)
}
}
}

private[rabbitmq] def convertDelivery[F[_]: ConcurrentEffect, A: DeliveryConverter](d: Delivery[Bytes]): Delivery[A] = {
d.flatMap { d =>
implicitly[DeliveryConverter[A]].convert(d.body) match {
Expand Down Expand Up @@ -433,7 +456,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
Concurrent
.timeout(action, processTimeout)
.recoverWith {
case e: TimeoutException => doTimeoutAction(consumerConfig, timeoutsMeter, e)
case e: TimeoutException => doTimeoutAction(name, timeoutAction, timeoutLogLevel, timeoutsMeter, e)
}
} else action
}
Expand All @@ -453,14 +476,15 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
}
}

private def doTimeoutAction[A, F[_]: ConcurrentEffect](consumerConfig: ConsumerConfig,
private def doTimeoutAction[A, F[_]: ConcurrentEffect](consumerName: String,
timeoutAction: DeliveryResult,
timeoutLogLevel: Level,
timeoutsMeter: Meter,
e: TimeoutException): F[DeliveryResult] = Sync[F].delay {
import consumerConfig._

timeoutsMeter.mark()

lazy val msg = s"[$name] Task timed-out, applying DeliveryResult.${consumerConfig.timeoutAction}"
lazy val msg = s"[$consumerName] Task timed-out, applying DeliveryResult.$timeoutAction"

timeoutLogLevel match {
case Level.ERROR => logger.error(msg, e)
Expand All @@ -470,24 +494,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
case Level.TRACE => logger.trace(msg, e)
}

consumerConfig.timeoutAction
}

object StreamingConsumer {

def create[F[_]: ConcurrentEffect, A: DeliveryConverter](consumerConfig: StreamingConsumerConfig,
channel: ServerChannel,
newChannel: F[ServerChannel],
connectionInfo: RabbitMQConnectionInfo,
republishStrategy: RepublishStrategyConfig,
blocker: Blocker,
monitor: Monitor,
consumerListener: ConsumerListener)(
implicit timer: Timer[F],
cs: ContextShift[F]): Resource[F, DefaultRabbitMQStreamingConsumer[F, A]] = {

prepareStreamingConsumer(consumerConfig, connectionInfo, republishStrategy, channel, newChannel, consumerListener, blocker, monitor)
}
timeoutAction
}

private implicit def argsAsJava(value: ArgumentsMap): java.util.Map[String, Object] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.avast.clients.rabbitmq

import java.util.concurrent.TimeoutException

import cats.effect.concurrent._
import cats.effect.{Blocker, CancelToken, ConcurrentEffect, ContextShift, Effect, ExitCase, IO, Resource, Sync}
import cats.effect.{Blocker, CancelToken, Concurrent, ConcurrentEffect, ContextShift, Effect, ExitCase, IO, Resource, Sync, Timer}
import cats.syntax.all._
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.DefaultRabbitMQConsumer.RepublishOriginalRoutingKeyHeaderName
Expand All @@ -13,17 +15,20 @@ import com.typesafe.scalalogging.StrictLogging
import fs2.Stream
import fs2.concurrent.Queue

import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds
import scala.util.control.NonFatal

class DefaultRabbitMQStreamingConsumer[F[_]: ConcurrentEffect, A: DeliveryConverter] private (
class DefaultRabbitMQStreamingConsumer[F[_]: ConcurrentEffect: Timer, A: DeliveryConverter] private (
name: String,
queueName: String,
initialConsumerTag: String,
connectionInfo: RabbitMQConnectionInfo,
consumerListener: ConsumerListener,
monitor: Monitor,
republishStrategy: RepublishStrategy,
timeout: FiniteDuration,
timeoutAction: TimeoutException => F[DeliveryResult],
blocker: Blocker)(createQueue: F[DeliveryQueue[F, Bytes]], newChannel: F[ServerChannel])(implicit cs: ContextShift[F])
extends RabbitMQStreamingConsumer[F, A]
with StrictLogging {
Expand Down Expand Up @@ -179,7 +184,7 @@ class DefaultRabbitMQStreamingConsumer[F[_]: ConcurrentEffect, A: DeliveryConver
consumerOpt <- this.consumer.get
consumer = consumerOpt.getOrElse(throw new IllegalStateException("Consumer has to be initialized at this stage! It's probably a BUG"))
_ <- consumer.queue.enqueue1((delivery, deferred))
res <- deferred.get
res <- Concurrent.timeout(deferred.get, timeout).recoverWith { case e: TimeoutException => timeoutAction(e) }
} yield {
res
}
Expand Down Expand Up @@ -262,7 +267,7 @@ object DefaultRabbitMQStreamingConsumer extends StrictLogging {

private type DeliveryQueue[F[_], A] = Queue[F, (Delivery[A], Deferred[F, DeliveryResult])]

def apply[F[_]: ConcurrentEffect, A: DeliveryConverter](
def make[F[_]: ConcurrentEffect: Timer, A: DeliveryConverter](
name: String,
newChannel: F[ServerChannel],
initialConsumerTag: String,
Expand All @@ -272,6 +277,8 @@ object DefaultRabbitMQStreamingConsumer extends StrictLogging {
queueBufferSize: Int,
monitor: Monitor,
republishStrategy: RepublishStrategy,
timeout: FiniteDuration,
timeoutAction: TimeoutException => F[DeliveryResult],
blocker: Blocker)(implicit cs: ContextShift[F]): Resource[F, DefaultRabbitMQStreamingConsumer[F, A]] = {
val newQueue: F[DeliveryQueue[F, Bytes]] = createQueue(queueBufferSize)

Expand All @@ -283,6 +290,8 @@ object DefaultRabbitMQStreamingConsumer extends StrictLogging {
consumerListener,
monitor,
republishStrategy,
timeout,
timeoutAction,
blocker)(newQueue, newChannel)
})(_.close)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ final case class ConsumerConfig(name: String,
final case class StreamingConsumerConfig(name: String,
queueName: String,
bindings: immutable.Seq[AutoBindQueueConfig],
processTimeout: FiniteDuration = 10.seconds,
timeoutAction: DeliveryResult = DeliveryResult.Republish(),
timeoutLogLevel: Level = Level.WARN,
prefetchCount: Int = 100,
queueBufferSize: Int = 100,
declare: Option[AutoDeclareQueueConfig] = None,
Expand Down
46 changes: 45 additions & 1 deletion core/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ myConfig {

queueName = "QUEUE1"

processTimeout = 500 ms

declare {
enabled = true
}
Expand Down Expand Up @@ -138,6 +140,48 @@ myConfig {
}
]
}

testingStreamingWithTimeout {
name = "Testing"

queueName = "QUEUE1"

declare {
enabled = true
}

prefetchCount = 500

processTimeout = 500 ms

bindings = [
{
routingKeys = ["test"]

exchange {
name = "EXCHANGE1"

declare {
enabled = true

type = "direct"
}
}
}, {
routingKeys = ["test2"]

exchange {
name = "EXCHANGE2"

declare {
enabled = true

type = "direct"
}
}
}
]
}
}

producers {
Expand Down Expand Up @@ -201,7 +245,7 @@ myConfig {
declareQueue {
name = "QUEUE2"

arguments = { "x-max-length" : 10000 }
arguments = {"x-max-length": 10000}
}

bindQueue {
Expand Down
Loading

0 comments on commit da12332

Please sign in to comment.