2121import static org .junit .jupiter .api .Assertions .assertEquals ;
2222import static org .junit .jupiter .api .Assertions .assertFalse ;
2323import static org .junit .jupiter .api .Assertions .assertTrue ;
24- import static org .mockito .Mockito .*;
25-
24+ import static org .mockito .Mockito .doAnswer ;
25+ import static org .mockito .Mockito .mock ;
26+ import static org .mockito .Mockito .when ;
2627import java .io .BufferedInputStream ;
2728import java .io .IOException ;
2829import java .lang .reflect .Field ;
5051import org .slf4j .LoggerFactory ;
5152
5253public class LearnerHandlerTest extends ZKTestCase {
53-
5454 protected static final Logger LOG = LoggerFactory .getLogger (LearnerHandlerTest .class );
55-
55+
5656 // Test constants for shutdown and packet queuing behavior
5757 private static final int CONCURRENT_PACKET_COUNT = 2000 ;
5858 private static final int PACKET_SEND_DELAY_MS = 10 ;
@@ -160,9 +160,8 @@ public Long answer(InvocationOnMock invocation) {
160160 }
161161 });
162162 when (leader .getZKDatabase ()).thenReturn (db );
163-
164163 learnerHandler = new MockLearnerHandler (sock , leader );
165-
164+
166165 // Use reflection to access and mock the private SyncLimitCheck class
167166 Class <?> syncLimitCheckClass = null ;
168167 for (Class <?> innerClass : LearnerHandler .class .getDeclaredClasses ()) {
@@ -171,9 +170,9 @@ public Long answer(InvocationOnMock invocation) {
171170 break ;
172171 }
173172 }
174-
173+
175174 // Create a mock of the private inner class using Mockito with Answer
176- Object mockSyncLimitCheck = Mockito . mock (syncLimitCheckClass , invocation -> {
175+ Object mockSyncLimitCheck = mock (syncLimitCheckClass , invocation -> {
177176 // Stub the check method to return false
178177 if ("check" .equals (invocation .getMethod ().getName ())) {
179178 return false ;
@@ -616,21 +615,19 @@ public void testTxnLogGap() throws Exception {
616615
617616 /**
618617 * Tests that the packet queue can grow during a slow/blocked shutdown process.
619- *
620618 * This test verifies a specific race condition scenario:
621619 * 1. A background thread continuously adds packets to the learner's queue
622620 * 2. Shutdown is initiated, which clears the queue and adds proposalOfDeath
623621 * 3. Shutdown then blocks on socket.close() (mocked to take 5 seconds)
624622 * 4. During the socket close wait, the background thread continues adding packets
625623 * 5. The queue grows even though shutdown has been initiated
626- *
624+ * <p>
627625 * This simulates real-world scenarios where:
628626 * - Network I/O is slow during shutdown
629627 * - The leader continues sending packets while learner is shutting down
630628 * - The queue can unexpectedly grow during the shutdown process
631- *
632- * The test ensures this behavior is understood and can be monitored.
633- *
629+ *</p>
630+ *
634631 * @throws InterruptedException if thread operations are interrupted
635632 */
636633 @ Test
@@ -659,13 +656,13 @@ public void testLearnerHandlerShutdownWithConcurrentPacketQueuing() throws Inter
659656 Thread .sleep (INITIAL_PACKET_ACCUMULATION_TIME_MS ); // Allow some packets to be added
660657 LOG .debug ("Current queue size: {}" , learnerHandler .getQueuedPackets ().size ());
661658 learnerHandler .startSendingPackets ();
662-
659+
663660 // Verify that packets are being added
664661 int initialQueueSize = learnerHandler .getQueuedPackets ().size ();
665- assertTrue (initialQueueSize > 0 ,
662+ assertTrue (initialQueueSize > 0 ,
666663 "Queue should have some packets before shutdown (actual: " + initialQueueSize + ")" );
667664 LOG .debug ("Initial queue size before shutdown: {}" , initialQueueSize );
668-
665+
669666 // Start shutdown in a separate thread
670667 // Shutdown will: clear queue, add proposalOfDeath, then block on socket.close() for 5 seconds
671668 Thread shutdownThread = new Thread (() -> {
@@ -676,50 +673,50 @@ public void testLearnerHandlerShutdownWithConcurrentPacketQueuing() throws Inter
676673 }
677674 });
678675 shutdownThread .start ();
679-
680676 // Give shutdown time to clear the queue and start blocking on socket close
681677 Thread .sleep (500 );
682-
678+
683679 // At this point:
684680 // - Shutdown has cleared the queue and added proposalOfDeath (queue size = 1)
685681 // - Shutdown is now blocked in socket.close() for 5 seconds
686682 // - Packet sender is still running and adding packets
687683 int queueSizeAfterShutdownStart = learnerHandler .getQueuedPackets ().size ();
688684 LOG .debug ("Queue size after shutdown started (should be ~1): {}" , queueSizeAfterShutdownStart );
689-
685+
690686 // Wait for packets to accumulate during the socket close wait
691687 Thread .sleep (2000 );
692-
688+
693689 // Check that queue has grown during the shutdown process
694690 int queueSizeDuringShutdown = learnerHandler .getQueuedPackets ().size ();
695691 LOG .debug ("Queue size during shutdown (socket close wait): {}" , queueSizeDuringShutdown );
696-
692+
697693 // The queue should have grown because:
698694 // - Shutdown cleared it to 1 (proposalOfDeath)
699695 // - But packet sender kept adding while shutdown waits on socket.close()
700696 assertTrue (queueSizeDuringShutdown > queueSizeAfterShutdownStart ,
701- "Queue should grow during shutdown when socket close is delayed (after shutdown start: " +
702- queueSizeAfterShutdownStart + ", during shutdown: " + queueSizeDuringShutdown + ")" );
703-
697+ "Queue should grow during shutdown when socket close is delayed (after shutdown start: "
698+ + queueSizeAfterShutdownStart + ", during shutdown: "
699+ + queueSizeDuringShutdown + ")" );
700+
704701 // Wait for shutdown to complete
705702 shutdownThread .join (10000 );
706703 if (shutdownThread .isAlive ()) {
707704 LOG .warn ("Shutdown thread did not complete, interrupting" );
708705 shutdownThread .interrupt ();
709706 shutdownThread .join (2000 );
710707 }
711-
708+
712709 // Wait for packet sender to complete
713710 packetSender .join (5000 );
714711 if (packetSender .isAlive ()) {
715712 LOG .warn ("Packet sender thread did not complete, interrupting" );
716713 packetSender .interrupt ();
717714 packetSender .join (1000 );
718715 }
719-
716+
720717 int finalQueueSize = learnerHandler .getQueuedPackets ().size ();
721718 LOG .debug ("Final queue size after shutdown complete: {}" , finalQueueSize );
722-
719+
723720 // Verify learner was removed from leader
724721 assertEquals (0 , leader .getLearners ().size (),
725722 "Leader should have no learners after shutdown completes" );
0 commit comments