Skip to content

Commit

Permalink
more fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jan 24, 2025
1 parent a276d82 commit e6e5cd6
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf
* It's ok to look for this with an estimate on starting a task or not, but you will need to recheck on actual paging operations. */
boolean isPaging();

/**
* Schedules sync to the file storage.
*/
void addSyncPoint(OperationContext context) throws Exception;

/**
* Performs a real sync on the current IO file.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@
*/
package org.apache.activemq.artemis.core.paging.impl;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
Expand All @@ -34,68 +43,94 @@
*/
final class PageSyncTimer extends ActiveMQScheduledComponent {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final PagingStoreImpl store;

private final PagingStore store;

private final ScheduledExecutorService scheduledExecutor;
protected final List<PageEvent> pageEvents = new ArrayList<>();

private boolean pendingSync;
public boolean hasPendingIO() {
return !pageEvents.isEmpty();
}

private final long timeSync;

private final Runnable runnable = this::tick;
public static class PageEvent {
PageEvent(OperationContext context, PagedMessage message, Transaction tx, RouteContextList listCtx) {
this.context = context;
this.message = message;
this.listCtx = listCtx;
this.tx = tx;
}

private final List<OperationContext> syncOperations = new LinkedList<>();
PagedMessage message;
OperationContext context;
RouteContextList listCtx;
Transaction tx;
}


PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, Executor executor, long timeSync) {
PageSyncTimer(PagingStoreImpl store, ScheduledExecutorService scheduledExecutor, Executor executor, long timeSync) {
super(scheduledExecutor, executor, timeSync, TimeUnit.NANOSECONDS, true);
this.store = store;
this.scheduledExecutor = scheduledExecutor;
this.timeSync = timeSync;
}


synchronized void addSync(OperationContext ctx) {
ctx.pageSyncLineUp();
if (!pendingSync) {
pendingSync = true;

delay();
}
syncOperations.add(ctx);
public synchronized void addTask(OperationContext context, PagedMessage message, Transaction tx, RouteContextList listCtx) {
PageEvent event = new PageEvent(context, message, tx, listCtx);
context.storeLineUp();
this.pageEvents.add(event);
delay();
}


@Override
public void run() {
tick();
}

private void tick() {
OperationContext[] pendingSyncsArray;
private PageEvent[] extractPendingEvents() {
synchronized (this) {
if (pageEvents.isEmpty()) {
return null;
}
PageEvent[] pendingsWrites = new PageEvent[pageEvents.size()];
pendingsWrites = pageEvents.toArray(pendingsWrites);
pageEvents.clear();
return pendingsWrites;
}
}

pendingSync = false;
pendingSyncsArray = new OperationContext[syncOperations.size()];
pendingSyncsArray = syncOperations.toArray(pendingSyncsArray);
syncOperations.clear();
private void tick() {
PageEvent[] pendingEvents = extractPendingEvents();
if (pendingEvents == null) {
return;
}

try {
if (pendingSyncsArray.length != 0) {

for (PageEvent event : pendingEvents) {
store.writePageCallback(event.message, event.tx, event.listCtx);
}

try {
store.ioSync();
} catch (Exception e) {
// TODO - do not merge this TODO - critical error here, server should shutdown
logger.warn(e.getMessage(), e);
}


} catch (Exception e) {
for (OperationContext ctx : pendingSyncsArray) {
ctx.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getClass() + " during ioSync for paging on " + store.getStoreName() + ": " + e.getMessage());
for (PageEvent event : pendingEvents) {
event.context.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getClass() + " during ioSync for paging on " + store.getStoreName() + ": " + e.getMessage());
}
} finally {
// In case of failure, The context should propagate an exception to the client
// We send an exception to the client even on the case of a failure
// to avoid possible locks and the client not getting the exception back
for (OperationContext ctx : pendingSyncsArray) {
ctx.pageSyncDone();
for (PageEvent event : pendingEvents) {
event.context.done();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ protected void doTasks(List<PageEvent> tasks) {
}

private void write(PageEvent event) {
pagingStore.writePageCallback(event.message, event.tx, event.listCtx);
try {
pagingStore.writePageCallback(event.message, event.tx, event.listCtx);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}

private void done(PageEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ public class PagingStoreImpl implements PagingStore {

private final ArtemisExecutor executor;

private final PagedGroupWriter groupWriter;

// Bytes consumed by the queue on the memory
private final SizeAwareMetric size;

Expand Down Expand Up @@ -184,8 +182,6 @@ public PagingStoreImpl(final SimpleString address,
throw new IllegalStateException("Paging Manager can't be null");
}

this.groupWriter = new PagedGroupWriter(this, executor);

this.address = address;

this.storageManager = storageManager;
Expand Down Expand Up @@ -529,17 +525,6 @@ public SimpleString getStoreName() {
return storeName;
}

@Override
public void addSyncPoint(OperationContext context) throws Exception {
if (fileFactory == null || !fileFactory.supportsIndividualContext()) {
if (syncTimer != null) {
syncTimer.addSync(context);
} else {
ioSync();
}
}
}

@Override
public void ioSync() throws Exception {
if (!fileFactory.supportsIndividualContext()) {
Expand Down Expand Up @@ -1273,7 +1258,13 @@ private boolean writePage(Message message,

PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);

groupWriter.addTask(storageManager.getContext(), pagedMessage, tx, listCtx);
if (syncTimer == null) {
writePageCallback(pagedMessage, tx, listCtx);
} else {
syncTimer.addTask(storageManager.getContext(), pagedMessage, tx, listCtx);
}

//groupWriter.addTask(storageManager.getContext(), pagedMessage, tx, listCtx);


return true;
Expand All @@ -1282,8 +1273,7 @@ private boolean writePage(Message message,
}
}

void writePageCallback(PagedMessage pagedMessage, Transaction tx, RouteContextList listCtx) {
try {
void writePageCallback(PagedMessage pagedMessage, Transaction tx, RouteContextList listCtx) throws Exception {
int bytesToWrite = pagedMessage.getEncodeSize() + PageReadWriter.SIZE_RECORD;

currentPageSize += bytesToWrite;
Expand All @@ -1302,17 +1292,13 @@ void writePageCallback(PagedMessage pagedMessage, Transaction tx, RouteContextLi
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
final Page page = currentPage;
applyPageCounters(tx, page, listCtx, persistentSize);

page.write(pagedMessage);
applyPageCounters(tx, page, listCtx, persistentSize);

if (logger.isTraceEnabled()) {
logger.trace("Paging message {} on pageStore {} pageNr={}", pagedMessage, getStoreName(), page.getPageId());
}
} catch (Exception e) {
// TODO - DO-NOT-MERGE-WITH-THIS-TODO : Critical error
logger.warn(e.getMessage(), e);
}
}

/**
Expand Down Expand Up @@ -1416,7 +1402,7 @@ private void installPageTransaction(final Transaction tx, final RouteContextList
public boolean hasPendingIO() {
lock.writeLock().lock();
try {
return !(groupWriter.isFlushed() && groupWriter.isEmpty());
return syncTimer.hasPendingIO();
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -1511,24 +1497,11 @@ public void afterRollback(final Transaction tx) {

@Override
public void beforeCommit(final Transaction tx) throws Exception {
if (!tx.isAsync()) {
syncStore();
}
storePageTX(tx);
}

/**
* @throws Exception
*/
private void syncStore() throws Exception {
for (PagingStore store : usedStores) {
store.addSyncPoint(storageManager.getContext());
}
}

@Override
public void beforePrepare(final Transaction tx) throws Exception {
syncStore();
storePageTX(tx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4208,74 +4208,6 @@ private void internalTestPageMultipleDestinations(final boolean transacted) thro
}
}

@TestTemplate
public void testSyncPage() throws Exception {
Configuration config = createDefaultInVMConfig();

server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);

server.start();

try {
server.addAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST));
server.createQueue(QueueConfiguration.of(PagingTest.ADDRESS).setRoutingType(RoutingType.ANYCAST));

final CountDownLatch pageUp = new CountDownLatch(0);
final CountDownLatch pageDone = new CountDownLatch(1);

OperationContext ctx = new DummyOperationContext(pageUp, pageDone);

OperationContextImpl.setContext(ctx);

PagingManager paging = server.getPagingManager();

PagingStore store = paging.getPageStore(ADDRESS);

store.addSyncPoint(OperationContextImpl.getContext());

assertTrue(pageUp.await(10, TimeUnit.SECONDS));

assertTrue(pageDone.await(10, TimeUnit.SECONDS));

server.stop();

} finally {
try {
server.stop();
} catch (Throwable ignored) {
}

OperationContextImpl.clearContext();
}

}

@TestTemplate
public void testSyncPageTX() throws Exception {
Configuration config = createDefaultInVMConfig();

server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);

server.start();
server.createQueue(QueueConfiguration.of(PagingTest.ADDRESS).setRoutingType(RoutingType.ANYCAST));

final CountDownLatch pageUp = new CountDownLatch(0);
final CountDownLatch pageDone = new CountDownLatch(1);

OperationContext ctx = new DummyOperationContext(pageUp, pageDone);
OperationContextImpl.setContext(ctx);

PagingManager paging = server.getPagingManager();

PagingStore store = paging.getPageStore(ADDRESS);

store.addSyncPoint(OperationContextImpl.getContext());

assertTrue(pageUp.await(10, TimeUnit.SECONDS));

assertTrue(pageDone.await(10, TimeUnit.SECONDS));
}

@TestTemplate
public void testPagingOneDestinationOnly() throws Exception {
SimpleString PAGED_ADDRESS = SimpleString.of("paged");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,6 @@ public boolean isPaging() {
return false;
}

@Override
public void addSyncPoint(OperationContext context) throws Exception {

}

@Override
public Page usePage(long page, boolean create) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ under the License.
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>

<page-sync-timeout>185920000</page-sync-timeout>
<page-sync-timeout>1000000</page-sync-timeout>


<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
Expand Down Expand Up @@ -157,7 +157,7 @@ under the License.
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-read-page-messages>-1</max-read-page-messages>
<max-read-page-messages>50000</max-read-page-messages>
<max-read-page-bytes>-1</max-read-page-bytes>
<max-size-bytes>0</max-size-bytes>
<page-size-bytes>10M</page-size-bytes>
Expand Down
Loading

0 comments on commit e6e5cd6

Please sign in to comment.