Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
Expand All @@ -41,32 +44,36 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LearnerHandlerTest extends ZKTestCase {

protected static final Logger LOG = LoggerFactory.getLogger(LearnerHandlerTest.class);

// Test constants for shutdown and packet queuing behavior
private static final int CONCURRENT_PACKET_COUNT = 2000;
private static final int PACKET_SEND_DELAY_MS = 10;
private static final int INITIAL_PACKET_ACCUMULATION_TIME_MS = 1000;

class MockLearnerHandler extends LearnerHandler {

boolean threadStarted = false;
boolean sendingThreadStarted = true;

MockLearnerHandler(Socket sock, Leader leader) throws IOException {
super(sock, new BufferedInputStream(sock.getInputStream()), leader);
}

protected void startSendingPackets() {
threadStarted = true;
sendingThreadStarted = true;
}

@Override
protected boolean shouldSendMarkerPacketForLogging() {
return false;
}

}

class MockZKDatabase extends ZKDatabase {
Expand Down Expand Up @@ -136,6 +143,14 @@ public void setUp() throws Exception {
db = new MockZKDatabase(null);
sock = mock(Socket.class);

doAnswer(invocation -> {
System.out.println("Mock socket close called, simulating delay...");
Thread.sleep(5000);
System.out.println("Socket close timed out, closing socket...");
return null;
}).when(sock).close();


// Intercept when startForwarding is called
leader = mock(Leader.class);
when(leader.startForwarding(ArgumentMatchers.any(LearnerHandler.class), ArgumentMatchers.anyLong())).thenAnswer(new Answer<Long>() {
Expand All @@ -145,8 +160,50 @@ public Long answer(InvocationOnMock invocation) {
}
});
when(leader.getZKDatabase()).thenReturn(db);

learnerHandler = new MockLearnerHandler(sock, leader);

// Use reflection to access and mock the private SyncLimitCheck class
Class<?> syncLimitCheckClass = null;
for (Class<?> innerClass : LearnerHandler.class.getDeclaredClasses()) {
if (innerClass.getSimpleName().equals("SyncLimitCheck")) {
syncLimitCheckClass = innerClass;
break;
}
}

// Create a mock of the private inner class using Mockito with Answer
Object mockSyncLimitCheck = mock(syncLimitCheckClass, invocation -> {
// Stub the check method to return false
if ("check".equals(invocation.getMethod().getName())) {
return false;
}
// For other methods, use default behavior
return Mockito.RETURNS_DEFAULTS.answer(invocation);
});

// Inject the mock into LearnerHandler instance
Field field = LearnerHandler.class.getDeclaredField("syncLimitCheck");
field.setAccessible(true);
field.set(learnerHandler, mockSyncLimitCheck);
List<LearnerHandler> learnerHandlers = new ArrayList<>();
learnerHandlers.add(learnerHandler);
when(leader.getLearners()).thenReturn(learnerHandlers);
// I want to test removeLearnerHandler function in Leader
doAnswer(invocation -> {
LearnerHandler lh = invocation.getArgument(0);
if (lh == learnerHandler) {
LOG.info("Mock removeLearnerHandler called");
}
if (!learnerHandlers.isEmpty()) {
LOG.info("Removing learner handler: {}", lh);
learnerHandlers.remove(lh);
}
return null;
}).when(leader).removeLearnerHandler(ArgumentMatchers.any(LearnerHandler.class));

Field fieldSendingThreadStarted = LearnerHandler.class.getDeclaredField("sendingThreadStarted");
fieldSendingThreadStarted.setAccessible(true);
fieldSendingThreadStarted.set(learnerHandler, learnerHandler.sendingThreadStarted);
}

Proposal createProposal(long zxid) {
Expand Down Expand Up @@ -174,7 +231,7 @@ public void queuedPacketMatches(long[] zxids) {

void reset() {
learnerHandler.getQueuedPackets().clear();
learnerHandler.threadStarted = false;
learnerHandler.sendingThreadStarted = false;
learnerHandler.setFirstPacket(true);
}

Expand Down Expand Up @@ -556,4 +613,112 @@ public void testTxnLogGap() throws Exception {
reset();
}

/**
* Tests that the packet queue can grow during a slow/blocked shutdown process.
* This test verifies a specific race condition scenario:
* 1. A background thread continuously adds packets to the learner's queue
* 2. Shutdown is initiated, which clears the queue and adds proposalOfDeath
* 3. Shutdown then blocks on socket.close() (mocked to take 5 seconds)
* 4. During the socket close wait, the background thread continues adding packets
* 5. The queue grows even though shutdown has been initiated
* <p>
* This simulates real-world scenarios where:
* - Network I/O is slow during shutdown
* - The leader continues sending packets while learner is shutting down
* - The queue can unexpectedly grow during the shutdown process
*</p>
*
* @throws InterruptedException if thread operations are interrupted
*/
@Test
public void testLearnerHandlerShutdownWithConcurrentPacketQueuing() throws InterruptedException {
// Setup a thread that will keep adding packets to the queue even after shutdown starts
// This simulates a leader adding packets concurrently with shutdown
Thread packetSender = new Thread(() -> {
// Add packets to queue
for (int i = 0; i < CONCURRENT_PACKET_COUNT; i++) {
if (leader.getLearners().isEmpty()) {
LOG.debug("No learners available, stopping packet sender.");
break;
}
leader.getLearners().get(0).queuePacket(new QuorumPacket());
try {
Thread.sleep(PACKET_SEND_DELAY_MS); // Space out the additions
} catch (InterruptedException e) {
LOG.warn("Packet sender thread interrupted", e);
Thread.currentThread().interrupt();
break;
}
}
});
packetSender.start();
LOG.debug("Packet sender started, adding packets to queue...");
Thread.sleep(INITIAL_PACKET_ACCUMULATION_TIME_MS); // Allow some packets to be added
LOG.debug("Current queue size: {}", learnerHandler.getQueuedPackets().size());
learnerHandler.startSendingPackets();

// Verify that packets are being added
int initialQueueSize = learnerHandler.getQueuedPackets().size();
assertTrue(initialQueueSize > 0,
"Queue should have some packets before shutdown (actual: " + initialQueueSize + ")");
LOG.debug("Initial queue size before shutdown: {}", initialQueueSize);

// Start shutdown in a separate thread
// Shutdown will: clear queue, add proposalOfDeath, then block on socket.close() for 5 seconds
Thread shutdownThread = new Thread(() -> {
try {
learnerHandler.shutdown();
} catch (Exception e) {
LOG.warn("Exception during learner handler shutdown", e);
}
});
shutdownThread.start();
// Give shutdown time to clear the queue and start blocking on socket close
Thread.sleep(500);

// At this point:
// - Shutdown has cleared the queue and added proposalOfDeath (queue size = 1)
// - Shutdown is now blocked in socket.close() for 5 seconds
// - Packet sender is still running and adding packets
int queueSizeAfterShutdownStart = learnerHandler.getQueuedPackets().size();
LOG.debug("Queue size after shutdown started (should be ~1): {}", queueSizeAfterShutdownStart);

// Wait for packets to accumulate during the socket close wait
Thread.sleep(2000);

// Check that queue has grown during the shutdown process
int queueSizeDuringShutdown = learnerHandler.getQueuedPackets().size();
LOG.debug("Queue size during shutdown (socket close wait): {}", queueSizeDuringShutdown);

// The queue should have grown because:
// - Shutdown cleared it to 1 (proposalOfDeath)
// - But packet sender kept adding while shutdown waits on socket.close()
assertTrue(queueSizeDuringShutdown > queueSizeAfterShutdownStart,
"Queue should grow during shutdown when socket close is delayed (after shutdown start: "
+ queueSizeAfterShutdownStart + ", during shutdown: "
+ queueSizeDuringShutdown + ")");

// Wait for shutdown to complete
shutdownThread.join(10000);
if (shutdownThread.isAlive()) {
LOG.warn("Shutdown thread did not complete, interrupting");
shutdownThread.interrupt();
shutdownThread.join(2000);
}

// Wait for packet sender to complete
packetSender.join(5000);
if (packetSender.isAlive()) {
LOG.warn("Packet sender thread did not complete, interrupting");
packetSender.interrupt();
packetSender.join(1000);
}

int finalQueueSize = learnerHandler.getQueuedPackets().size();
LOG.debug("Final queue size after shutdown complete: {}", finalQueueSize);

// Verify learner was removed from leader
assertEquals(0, leader.getLearners().size(),
"Leader should have no learners after shutdown completes");
}
}
Loading