Open
Description
Version
4.5.10
Context
When a message is sent or published on the event bus while one of the receiving verticles is being undeployed, and that verticle runs on a custom worker pool, the send/publish fails: the receiving context's thread pool is shutting down and rejects the task.
Do you have a reproducer?
No
Steps to reproduce
- Deploy multiple verticles on separate worker pools. Each verticle subscribes to the same eventbus address in order to receive broadcast messages
- Some other part of the system will publish (broadcast) messages to the eventbus address independently of the rest of the system
- Undeploy one of the verticles
- Since the verticle being undeployed is the only "user" of its dedicated worker pool, that worker pool will be shut down
- Now there's a race condition: if a message is published to the "shared" address on the eventbus at this point, one of the handler registrations may still point to a context whose worker pool (executor) is already shut down, and that executor will reject enqueuing the operation
- As a result, a publish may not reach all consumers, and a regular "round robin" send may not reach any consumer at all
- And the exception will be bubbled to the caller of the .publish()-method
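The underlying rejection can be shown without Vert.x. The following is a minimal JDK-only sketch, not a reproducer for the issue itself: the class and method names are made up for illustration, and it assumes the worker pool behaves like a plain ThreadPoolExecutor with the default AbortPolicy, as the stack trace below suggests.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical illustration class, not part of Vert.x.
public class RejectedAfterShutdown {

  // Returns true if a task handed to an already shut down pool is rejected.
  static boolean isRejectedAfterShutdown() {
    // Stand-in for the dedicated worker pool of the undeployed verticle.
    ExecutorService pool = Executors.newFixedThreadPool(1);
    pool.shutdown(); // what happens to the pool once its last user is undeployed
    try {
      pool.execute(() -> { /* stand-in for the message delivery task */ });
      return false;
    } catch (RejectedExecutionException e) {
      // Default AbortPolicy: the exception is thrown on the caller's thread,
      // which in the issue is the thread calling publish().
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("rejected: " + isRejectedAfterShutdown());
  }
}
```

The point of the sketch is that the rejection surfaces synchronously on the submitting thread, which is why it ends up in the caller of .publish() rather than in the receiving verticle.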
Extra
Stack trace/Exception
java.util.concurrent.RejectedExecutionException: Task io.vertx.core.impl.TaskQueue$$Lambda$254/0x00000001003ad840@7819b595 rejected from java.util.concurrent.ThreadPoolExecutor@2b8e2d1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 91]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source) ~[?:?]
at io.vertx.core.impl.TaskQueue.execute(TaskQueue.java:149) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.impl.WorkerExecutor.execute(WorkerExecutor.java:60) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.eventbus.impl.HandlerRegistration.receive(HandlerRegistration.java:46) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.eventbus.impl.EventBusImpl.deliverMessageLocally(EventBusImpl.java:375) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.eventbus.impl.EventBusImpl.sendLocally(EventBusImpl.java:341) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPub(EventBusImpl.java:329) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.eventbus.impl.OutboundDeliveryContext.execute(OutboundDeliveryContext.java:109) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.eventbus.impl.DeliveryContextBase.next(DeliveryContextBase.java:80) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.eventbus.impl.OutboundDeliveryContext.next(OutboundDeliveryContext.java:28) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:422) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:429) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.eventbus.impl.EventBusImpl.publish(EventBusImpl.java:164) ~[vertx-core-4.5.10.jar:4.5.10]
at io.vertx.core.eventbus.impl.EventBusImpl.publish(EventBusImpl.java:159) ~[vertx-core-4.5.10.jar:4.5.10]
Idea for where to change things:
- https://github.com/eclipse-vertx/vert.x/blob/4.x/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java
- I think the main fix is to catch exceptions thrown by context.executor().execute() and discard the message instead of letting the exception bubble up. On its own this would not allow "round robin continuation" to the next consumer, but that is not an issue for publish/broadcast messages
- https://github.com/eclipse-vertx/vert.x/blob/4.x/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java#L353 - deliverMessageLocally
- For regular (non-publish, non-broadcast) messages, in order to allow "round robin" to work in this scenario, I guess nextHandler() would need to be called whenever the operation could not be enqueued on the other context's executor..?
For our use case we are only concerned about publish messages, but I do see potential issues for regular messages too.
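To make the two ideas above concrete, here is a rough JDK-only sketch of the intended behaviour. It is not Vert.x code and not a proposed patch: TolerantDelivery, publish, send and demo are hypothetical names, and plain Executor instances stand in for the consumers' contexts. For a publish, a rejecting executor is skipped; for a send, the next executor in round-robin order is tried.

```java
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical illustration class, not part of Vert.x.
public class TolerantDelivery {

  // Publish: offer the task to every target and skip targets that reject it.
  // Returns how many targets accepted the task.
  static int publish(List<? extends Executor> targets, Runnable task) {
    int accepted = 0;
    for (Executor target : targets) {
      try {
        target.execute(task);
        accepted++;
      } catch (RejectedExecutionException e) {
        // Target is shutting down: drop the message for this consumer only.
      }
    }
    return accepted;
  }

  // Send: try targets in round-robin order beginning at 'start'.
  // Returns the index of the target that accepted the task, or -1 if none did.
  static int send(List<? extends Executor> targets, int start, Runnable task) {
    for (int i = 0; i < targets.size(); i++) {
      int idx = (start + i) % targets.size();
      try {
        targets.get(idx).execute(task);
        return idx;
      } catch (RejectedExecutionException e) {
        // Rejected: fall through to the next consumer.
      }
    }
    return -1;
  }

  // Small demo: index 0 is a shut down pool, index 1 runs tasks inline.
  static int[] demo() {
    ExecutorService dead = Executors.newFixedThreadPool(1);
    dead.shutdown();
    Executor live = Runnable::run;
    List<Executor> targets = List.of(dead, live);
    int published = publish(targets, () -> { });
    int sentTo = send(targets, 0, () -> { });
    return new int[] { published, sentTo };
  }

  public static void main(String[] args) {
    int[] result = demo();
    System.out.println("publish accepted by " + result[0] + " target(s)");
    System.out.println("send delivered to index " + result[1]);
  }
}
```

In the real code base the equivalent handling would presumably live around HandlerRegistration.receive and EventBusImpl.deliverMessageLocally; whether a rejected send should move on to the next handler or fail the send is a design decision for the maintainers.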