Skip to content

Commit

Permalink
#5337: EventBusImpl.deliverMessageLocally did not properly handle if …
Browse files Browse the repository at this point in the history
…the receiving context's event loop has been shutdown
  • Loading branch information
Onkelborg committed Nov 29, 2024
1 parent 0b41047 commit 109688d
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 18 deletions.
79 changes: 69 additions & 10 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@
import io.vertx.core.eventbus.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.VertxMetrics;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
Expand All @@ -40,6 +47,7 @@ public class EventBusImpl implements EventBusInternal, MetricsProvider {
private static final AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> OUTBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "outboundInterceptors");
private static final AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> INBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "inboundInterceptors");

static final Logger logger = LoggerFactory.getLogger(EventBusImpl.class);
private volatile Handler<DeliveryContext>[] outboundInterceptors = new Handler[0];
private volatile Handler<DeliveryContext>[] inboundInterceptors = new Handler[0];
private final AtomicLong replySequence = new AtomicLong(0);
Expand Down Expand Up @@ -353,17 +361,46 @@ protected boolean isMessageLocal(MessageImpl msg) {
protected ReplyException deliverMessageLocally(MessageImpl msg) {
ConcurrentCyclicSequence<HandlerHolder> handlers = handlerMap.get(msg.address());
boolean messageLocal = isMessageLocal(msg);
boolean findingHandlerFailed = true;
if (handlers != null) {
if (msg.isSend()) {
//Choose one
HandlerHolder holder = nextHandler(handlers, messageLocal);
HandlerHolder holder = nextHandler(handlers, messageLocal, null);
if (metrics != null) {
metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, holder != null ? 1 : 0);
}
if (holder != null) {
holder.handler.receive(msg.copyBeforeReceive());
} else {
// RACY issue !!!!!
/*
In case the handler isn't able to enqueue the operation, we will try until we have exhausted all the handlers
before failing hard.
*/
Set<HandlerHolder> blacklistedHandlers = null;
while(true) {
if (holder != null) {
try {
holder.handler.receive(msg.copyBeforeReceive());
findingHandlerFailed = false;
} catch (RejectedExecutionException e) {
if(blacklistedHandlers == null) {
blacklistedHandlers = new HashSet<>();
}
blacklistedHandlers.add(holder);
holder = nextHandler(handlers, messageLocal, blacklistedHandlers);
if(holder != null) {
if(logger.isDebugEnabled()) {
logger.debug(String.format("Failed to enqueue message onto handler during send, will try another handler. Address: %s", msg.address()), e);
}
continue;
}
else {
if(logger.isDebugEnabled()) {
logger.debug(String.format("Failed to enqueue message onto handler during send, no other handler found. Address: %s", msg.address()), e);
}
}
}
} else {
// RACY issue !!!!!
}
break;
}
} else {
// Publish
Expand All @@ -372,21 +409,43 @@ protected ReplyException deliverMessageLocally(MessageImpl msg) {
}
for (HandlerHolder holder: handlers) {
if (messageLocal || !holder.isLocalOnly()) {
holder.handler.receive(msg.copyBeforeReceive());
try {
holder.handler.receive(msg.copyBeforeReceive());
findingHandlerFailed = false;
} catch (RejectedExecutionException e) {
if(logger.isDebugEnabled()) {
logger.debug(String.format("Failed to enqueue message onto handler during publish. Address: %s", msg.address()), e);
}
}
}
}
}
return null;
} else {
}
if (findingHandlerFailed) {
if (metrics != null) {
metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, 0);
}
return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
}
return null;
}

protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> handlers, boolean messageLocal) {
return handlers.next();
protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> handlers, boolean messageLocal, Collection<HandlerHolder> blacklistedHandlers) {
return nextHandlerMessageLocal(handlers, blacklistedHandlers);
}

protected static HandlerHolder nextHandlerMessageLocal(ConcurrentCyclicSequence<HandlerHolder> handlers, Collection<HandlerHolder> blacklistedHandlers) {
if(blacklistedHandlers == null) {
return handlers.next();
}
final Iterator<HandlerHolder> iterator = handlers.iterator();
while (iterator.hasNext()) {
final HandlerHolder handlerHolder = iterator.next();
if(!blacklistedHandlers.contains(handlerHolder)) {
return handlerHolder;
}
}
return null;
}

protected void checkStarted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.vertx.core.spi.cluster.RegistrationInfo;
import io.vertx.core.spi.metrics.VertxMetrics;

import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.UUID;
Expand Down Expand Up @@ -242,17 +243,19 @@ protected boolean isMessageLocal(MessageImpl msg) {
}

@Override
protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> handlers, boolean messageLocal) {
protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> handlers, boolean messageLocal, Collection<HandlerHolder> blacklistedHandlers) {
HandlerHolder handlerHolder = null;
if (messageLocal) {
handlerHolder = handlers.next();
handlerHolder = nextHandlerMessageLocal(handlers, blacklistedHandlers);
} else {
Iterator<HandlerHolder> iterator = handlers.iterator(false);
while (iterator.hasNext()) {
HandlerHolder next = iterator.next();
if (next.isReplyHandler() || !next.isLocalOnly()) {
handlerHolder = next;
break;
if(blacklistedHandlers == null || !blacklistedHandlers.contains(next)) {
handlerHolder = next;
break;
}
}
}
}
Expand Down
82 changes: 78 additions & 4 deletions src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
package io.vertx.core.eventbus;

import io.vertx.core.*;
import io.vertx.core.eventbus.impl.HandlerHolder;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
import io.vertx.core.shareddata.AsyncMapTest.SomeClusterSerializableImplObject;
import io.vertx.core.shareddata.AsyncMapTest.SomeClusterSerializableObject;
import io.vertx.core.shareddata.AsyncMapTest.SomeSerializableObject;
Expand All @@ -25,10 +27,9 @@
import org.junit.Test;

import java.io.InvalidClassException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -706,4 +707,77 @@ public void testMultiHeaders() {
await();

}

@Test
public void testNextHandlerForNonLocalMessageEmptyBlacklist() throws Throwable {
testNextHandlerInternal(0, 10, false);
}

@Test
public void testNextHandlerForNonLocalMessageNullBlacklist() throws Throwable {
testNextHandlerInternal(-1, 10, false);
}

@Test
public void testNextHandlerForNonLocalMessageHalfBlacklisted() throws Throwable {
testNextHandlerInternal(5, 10, false);
}

@Test
public void testNextHandlerForNonLocalMessageAllBlacklisted() throws Throwable {
testNextHandlerInternal(10, 10, false);
}

@Test
public void testNextHandlerForLocalMessageEmptyBlacklist() throws Throwable {
testNextHandlerInternal(0, 10, true);
}

@Test
public void testNextHandlerForLocalMessageNullBlacklist() throws Throwable {
testNextHandlerInternal(-1, 10, true);
}

@Test
public void testNextHandlerForLocalMessageHalfBlacklisted() throws Throwable {
testNextHandlerInternal(5, 10, true);
}

@Test
public void testNextHandlerForLocalMessageAllBlacklisted() throws Throwable {
testNextHandlerInternal(10, 10, true);
}

private void testNextHandlerInternal(int numberOfEntriesToBlacklist, int totalNumberOfEntries, boolean localMessage) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
int expectedIndex = numberOfEntriesToBlacklist >= 0 ? (numberOfEntriesToBlacklist >= totalNumberOfEntries ? -1 : numberOfEntriesToBlacklist) : 0;
startNodes(1);
waitFor(1);
final EventBus eventBus = vertices[0].eventBus();
List<HandlerHolder> handlerHolders = new ArrayList<>();
for(int i = 0; i < totalNumberOfEntries; i++) {
final int handlerIndex = i;
handlerHolders.add(new HandlerHolder(null, false, false, null) {
@Override
public String toString() {
return super.toString() + " Index: " + handlerIndex;
}

@Override
public boolean equals(Object o) {
return this == o;
}

@Override
public int hashCode() {
return handlerIndex;
}
});
}
List<HandlerHolder> blacklist = numberOfEntriesToBlacklist >= 0 ? handlerHolders.stream().limit(numberOfEntriesToBlacklist).collect(Collectors.toList()) : null;
final ConcurrentCyclicSequence<HandlerHolder> concurrentCyclicSequence = new ConcurrentCyclicSequence<>(handlerHolders.toArray(new HandlerHolder[0]));
final Method methodNextHandler = eventBus.getClass().getDeclaredMethod("nextHandler", new Class<?>[]{ConcurrentCyclicSequence.class, Boolean.TYPE, Collection.class});
methodNextHandler.setAccessible(true);
final HandlerHolder selectedHandleHolder = (HandlerHolder) methodNextHandler.invoke(eventBus, concurrentCyclicSequence, localMessage, blacklist);
assertSame(expectedIndex >= 0 ? handlerHolders.get(expectedIndex) : null, selectedHandleHolder);
}
}
Loading

0 comments on commit 109688d

Please sign in to comment.