Skip to content

Commit 4d509d6

Browse files
disableREplica issue
1 parent 2416d1f commit 4d509d6

File tree

3 files changed

+77
-18
lines changed

3 files changed

+77
-18
lines changed

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ protected void processMessages() {
123123
try {
124124
for (PageEvent event : pendingEvents) {
125125
OperationContextImpl.setContext(event.context);
126-
store.directWritePage(event.message, event.tx, event.listCtx, false);
126+
store.directWritePage(event.message, false);
127127
}
128128
store.ioSync();
129129

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1265,7 +1265,7 @@ private boolean writePage(Message message,
12651265
}
12661266

12671267
if (timedWriter == null) {
1268-
directWritePage(pagedMessage, tx, listCtx, true);
1268+
directWritePage(pagedMessage, true);
12691269
} else {
12701270
timedWriter.addTask(storageManager.getContext(), pagedMessage, tx, listCtx);
12711271
}
@@ -1278,7 +1278,7 @@ private boolean writePage(Message message,
12781278
}
12791279
}
12801280

1281-
void directWritePage(PagedMessage pagedMessage, Transaction tx, RouteContextList listCtx, boolean lineUp) throws Exception {
1281+
void directWritePage(PagedMessage pagedMessage, boolean lineUp) throws Exception {
12821282
int bytesToWrite = pagedMessage.getEncodeSize() + PageReadWriter.SIZE_RECORD;
12831283

12841284
currentPageSize += bytesToWrite;
@@ -1293,6 +1293,7 @@ void directWritePage(PagedMessage pagedMessage, Transaction tx, RouteContextList
12931293
// doing this will give us a possibility of recovering the page counters
12941294
final Page page = currentPage;
12951295

1296+
logger.info("Calling page.write");
12961297
page.write(pagedMessage, lineUp);
12971298

12981299
if (logger.isTraceEnabled()) {

artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriterUnitTest.java

+73-15
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828
import java.util.concurrent.atomic.AtomicLong;
2929

30+
import org.apache.activemq.artemis.api.core.SimpleString;
3031
import org.apache.activemq.artemis.core.config.Configuration;
3132
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
3233
import org.apache.activemq.artemis.core.io.IOCallback;
@@ -37,18 +38,22 @@
3738
import org.apache.activemq.artemis.core.persistence.OperationContext;
3839
import org.apache.activemq.artemis.core.persistence.StorageManager;
3940
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
41+
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManagerAccessor;
4042
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
43+
import org.apache.activemq.artemis.core.replication.ReplicationManager;
4144
import org.apache.activemq.artemis.core.server.JournalType;
4245
import org.apache.activemq.artemis.core.server.RouteContextList;
4346
import org.apache.activemq.artemis.core.transaction.Transaction;
4447
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
4548
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
4649
import org.apache.activemq.artemis.tests.util.ArtemisTestCase;
4750
import org.apache.activemq.artemis.utils.ExecutorFactory;
51+
import org.apache.activemq.artemis.utils.ReusableLatch;
4852
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
4953
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
5054
import org.junit.jupiter.api.BeforeEach;
5155
import org.junit.jupiter.api.Test;
56+
import org.mockito.Mock;
5257
import org.mockito.Mockito;
5358
import org.slf4j.Logger;
5459
import org.slf4j.LoggerFactory;
@@ -66,11 +71,13 @@ public class PageTimedWriterUnitTest extends ArtemisTestCase {
6671
ExecutorService executorService;
6772
OrderedExecutorFactory executorFactory;
6873
OperationContext context;
69-
JournalStorageManager journalStorageManager;
74+
75+
// almost real as we use an extension that can allow mocking internal objects such as journals and replicationManager
76+
JournalStorageManager realJournalStorageManager;
7077

7178
PagingStoreImpl mockPageStore;
7279

73-
CountDownLatch allowRunning;
80+
ReusableLatch allowRunning;
7481

7582
PageTimedWriter timer;
7683

@@ -79,6 +86,10 @@ public class PageTimedWriterUnitTest extends ArtemisTestCase {
7986
Journal mockBindingsJournal;
8087
Journal mockMessageJournal;
8188

89+
AtomicBoolean useReplication = new AtomicBoolean(false);
90+
AtomicBoolean returnSynchronizing = new AtomicBoolean(false);
91+
ReplicationManager mockReplicationManager;
92+
8293

8394
class MockableJournalStorageManager extends JournalStorageManager {
8495

@@ -106,7 +117,7 @@ protected void createDirectories() {
106117

107118

108119
@BeforeEach
109-
public void prepareTest() throws Exception {
120+
public void prepareMocks() throws Exception {
110121
configuration = new ConfigurationImpl();
111122
configuration.setJournalType(JournalType.NIO);
112123
scheduledExecutorService = Executors.newScheduledThreadPool(10);
@@ -121,14 +132,34 @@ public void prepareTest() throws Exception {
121132
mockBindingsJournal = Mockito.mock(Journal.class);
122133
mockMessageJournal = Mockito.mock(Journal.class);
123134

124-
journalStorageManager = new MockableJournalStorageManager(configuration, mockBindingsJournal, mockMessageJournal, executorFactory, executorFactory);
125-
journalStorageManager.start();
135+
mockReplicationManager = Mockito.mock(ReplicationManager.class);
136+
Mockito.when(mockReplicationManager.isStarted()).thenAnswer(a -> useReplication.get());
137+
Mockito.when(mockReplicationManager.isSynchronizing()).thenAnswer(a -> returnSynchronizing.get());
138+
Mockito.doAnswer(a -> {
139+
if (useReplication.get()) {
140+
OperationContext ctx = OperationContextImpl.getContext();
141+
if (ctx != null) {
142+
ctx.replicationDone();
143+
}
144+
}
145+
return null;
146+
}).when(mockReplicationManager).pageWrite(Mockito.any(SimpleString.class), Mockito.any(PagedMessage.class), Mockito.anyLong(), Mockito.anyBoolean());
126147

127-
allowRunning = new CountDownLatch(1);
148+
realJournalStorageManager = new MockableJournalStorageManager(configuration, mockBindingsJournal, mockMessageJournal, executorFactory, executorFactory);
149+
realJournalStorageManager.start();
150+
151+
JournalStorageManagerAccessor.setReplicationManager(realJournalStorageManager, mockReplicationManager);
152+
153+
allowRunning = new ReusableLatch(1);
128154

129155
mockPageStore = Mockito.mock(PagingStoreImpl.class);
156+
Mockito.doAnswer(a -> {
157+
realJournalStorageManager.pageWrite(SimpleString.of("whatever"), a.getArgument(0), 1L, a.getArgument(1));
158+
return null;
159+
}).when(mockPageStore).directWritePage(Mockito.any(PagedMessage.class), Mockito.anyBoolean());
160+
130161

131-
timer = new PageTimedWriter(journalStorageManager, mockPageStore, scheduledExecutorService, executorFactory.getExecutor(), 100) {
162+
timer = new PageTimedWriter(realJournalStorageManager, mockPageStore, scheduledExecutorService, executorFactory.getExecutor(), 100) {
132163
@Override
133164
public void run() {
134165
try {
@@ -148,7 +179,7 @@ public void run() {
148179
// a test to validate if the Mocks are correctly setup
149180
@Test
150181
public void testValidateMocks() throws Exception {
151-
TransactionImpl tx = new TransactionImpl(journalStorageManager);
182+
TransactionImpl tx = new TransactionImpl(realJournalStorageManager);
152183
tx.setContainsPersistent();
153184
AtomicInteger count = new AtomicInteger(0);
154185
tx.addOperation(new TransactionOperationAbstract() {
@@ -162,7 +193,7 @@ public void afterCommit(Transaction tx) {
162193
assertEquals(1, count.get(), "tx.commit is not correctly wired on mocking");
163194

164195

165-
journalStorageManager.afterCompleteOperations(new IOCallback() {
196+
realJournalStorageManager.afterCompleteOperations(new IOCallback() {
166197
@Override
167198
public void done() {
168199
count.incrementAndGet();
@@ -174,7 +205,7 @@ public void onError(int errorCode, String errorMessage) {
174205
}
175206
});
176207

177-
journalStorageManager.afterCompleteOperations(new IOCallback() {
208+
realJournalStorageManager.afterCompleteOperations(new IOCallback() {
178209
@Override
179210
public void done() {
180211
count.incrementAndGet();
@@ -188,8 +219,8 @@ public void onError(int errorCode, String errorMessage) {
188219

189220
assertEquals(3, count.get(), "afterCompletion is not correctly wired on mocking");
190221

191-
long id = journalStorageManager.generateID();
192-
long newID = journalStorageManager.generateID();
222+
long id = realJournalStorageManager.generateID();
223+
long newID = realJournalStorageManager.generateID();
193224
assertEquals(1L, newID - id);
194225

195226
}
@@ -221,9 +252,34 @@ public void onError(int errorCode, String errorMessage) {
221252
public void testIOCompletionWhileReplica() throws Exception {
222253
CountDownLatch latch = new CountDownLatch(1);
223254

224-
AtomicBoolean replicated = new AtomicBoolean(true);
255+
useReplication.set(true);
256+
257+
timer.addTask(context, Mockito.mock(PagedMessage.class), null, Mockito.mock(RouteContextList.class));
258+
259+
context.executeOnCompletion(new IOCallback() {
260+
@Override
261+
public void done() {
262+
latch.countDown();
263+
}
264+
265+
@Override
266+
public void onError(int errorCode, String errorMessage) {
267+
}
268+
});
269+
270+
assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
271+
allowRunning.countDown();
272+
assertTrue(latch.await(10, TimeUnit.MINUTES));
273+
274+
allowRunning.setCount(1);
275+
}
276+
277+
// add a task while replicating, process it when no longer replicating (disconnect a node scenario)
278+
@Test
279+
public void testDisableReplica() throws Exception {
280+
CountDownLatch latch = new CountDownLatch(1);
225281

226-
Mockito.when(journalStorageManager.isReplicated()).then(r -> replicated.get());
282+
useReplication.set(true);
227283

228284
timer.addTask(context, Mockito.mock(PagedMessage.class), null, Mockito.mock(RouteContextList.class));
229285

@@ -239,9 +295,11 @@ public void onError(int errorCode, String errorMessage) {
239295
});
240296

241297
assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
298+
useReplication.set(false);
242299
allowRunning.countDown();
243300
assertTrue(latch.await(10, TimeUnit.SECONDS));
244301

302+
allowRunning.setCount(1);
245303
}
246304

247305
@Test
@@ -260,7 +318,7 @@ public void testTXCompletion() throws Exception {
260318

261319
CountDownLatch latch = new CountDownLatch(1);
262320

263-
Transaction tx = new TransactionImpl(journalStorageManager, Integer.MAX_VALUE);
321+
Transaction tx = new TransactionImpl(realJournalStorageManager, Integer.MAX_VALUE);
264322
tx.setContainsPersistent();
265323

266324
timer.addTask(context, Mockito.mock(PagedMessage.class), tx, Mockito.mock(RouteContextList.class));

0 commit comments

Comments
 (0)