Skip to content

Commit

Permalink
progress
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jan 28, 2025
1 parent da929e7 commit cbd8935
Show file tree
Hide file tree
Showing 4 changed files with 396 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.transaction.Transaction;
Expand All @@ -41,7 +42,7 @@
* TODO: remove this
* This will batch multiple calls waiting to perform a sync in a single call.
*/
final class PageSyncTimer extends ActiveMQScheduledComponent {
class PageSyncTimer extends ActiveMQScheduledComponent {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand Down Expand Up @@ -83,11 +84,6 @@ public synchronized void addTask(OperationContext context, PagedMessage message,
}


@Override
public void run() {
tick();
}

private PageEvent[] extractPendingEvents() {
synchronized (this) {
if (pageEvents.isEmpty()) {
Expand All @@ -100,25 +96,20 @@ private PageEvent[] extractPendingEvents() {
}
}

private void tick() {
@Override
public void run() {
PageEvent[] pendingEvents = extractPendingEvents();
if (pendingEvents == null) {
return;
}
OperationContext beforeContext = OperationContextImpl.getContext();

try {

for (PageEvent event : pendingEvents) {
OperationContextImpl.setContext(event.context);
store.directWritePage(event.message, event.tx, event.listCtx);
}

try {
store.ioSync();
} catch (Exception e) {
// TODO - do not merge this TODO - critical error here, server should shutdown
logger.warn(e.getMessage(), e);
}

store.ioSync();

} catch (Exception e) {
for (PageEvent event : pendingEvents) {
Expand All @@ -131,6 +122,8 @@ private void tick() {
for (PageEvent event : pendingEvents) {
event.context.done();
}

OperationContextImpl.setContext(beforeContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,7 @@ private boolean writePage(Message message,
if (syncTimer == null) {
directWritePage(pagedMessage, tx, listCtx);
} else {
logger.info("Adding task {}", pagedMessage);
syncTimer.addTask(storageManager.getContext(), pagedMessage, tx, listCtx);
}

Expand All @@ -1271,31 +1272,32 @@ private boolean writePage(Message message,
}

void directWritePage(PagedMessage pagedMessage, Transaction tx, RouteContextList listCtx) throws Exception {
int bytesToWrite = pagedMessage.getEncodeSize() + PageReadWriter.SIZE_RECORD;
logger.info("Writing {}", pagedMessage);
int bytesToWrite = pagedMessage.getEncodeSize() + PageReadWriter.SIZE_RECORD;

currentPageSize += bytesToWrite;
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize += bytesToWrite;
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize += bytesToWrite;
}
}

if (tx != null && tx.isAllowPageTransaction()) {
installPageTransaction(tx, listCtx);
}
if (tx != null && tx.isAllowPageTransaction()) {
installPageTransaction(tx, listCtx);
}

// the apply counter will make sure we write a record on journal
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
final Page page = currentPage;
// the apply counter will make sure we write a record on journal
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
final Page page = currentPage;

page.write(pagedMessage);
applyPageCounters(tx, page, listCtx, persistentSize);
page.write(pagedMessage);
applyPageCounters(tx, page, listCtx, persistentSize);

if (logger.isTraceEnabled()) {
logger.trace("Paging message {} on pageStore {} pageNr={}", pagedMessage, getStoreName(), page.getPageId());
}
if (logger.isTraceEnabled()) {
logger.trace("Paging message {} on pageStore {} pageNr={}", pagedMessage, getStoreName(), page.getPageId());
}
}

/**
Expand Down Expand Up @@ -1407,6 +1409,7 @@ public boolean hasPendingIO() {

@Override
public void destroy() throws Exception {
new Exception("Destroy is being called").printStackTrace();
// destroy has to be executed in the same executor as the cleanup
execute(this::internalDestroy);
OperationContext context = OperationContextImpl.getContext();
Expand Down
Loading

0 comments on commit cbd8935

Please sign in to comment.