Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jan 27, 2025
1 parent 0330132 commit f2c870d
Show file tree
Hide file tree
Showing 20 changed files with 474 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import java.util.concurrent.Executor;

public class Actor<T> extends ProcessorBase<T> {
public class Actor<T> extends TaskProcessorBase<T> {

private final ActorListener<T> listener;

Expand All @@ -33,7 +33,7 @@ protected final void doTask(T task) {
}

public final void act(T message) {
task(message);
addTask(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.utils.actors;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/**
* this is for any ProcessorBase that should empty the queue, and execute things at once.
* Example: a processorBase that should get all pending tasks, write them all on storage and issue a single storage sync at the end.
*/
public abstract class GroupingTasksProcessorBase<T> extends ProcessorBase<T> {

public GroupingTasksProcessorBase(Executor parent) {
super(parent);
}

@Override
protected void processQueue() {
T task;
List<T> pendingTasks = null;
while (!yielded && !requestedForcedShutdown && (task = tasks.poll()) != null) {
if (pendingTasks == null) {
pendingTasks = new ArrayList<>(tasks.size() + 1);
}
pendingTasks.add(task);
}
if (pendingTasks != null) {
doTasks(pendingTasks);
}
}

protected abstract void doTasks(List<T> tasks);

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
* same method, will result in B's task running after A's.
*/
public class OrderedExecutor extends ProcessorBase<Runnable> implements ArtemisExecutor {
public class OrderedExecutor extends TaskProcessorBase<Runnable> implements ArtemisExecutor {

public OrderedExecutor(Executor delegate) {
super(delegate);
Expand Down Expand Up @@ -69,7 +69,7 @@ protected final void doTask(Runnable task) {

@Override
public final void execute(Runnable run) {
task(run);
addTask(run);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ public abstract class ProcessorBase<T> extends HandlerBase {
@SuppressWarnings("unused")
private volatile int state = STATE_NOT_RUNNING;
// Request of forced shutdown
private volatile boolean requestedForcedShutdown = false;
protected volatile boolean requestedForcedShutdown = false;
// Request of educated shutdown:
protected volatile boolean requestedShutdown = false;
// Request to yield to another thread
private volatile boolean yielded = false;
protected volatile boolean yielded = false;

private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");

Expand All @@ -61,12 +61,7 @@ private void executePendingTasks() {
if (stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_RUNNING)) {
enter();
try {
T task;
//while the queue is not empty we process in order:
//if requestedForcedShutdown==true than no new tasks will be drained from the tasks q.
while (!yielded && !requestedForcedShutdown && (task = tasks.poll()) != null) {
doTask(task);
}
processQueue();
} finally {
leave();
//set state back to not running if possible: shutdownNow could be called by doTask(task).
Expand All @@ -90,6 +85,8 @@ private void executePendingTasks() {
}
}

protected abstract void processQueue();

/**
* It will shutdown and wait 30 seconds for timeout.
*/
Expand Down Expand Up @@ -135,8 +132,6 @@ public int shutdownNow(Consumer<? super T> onPendingItem, int timeout, TimeUnit
return pendingItems;
}

protected abstract void doTask(T task);

public ProcessorBase(Executor parent) {
this.delegate = parent;
}
Expand Down Expand Up @@ -174,7 +169,7 @@ public final boolean flush(long timeout, TimeUnit unit) {
return this.state == STATE_NOT_RUNNING;
}

protected void task(T command) {
protected void addTask(T command) {
if (requestedShutdown) {
logAddOnShutdown();
return;
Expand All @@ -190,7 +185,7 @@ protected void task(T command) {

/**
* This has to be called on the assumption that state!=STATE_RUNNING.
* It is packed separately from {@link #task(Object)} just for performance reasons: it
* It is packed separately from {@link #addTask(Object)} just for performance reasons: it
* handles the uncommon execution cases for bursty scenarios i.e. the slowest execution path.
*/
private void onAddedTaskIfNotRunning(int state) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.utils.actors;

import java.util.concurrent.Executor;

/**
* this is for any ProcessorBase that will execute one item at a time.
* processQueue will take one element each time and call doTask.
*/
public abstract class TaskProcessorBase<T> extends ProcessorBase<T> {

public TaskProcessorBase(Executor parent) {
super(parent);
}

@Override
protected void processQueue() {
T task;
//while the queue is not empty we process in order:
//if requestedForcedShutdown==true than no new tasks will be drained from the tasks q.
while (!yielded && !requestedForcedShutdown && (task = tasks.poll()) != null) {
doTask(task);
}
}

protected abstract void doTask(T task);

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;

public class ThresholdActor<T> extends ProcessorBase<Object> {
public class ThresholdActor<T> extends TaskProcessorBase<Object> {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand Down Expand Up @@ -85,13 +85,13 @@ public void act(T message) {
} else {
logger.debug("element {} returned an invalid size over the Actor", message);
}
task(message);
addTask(message);
}

public void flush() {
if (SCHEDULED_FLUSH_UPDATER.compareAndSet(this, 0, 1)) {
overThreshold.run();
task(FLUSH);
addTask(FLUSH);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.actors;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class GroupedExecutorTest {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

class GroupedStrings extends GroupingTasksProcessorBase<String> {

Runnable left;

public GroupedStrings(Executor parent, Runnable left) {
super(parent);
this.left = left;
}

List<String> lastExecution = null;

@Override
protected void doTasks(List<String> tasks) {
lastExecution = tasks;
}

final Semaphore semaphore = new Semaphore(0);

@Override
protected void enter() {
super.enter();
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e.getMessage(), e);
}
}

@Override
protected void leave() {
super.leave();
left.run();
}

public void release() {
semaphore.release();
}

public void groupString(String value) {
addTask(value);
}
}

@Test
public void groupStrings() throws InterruptedException {
final int strings = 1000;
final ExecutorService executorService = Executors.newFixedThreadPool(1);
try {
ReusableLatch latchLeft = new ReusableLatch(1);
final GroupedStrings groupedStrings = new GroupedStrings(executorService, latchLeft::countDown);

for (int i = 0; i < strings; i++) {
groupedStrings.groupString("String " + i);
}
groupedStrings.release();
assertTrue(latchLeft.await(10, TimeUnit.SECONDS));
assertNotNull(groupedStrings.lastExecution);
assertEquals(strings, groupedStrings.lastExecution.size());
for (int i = 0; i < strings; i++) {
assertEquals("String " + i, groupedStrings.lastExecution.get(i));
}
latchLeft.setCount(1);
groupedStrings.release();
assertTrue(latchLeft.await(10, TimeUnit.SECONDS));
assertNotNull(groupedStrings.lastExecution);
// no executions, should still be the last one
assertEquals(strings, groupedStrings.lastExecution.size());
} finally {
executorService.shutdownNow();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf
* It's ok to look for this with an estimate on starting a task or not, but you will need to recheck on actual paging operations. */
boolean isPaging();

/**
* Schedules sync to the file storage.
*/
void addSyncPoint(OperationContext context) throws Exception;

/**
* Performs a real sync on the current IO file.
*/
Expand Down Expand Up @@ -265,6 +260,10 @@ default void addSize(int size) {

void unblock();

default boolean hasPendingIO() {
return false;
}

default StorageManager getStorageManager() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,25 @@ public Future<Boolean> scheduleCleanup() {

return future;
}
public Future<Boolean> forceScheduleCleanup() {
final SimpleFutureImpl<Boolean> future = new SimpleFutureImpl<>();

pagingStore.execute(() -> {
storageManager.setContext(storageManager.newSingleThreadContext());
try {
System.out.println("Forcing cleanup");
if (cleanupEnabled) {
cleanup();
}
} finally {
storageManager.clearContext();
future.set(true);
}
});

return future;
}


/**
* Delete everything associated with any queue on this address.
Expand Down Expand Up @@ -271,6 +290,11 @@ protected void cleanup() {
return;
}

if (pagingStore.hasPendingIO()) {
forceScheduleCleanup();
return;
}

ArrayList<Page> depagedPages = new ArrayList<>();
LongHashSet depagedPagesSet = new LongHashSet();

Expand Down
Loading

0 comments on commit f2c870d

Please sign in to comment.