Skip to content

Commit beb5324

Browse files
committed
fix: futures sometimes being wrongly tagged as unloaded
1 parent fb11fcb commit beb5324

File tree

3 files changed

+72
-18
lines changed

3 files changed

+72
-18
lines changed

src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,19 @@ public boolean setStatus(ItemStatus<K, V, Ctx> status, boolean isCancellation) {
263263
// }
264264

265265
this.status = status;
266-
flushUnloadedStatus(prevStatus, true);
266+
267+
// reinit higher futures because downgraded, and deinit futures higher than target status
268+
synchronized (this.futures) {
269+
final ItemStatus<K, V, Ctx> targetStatus = this.getTargetStatus();
270+
for (int i = prevStatus.ordinal(); i < this.futures.length; i ++) {
271+
if (i > targetStatus.ordinal()) {
272+
this.futures[i].completeExceptionally(UNLOADED_EXCEPTION);
273+
this.futures[i] = UNLOADED_FUTURE;
274+
} else {
275+
this.futures[i] = this.futures[i].isDone() ? new CompletableFuture<>() : this.futures[i];
276+
}
277+
}
278+
}
267279
} else if (compare > 0) { // status upgrade
268280
Assertions.assertTrue(prevStatus.getNext() == status, "Invalid status upgrade");
269281

@@ -292,19 +304,17 @@ public boolean setStatus(ItemStatus<K, V, Ctx> status, boolean isCancellation) {
292304
return true;
293305
}
294306

295-
void flushUnloadedStatus(ItemStatus<K, V, Ctx> startingPoint, boolean onDowngrade) {
307+
void flushUnloadedStatus(ItemStatus<K, V, Ctx> currentStatus) {
296308
ArrayList<CompletableFuture<Void>> futuresToFire = null;
309+
if (currentStatus.getNext() == null) {
310+
return;
311+
}
297312
synchronized (this.futures) {
298-
final ItemStatus<K, V, Ctx> targetStatus = this.getTargetStatus();
299-
for (int i = startingPoint.ordinal(); i < this.futures.length; i ++) {
300-
if (i > targetStatus.ordinal()) {
301-
if (futuresToFire == null) futuresToFire = new ArrayList<>();
302-
CompletableFuture<Void> oldFuture = this.futures[i];
303-
futuresToFire.add(oldFuture);
304-
this.futures[i] = UNLOADED_FUTURE;
305-
} else if (onDowngrade) {
306-
this.futures[i] = this.futures[i].isDone() ? new CompletableFuture<>() : this.futures[i];
307-
}
313+
for (int i = currentStatus.ordinal() + 1; i < this.futures.length; i ++) {
314+
if (futuresToFire == null) futuresToFire = new ArrayList<>();
315+
CompletableFuture<Void> oldFuture = this.futures[i];
316+
futuresToFire.add(oldFuture);
317+
this.futures[i] = UNLOADED_FUTURE;
308318
}
309319
}
310320
if (futuresToFire != null) {
@@ -315,6 +325,16 @@ void flushUnloadedStatus(ItemStatus<K, V, Ctx> startingPoint, boolean onDowngrad
315325
}
316326
}
317327

328+
void validateCompletedFutures(ItemStatus<K, V, Ctx> upTo) {
329+
synchronized (this.futures) {
330+
for (int i = this.unloadedStatus.ordinal() + 1; i <= upTo.ordinal(); i++) {
331+
CompletableFuture<Void> future = this.futures[i];
332+
Assertions.assertTrue(future != UNLOADED_FUTURE, "Future for loaded status cannot be UNLOADED_FUTURE");
333+
Assertions.assertTrue(future.isDone(), "Future for status <= targetStatus must be completed");
334+
}
335+
}
336+
}
337+
318338
public synchronized void setDependencies(ItemStatus<K, V, Ctx> status, KeyStatusPair<K, V, Ctx>[] dependencies) {
319339
assertOpen();
320340
final int ordinal = status.ordinal();

src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,11 @@ void tickHolder0(ItemHolder<K, V, Ctx, UserData> holder) {
109109
current = holder.getStatus();
110110
nextStatus = getNextStatus(current, holder.getTargetStatus());
111111
Assertions.assertTrue(holder.getStatus() == current);
112+
holder.validateCompletedFutures(current);
112113
// holder.sanitizeSetStatus = null;
113114
if (nextStatus == current) {
114-
holder.flushUnloadedStatus(current, false);
115+
holder.flushUnloadedStatus(current);
116+
holder.validateCompletedFutures(current);
115117
if (current.equals(getUnloadedStatus())) {
116118
if (holder.isDependencyDirty()) {
117119
holder.executeCriticalSectionAndBusy(() -> holder.cleanupDependencies(this));

src/test/java/com/ishland/flowsched/scheduler/SchedulerTest.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
package com.ishland.flowsched.scheduler;
22

3+
import com.ishland.flowsched.scheduler.support.TestContext;
4+
import com.ishland.flowsched.scheduler.support.TestItem;
35
import com.ishland.flowsched.scheduler.support.TestSchedulerImpl;
46
import com.ishland.flowsched.scheduler.support.TestStatus;
7+
import org.junit.jupiter.api.Assertions;
58
import org.junit.jupiter.api.Test;
69
import org.junit.jupiter.api.extension.ExtendWith;
710
import org.mockito.junit.jupiter.MockitoExtension;
811
import org.mockito.junit.jupiter.MockitoSettings;
912
import org.mockito.quality.Strictness;
1013

14+
import java.util.ArrayList;
15+
import java.util.Random;
16+
import java.util.concurrent.CompletableFuture;
17+
import java.util.concurrent.atomic.AtomicBoolean;
1118
import java.util.concurrent.locks.LockSupport;
1219

1320
@ExtendWith(MockitoExtension.class)
@@ -18,11 +25,27 @@ public class SchedulerTest {
1825
public void testSimple() {
1926
final TestSchedulerImpl scheduler = new TestSchedulerImpl();
2027
long startTime = System.nanoTime();
21-
// scheduler.addTicket(100L, TestStatus.STATE_3, () -> {
22-
// System.out.println("100 reached STATE_3 after " + (System.nanoTime() - startTime) + "ns");
23-
// scheduler.removeTicket(100L, TestStatus.STATE_3);
24-
// });
25-
final long key = 1000L;
28+
29+
final long key = 1024L;
30+
AtomicBoolean spamLoaderRunning = new AtomicBoolean(true);
31+
ArrayList<CompletableFuture<Void>> spammedFutures = new ArrayList<>();
32+
33+
Thread spamLoader = new Thread(() -> {
34+
Random random = new Random();
35+
while (spamLoaderRunning.get()) {
36+
long victim = random.nextLong(key - 1);
37+
ItemHolder<Long, TestItem, TestContext, Void> holder = scheduler.addTicket(victim, TestStatus.STATE_8, StatusAdvancingScheduler.NO_OP);
38+
CompletableFuture<Void> future = holder.getFutureForStatus0(TestStatus.STATE_8);
39+
if (future.isCompletedExceptionally()) {
40+
Assertions.fail();
41+
}
42+
spammedFutures.add(future);
43+
scheduler.removeTicket(victim, TestStatus.STATE_8);
44+
LockSupport.parkNanos(1_000L);
45+
}
46+
});
47+
spamLoader.start();
48+
2649
scheduler.addTicket(key, TestStatus.STATE_7, () -> {
2750
System.out.println("reached STATE_7 after " + (System.nanoTime() - startTime) + "ns");
2851
scheduler.removeTicket(key, TestStatus.STATE_7);
@@ -33,6 +56,7 @@ public void testSimple() {
3356
scheduler.addTicket(key, TestStatus.STATE_8, () -> {
3457
System.out.println("reached STATE_8 after " + (System.nanoTime() - startTime) + "ns");
3558
scheduler.removeTicket(key, TestStatus.STATE_8);
59+
spamLoaderRunning.set(false);
3660
});
3761
scheduler.getHolder(key).getFutureForStatus0(TestStatus.STATE_8).whenComplete((unused, throwable) -> {
3862
if (throwable != null) throwable.printStackTrace();
@@ -62,6 +86,14 @@ public void testSimple() {
6286
}
6387

6488
System.out.println("All unloaded after " + (System.nanoTime() - startTime) + "ns");
89+
90+
for (CompletableFuture<Void> spammedFuture : spammedFutures) {
91+
if (!spammedFuture.isDone()) {
92+
Assertions.fail();
93+
}
94+
}
95+
96+
6597
// scheduler.shutdown();
6698
}
6799

0 commit comments

Comments
 (0)