Skip to content

Commit d67c08b

Browse files
fixes
1 parent b1ba6cc commit d67c08b

File tree

16 files changed

+453
-113
lines changed

16 files changed

+453
-113
lines changed

artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/Actor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import java.util.concurrent.Executor;
2020

21-
public class Actor<T> extends ProcessorBase<T> {
21+
public class Actor<T> extends TaskProcessorBase<T> {
2222

2323
private final ActorListener<T> listener;
2424

@@ -33,7 +33,7 @@ protected final void doTask(T task) {
3333
}
3434

3535
public final void act(T message) {
36-
task(message);
36+
addTask(message);
3737
}
3838

3939
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.activemq.artemis.utils.actors;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.concurrent.Executor;
23+
24+
/**
25+
* this is for any ProcessorBase that should empty the queue, and execute things at once.
26+
* Example: a processorBase that should get all pending tasks, write them all on storage and issue a single storage sync at the end.
27+
*/
28+
public abstract class GroupingTasksProcessorBase<T> extends ProcessorBase<T> {
29+
30+
public GroupingTasksProcessorBase(Executor parent) {
31+
super(parent);
32+
}
33+
34+
@Override
35+
protected void processQueue() {
36+
T task;
37+
List<T> pendingTasks = null;
38+
while (!yielded && !requestedForcedShutdown && (task = tasks.poll()) != null) {
39+
if (pendingTasks == null) {
40+
pendingTasks = new ArrayList<>(tasks.size() + 1);
41+
}
42+
pendingTasks.add(task);
43+
}
44+
if (pendingTasks != null) {
45+
doTasks(pendingTasks);
46+
}
47+
}
48+
49+
protected abstract void doTasks(List<T> tasks);
50+
51+
}

artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
3030
* same method, will result in B's task running after A's.
3131
*/
32-
public class OrderedExecutor extends ProcessorBase<Runnable> implements ArtemisExecutor {
32+
public class OrderedExecutor extends TaskProcessorBase<Runnable> implements ArtemisExecutor {
3333

3434
public OrderedExecutor(Executor delegate) {
3535
super(delegate);
@@ -69,7 +69,7 @@ protected final void doTask(Runnable task) {
6969

7070
@Override
7171
public final void execute(Runnable run) {
72-
task(run);
72+
addTask(run);
7373
}
7474

7575
@Override

artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ public abstract class ProcessorBase<T> extends HandlerBase {
4747
@SuppressWarnings("unused")
4848
private volatile int state = STATE_NOT_RUNNING;
4949
// Request of forced shutdown
50-
private volatile boolean requestedForcedShutdown = false;
50+
protected volatile boolean requestedForcedShutdown = false;
5151
// Request of educated shutdown:
5252
protected volatile boolean requestedShutdown = false;
5353
// Request to yield to another thread
54-
private volatile boolean yielded = false;
54+
protected volatile boolean yielded = false;
5555

5656
private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
5757

@@ -61,12 +61,7 @@ private void executePendingTasks() {
6161
if (stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_RUNNING)) {
6262
enter();
6363
try {
64-
T task;
65-
//while the queue is not empty we process in order:
66-
//if requestedForcedShutdown==true than no new tasks will be drained from the tasks q.
67-
while (!yielded && !requestedForcedShutdown && (task = tasks.poll()) != null) {
68-
doTask(task);
69-
}
64+
processQueue();
7065
} finally {
7166
leave();
7267
//set state back to not running if possible: shutdownNow could be called by doTask(task).
@@ -90,6 +85,8 @@ private void executePendingTasks() {
9085
}
9186
}
9287

88+
protected abstract void processQueue();
89+
9390
/**
9491
* It will shutdown and wait 30 seconds for timeout.
9592
*/
@@ -135,8 +132,6 @@ public int shutdownNow(Consumer<? super T> onPendingItem, int timeout, TimeUnit
135132
return pendingItems;
136133
}
137134

138-
protected abstract void doTask(T task);
139-
140135
public ProcessorBase(Executor parent) {
141136
this.delegate = parent;
142137
}
@@ -174,7 +169,7 @@ public final boolean flush(long timeout, TimeUnit unit) {
174169
return this.state == STATE_NOT_RUNNING;
175170
}
176171

177-
protected void task(T command) {
172+
protected void addTask(T command) {
178173
if (requestedShutdown) {
179174
logAddOnShutdown();
180175
return;
@@ -190,7 +185,7 @@ protected void task(T command) {
190185

191186
/**
192187
* This has to be called on the assumption that state!=STATE_RUNNING.
193-
* It is packed separately from {@link #task(Object)} just for performance reasons: it
188+
* It is packed separately from {@link #addTask(Object)} just for performance reasons: it
194189
* handles the uncommon execution cases for bursty scenarios i.e. the slowest execution path.
195190
*/
196191
private void onAddedTaskIfNotRunning(int state) {
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.activemq.artemis.utils.actors;
19+
20+
import java.util.concurrent.Executor;
21+
22+
/**
23+
* this is for any ProcessorBase that will execute one item at a time.
24+
* processQueue will take one element each time and call doTask.
25+
*/
26+
public abstract class TaskProcessorBase<T> extends ProcessorBase<T> {
27+
28+
public TaskProcessorBase(Executor parent) {
29+
super(parent);
30+
}
31+
32+
@Override
33+
protected void processQueue() {
34+
T task;
35+
//while the queue is not empty we process in order:
36+
//if requestedForcedShutdown==true than no new tasks will be drained from the tasks q.
37+
while (!yielded && !requestedForcedShutdown && (task = tasks.poll()) != null) {
38+
doTask(task);
39+
}
40+
}
41+
42+
protected abstract void doTask(T task);
43+
44+
}

artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.slf4j.LoggerFactory;
2525
import java.lang.invoke.MethodHandles;
2626

27-
public class ThresholdActor<T> extends ProcessorBase<Object> {
27+
public class ThresholdActor<T> extends TaskProcessorBase<Object> {
2828

2929
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
3030

@@ -85,13 +85,13 @@ public void act(T message) {
8585
} else {
8686
logger.debug("element {} returned an invalid size over the Actor", message);
8787
}
88-
task(message);
88+
addTask(message);
8989
}
9090

9191
public void flush() {
9292
if (SCHEDULED_FLUSH_UPDATER.compareAndSet(this, 0, 1)) {
9393
overThreshold.run();
94-
task(FLUSH);
94+
addTask(FLUSH);
9595
}
9696
}
9797

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.artemis.utils.actors;
18+
19+
import java.lang.invoke.MethodHandles;
20+
import java.util.List;
21+
import java.util.concurrent.Executor;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.Semaphore;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import org.apache.activemq.artemis.utils.ReusableLatch;
28+
import org.junit.jupiter.api.Test;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import static org.junit.jupiter.api.Assertions.assertEquals;
33+
import static org.junit.jupiter.api.Assertions.assertNotNull;
34+
import static org.junit.jupiter.api.Assertions.assertTrue;
35+
36+
public class GroupedExecutorTest {
37+
38+
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
39+
40+
class GroupedStrings extends GroupingTasksProcessorBase<String> {
41+
42+
Runnable left;
43+
44+
public GroupedStrings(Executor parent, Runnable left) {
45+
super(parent);
46+
this.left = left;
47+
}
48+
49+
List<String> lastExecution = null;
50+
51+
@Override
52+
protected void doTasks(List<String> tasks) {
53+
lastExecution = tasks;
54+
}
55+
56+
final Semaphore semaphore = new Semaphore(0);
57+
58+
@Override
59+
protected void enter() {
60+
super.enter();
61+
try {
62+
semaphore.acquire();
63+
} catch (InterruptedException e) {
64+
throw new RuntimeException(e.getMessage(), e);
65+
}
66+
}
67+
68+
@Override
69+
protected void leave() {
70+
super.leave();
71+
left.run();
72+
}
73+
74+
public void release() {
75+
semaphore.release();
76+
}
77+
78+
public void groupString(String value) {
79+
addTask(value);
80+
}
81+
}
82+
83+
@Test
84+
public void groupStrings() throws InterruptedException {
85+
final int strings = 1000;
86+
final ExecutorService executorService = Executors.newFixedThreadPool(1);
87+
try {
88+
ReusableLatch latchLeft = new ReusableLatch(1);
89+
final GroupedStrings groupedStrings = new GroupedStrings(executorService, latchLeft::countDown);
90+
91+
for (int i = 0; i < strings; i++) {
92+
groupedStrings.groupString("String " + i);
93+
}
94+
groupedStrings.release();
95+
assertTrue(latchLeft.await(10, TimeUnit.SECONDS));
96+
assertNotNull(groupedStrings.lastExecution);
97+
assertEquals(strings, groupedStrings.lastExecution.size());
98+
for (int i = 0; i < strings; i++) {
99+
assertEquals("String " + i, groupedStrings.lastExecution.get(i));
100+
}
101+
latchLeft.setCount(1);
102+
groupedStrings.release();
103+
assertTrue(latchLeft.await(10, TimeUnit.SECONDS));
104+
assertNotNull(groupedStrings.lastExecution);
105+
// no executions, should still be the last one
106+
assertEquals(strings, groupedStrings.lastExecution.size());
107+
} finally {
108+
executorService.shutdownNow();
109+
}
110+
}
111+
112+
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,6 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf
114114
* It's ok to look for this with an estimate on starting a task or not, but you will need to recheck on actual paging operations. */
115115
boolean isPaging();
116116

117-
/**
118-
* Schedules sync to the file storage.
119-
*/
120-
void addSyncPoint(OperationContext context) throws Exception;
121-
122117
/**
123118
* Performs a real sync on the current IO file.
124119
*/
@@ -265,6 +260,10 @@ default void addSize(int size) {
265260

266261
void unblock();
267262

263+
default boolean hasPendingIO() {
264+
return false;
265+
}
266+
268267
default StorageManager getStorageManager() {
269268
return null;
270269
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,25 @@ public Future<Boolean> scheduleCleanup() {
209209

210210
return future;
211211
}
212+
public Future<Boolean> forceScheduleCleanup() {
213+
final SimpleFutureImpl<Boolean> future = new SimpleFutureImpl<>();
214+
215+
pagingStore.execute(() -> {
216+
storageManager.setContext(storageManager.newSingleThreadContext());
217+
try {
218+
System.out.println("Forcing cleanup");
219+
if (cleanupEnabled) {
220+
cleanup();
221+
}
222+
} finally {
223+
storageManager.clearContext();
224+
future.set(true);
225+
}
226+
});
227+
228+
return future;
229+
}
230+
212231

213232
/**
214233
* Delete everything associated with any queue on this address.
@@ -271,6 +290,11 @@ protected void cleanup() {
271290
return;
272291
}
273292

293+
if (pagingStore.hasPendingIO()) {
294+
forceScheduleCleanup();
295+
return;
296+
}
297+
274298
ArrayList<Page> depagedPages = new ArrayList<>();
275299
LongHashSet depagedPagesSet = new LongHashSet();
276300

0 commit comments

Comments
 (0)