Skip to content

Commit 11dc14a

Browse files
fixes
1 parent 1a21114 commit 11dc14a

File tree

12 files changed

+155
-113
lines changed

12 files changed

+155
-113
lines changed

artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/Actor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import java.util.concurrent.Executor;
2020

21-
public class Actor<T> extends ProcessorBase<T> {
21+
public class Actor<T> extends TaskProcessorBase<T> {
2222

2323
private final ActorListener<T> listener;
2424

@@ -33,7 +33,7 @@ protected final void doTask(T task) {
3333
}
3434

3535
public final void act(T message) {
36-
task(message);
36+
addTask(message);
3737
}
3838

3939
}

artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
3030
* same method, will result in B's task running after A's.
3131
*/
32-
public class OrderedExecutor extends ProcessorBase<Runnable> implements ArtemisExecutor {
32+
public class OrderedExecutor extends TaskProcessorBase<Runnable> implements ArtemisExecutor {
3333

3434
public OrderedExecutor(Executor delegate) {
3535
super(delegate);
@@ -69,7 +69,7 @@ protected final void doTask(Runnable task) {
6969

7070
@Override
7171
public final void execute(Runnable run) {
72-
task(run);
72+
addTask(run);
7373
}
7474

7575
@Override

artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java

+7-12
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ public abstract class ProcessorBase<T> extends HandlerBase {
4747
@SuppressWarnings("unused")
4848
private volatile int state = STATE_NOT_RUNNING;
4949
// Request of forced shutdown
50-
private volatile boolean requestedForcedShutdown = false;
50+
protected volatile boolean requestedForcedShutdown = false;
5151
// Request of educated shutdown:
5252
protected volatile boolean requestedShutdown = false;
5353
// Request to yield to another thread
54-
private volatile boolean yielded = false;
54+
protected volatile boolean yielded = false;
5555

5656
private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
5757

@@ -61,12 +61,7 @@ private void executePendingTasks() {
6161
if (stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_RUNNING)) {
6262
enter();
6363
try {
64-
T task;
65-
//while the queue is not empty we process in order:
66-
//if requestedForcedShutdown==true than no new tasks will be drained from the tasks q.
67-
while (!yielded && !requestedForcedShutdown && (task = tasks.poll()) != null) {
68-
doTask(task);
69-
}
64+
processQueue();
7065
} finally {
7166
leave();
7267
//set state back to not running if possible: shutdownNow could be called by doTask(task).
@@ -90,6 +85,8 @@ private void executePendingTasks() {
9085
}
9186
}
9287

88+
protected abstract void processQueue();
89+
9390
/**
9491
* It will shutdown and wait 30 seconds for timeout.
9592
*/
@@ -135,8 +132,6 @@ public int shutdownNow(Consumer<? super T> onPendingItem, int timeout, TimeUnit
135132
return pendingItems;
136133
}
137134

138-
protected abstract void doTask(T task);
139-
140135
public ProcessorBase(Executor parent) {
141136
this.delegate = parent;
142137
}
@@ -174,7 +169,7 @@ public final boolean flush(long timeout, TimeUnit unit) {
174169
return this.state == STATE_NOT_RUNNING;
175170
}
176171

177-
protected void task(T command) {
172+
protected void addTask(T command) {
178173
if (requestedShutdown) {
179174
logAddOnShutdown();
180175
return;
@@ -190,7 +185,7 @@ protected void task(T command) {
190185

191186
/**
192187
* This has to be called on the assumption that state!=STATE_RUNNING.
193-
* It is packed separately from {@link #task(Object)} just for performance reasons: it
188+
* It is packed separately from {@link #addTask(Object)} just for performance reasons: it
194189
* handles the uncommon execution cases for bursty scenarios i.e. the slowest execution path.
195190
*/
196191
private void onAddedTaskIfNotRunning(int state) {

artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.slf4j.LoggerFactory;
2525
import java.lang.invoke.MethodHandles;
2626

27-
public class ThresholdActor<T> extends ProcessorBase<Object> {
27+
public class ThresholdActor<T> extends TaskProcessorBase<Object> {
2828

2929
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
3030

@@ -85,13 +85,13 @@ public void act(T message) {
8585
} else {
8686
logger.debug("element {} returned an invalid size over the Actor", message);
8787
}
88-
task(message);
88+
addTask(message);
8989
}
9090

9191
public void flush() {
9292
if (SCHEDULED_FLUSH_UPDATER.compareAndSet(this, 0, 1)) {
9393
overThreshold.run();
94-
task(FLUSH);
94+
addTask(FLUSH);
9595
}
9696
}
9797

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,6 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf
114114
* It's ok to look for this with an estimate on starting a task or not, but you will need to recheck on actual paging operations. */
115115
boolean isPaging();
116116

117-
/**
118-
* Schedules sync to the file storage.
119-
*/
120-
void addSyncPoint(OperationContext context) throws Exception;
121-
122117
/**
123118
* Performs a real sync on the current IO file.
124119
*/
@@ -265,6 +260,10 @@ default void addSize(int size) {
265260

266261
void unblock();
267262

263+
default boolean hasPendingIO() {
264+
return false;
265+
}
266+
268267
default StorageManager getStorageManager() {
269268
return null;
270269
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java

+24
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,25 @@ public Future<Boolean> scheduleCleanup() {
209209

210210
return future;
211211
}
212+
public Future<Boolean> forceScheduleCleanup() {
213+
final SimpleFutureImpl<Boolean> future = new SimpleFutureImpl<>();
214+
215+
pagingStore.execute(() -> {
216+
storageManager.setContext(storageManager.newSingleThreadContext());
217+
try {
218+
System.out.println("Forcing cleanup");
219+
if (cleanupEnabled) {
220+
cleanup();
221+
}
222+
} finally {
223+
storageManager.clearContext();
224+
future.set(true);
225+
}
226+
});
227+
228+
return future;
229+
}
230+
212231

213232
/**
214233
* Delete everything associated with any queue on this address.
@@ -271,6 +290,11 @@ protected void cleanup() {
271290
return;
272291
}
273292

293+
if (pagingStore.hasPendingIO()) {
294+
forceScheduleCleanup();
295+
return;
296+
}
297+
274298
ArrayList<Page> depagedPages = new ArrayList<>();
275299
LongHashSet depagedPagesSet = new LongHashSet();
276300

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java

+65-28
Original file line numberDiff line numberDiff line change
@@ -16,84 +16,121 @@
1616
*/
1717
package org.apache.activemq.artemis.core.paging.impl;
1818

19+
import java.lang.invoke.MethodHandles;
20+
import java.util.ArrayList;
1921
import java.util.LinkedList;
2022
import java.util.List;
23+
import java.util.Queue;
24+
import java.util.concurrent.ConcurrentLinkedQueue;
2125
import java.util.concurrent.Executor;
2226
import java.util.concurrent.ScheduledExecutorService;
2327
import java.util.concurrent.TimeUnit;
2428

2529
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
30+
import org.apache.activemq.artemis.core.paging.PagedMessage;
2631
import org.apache.activemq.artemis.core.paging.PagingStore;
2732
import org.apache.activemq.artemis.core.persistence.OperationContext;
2833
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
34+
import org.apache.activemq.artemis.core.server.RouteContextList;
35+
import org.apache.activemq.artemis.core.transaction.Transaction;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
2938

3039
/**
40+
*
41+
* TODO: remove this
3142
* This will batch multiple calls waiting to perform a sync in a single call.
3243
*/
3344
final class PageSyncTimer extends ActiveMQScheduledComponent {
3445

46+
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
3547

48+
private final PagingStoreImpl store;
3649

37-
private final PagingStore store;
3850

39-
private final ScheduledExecutorService scheduledExecutor;
51+
protected final List<PageEvent> pageEvents = new ArrayList<>();
4052

41-
private boolean pendingSync;
53+
public boolean hasPendingIO() {
54+
return !pageEvents.isEmpty();
55+
}
4256

43-
private final long timeSync;
4457

45-
private final Runnable runnable = this::tick;
58+
public static class PageEvent {
59+
PageEvent(OperationContext context, PagedMessage message, Transaction tx, RouteContextList listCtx) {
60+
this.context = context;
61+
this.message = message;
62+
this.listCtx = listCtx;
63+
this.tx = tx;
64+
}
4665

47-
private final List<OperationContext> syncOperations = new LinkedList<>();
66+
PagedMessage message;
67+
OperationContext context;
68+
RouteContextList listCtx;
69+
Transaction tx;
70+
}
4871

4972

50-
PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, Executor executor, long timeSync) {
73+
PageSyncTimer(PagingStoreImpl store, ScheduledExecutorService scheduledExecutor, Executor executor, long timeSync) {
5174
super(scheduledExecutor, executor, timeSync, TimeUnit.NANOSECONDS, true);
5275
this.store = store;
53-
this.scheduledExecutor = scheduledExecutor;
54-
this.timeSync = timeSync;
5576
}
5677

5778

58-
synchronized void addSync(OperationContext ctx) {
59-
ctx.pageSyncLineUp();
60-
if (!pendingSync) {
61-
pendingSync = true;
62-
63-
delay();
64-
}
65-
syncOperations.add(ctx);
79+
public synchronized void addTask(OperationContext context, PagedMessage message, Transaction tx, RouteContextList listCtx) {
80+
PageEvent event = new PageEvent(context, message, tx, listCtx);
81+
context.storeLineUp();
82+
this.pageEvents.add(event);
83+
delay();
6684
}
6785

86+
6887
@Override
6988
public void run() {
7089
tick();
7190
}
7291

73-
private void tick() {
74-
OperationContext[] pendingSyncsArray;
92+
private PageEvent[] extractPendingEvents() {
7593
synchronized (this) {
94+
if (pageEvents.isEmpty()) {
95+
return null;
96+
}
97+
PageEvent[] pendingsWrites = new PageEvent[pageEvents.size()];
98+
pendingsWrites = pageEvents.toArray(pendingsWrites);
99+
pageEvents.clear();
100+
return pendingsWrites;
101+
}
102+
}
76103

77-
pendingSync = false;
78-
pendingSyncsArray = new OperationContext[syncOperations.size()];
79-
pendingSyncsArray = syncOperations.toArray(pendingSyncsArray);
80-
syncOperations.clear();
104+
private void tick() {
105+
PageEvent[] pendingEvents = extractPendingEvents();
106+
if (pendingEvents == null) {
107+
return;
81108
}
82109

83110
try {
84-
if (pendingSyncsArray.length != 0) {
111+
112+
for (PageEvent event : pendingEvents) {
113+
store.writePageCallback(event.message, event.tx, event.listCtx);
114+
}
115+
116+
try {
85117
store.ioSync();
118+
} catch (Exception e) {
119+
// TODO - do not merge this TODO - critical error here, server should shutdown
120+
logger.warn(e.getMessage(), e);
86121
}
122+
123+
87124
} catch (Exception e) {
88-
for (OperationContext ctx : pendingSyncsArray) {
89-
ctx.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getClass() + " during ioSync for paging on " + store.getStoreName() + ": " + e.getMessage());
125+
for (PageEvent event : pendingEvents) {
126+
event.context.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getClass() + " during ioSync for paging on " + store.getStoreName() + ": " + e.getMessage());
90127
}
91128
} finally {
92129
// In case of failure, The context should propagate an exception to the client
93130
// We send an exception to the client even on the case of a failure
94131
// to avoid possible locks and the client not getting the exception back
95-
for (OperationContext ctx : pendingSyncsArray) {
96-
ctx.pageSyncDone();
132+
for (PageEvent event : pendingEvents) {
133+
event.context.done();
97134
}
98135
}
99136
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public PageCursorProvider newCursorProvider(PagingStore store,
153153
@Override
154154
public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
155155

156-
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor().setFair(true), ioExecutorFactory.getExecutor(), syncNonTransactional);
156+
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor().setFair(true), syncNonTransactional);
157157
}
158158

159159
@Override
@@ -233,7 +233,7 @@ public synchronized List<PagingStore> reloadStores(final HierarchicalRepository<
233233

234234
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
235235

236-
PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor().setFair(true), ioExecutorFactory.getExecutor(), syncNonTransactional);
236+
PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor().setFair(true), syncNonTransactional);
237237

238238
storesReturn.add(store);
239239
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public PageCursorProvider newCursorProvider(PagingStore store,
153153
@Override
154154
public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
155155

156-
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor().setFair(true), ioExecutorFactory.getExecutor(), syncNonTransactional);
156+
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor().setFair(true), syncNonTransactional);
157157
}
158158

159159
@Override
@@ -230,7 +230,7 @@ public List<PagingStore> reloadStores(final HierarchicalRepository<AddressSettin
230230

231231
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
232232

233-
PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), executorFactory.getExecutor(), syncNonTransactional);
233+
PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
234234

235235
storesReturn.add(store);
236236
}

0 commit comments

Comments
 (0)