Skip to content

Commit f2707da

Browse files
committed
Merge branch 'feature/fully-threaded-sched' into dev/1.21.5
2 parents 2324fe6 + 7e571ab commit f2707da

File tree

13 files changed

+234
-81
lines changed

13 files changed

+234
-81
lines changed

c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingManager.java

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import net.minecraft.server.world.ChunkLevels;
77
import net.minecraft.util.math.ChunkPos;
88

9+
import java.util.ArrayDeque;
10+
import java.util.Queue;
911
import java.util.concurrent.ConcurrentHashMap;
1012
import java.util.concurrent.ConcurrentMap;
1113
import java.util.concurrent.Executor;
@@ -18,11 +20,21 @@ public class SchedulingManager {
1820

1921
public static final int MAX_LEVEL = ChunkLevels.INACCESSIBLE + 1;
2022
private final ConcurrentMap<Long, FreeableTaskList> pos2Tasks = new ConcurrentHashMap<>();
21-
private final Long2IntOpenHashMap prioritiesFromLevel = new Long2IntOpenHashMap();
23+
private final Long2IntOpenHashMap prioritiesFromLevel = new Long2IntOpenHashMap() {
24+
@Override
25+
protected void rehash(int newN) {
26+
if (n < newN) {
27+
super.rehash(newN);
28+
}
29+
}
30+
};
2231
private final StampedLock prioritiesLock = new StampedLock();
2332
private final int id = COUNTER.getAndIncrement();
2433
private volatile ChunkPos currentSyncLoad = null;
2534

35+
private boolean consolidatingLevelUpdates = false;
36+
private Queue<Runnable> consolidatedLevelUpdates = new ArrayDeque<>();
37+
2638
private final Executor executor;
2739

2840
{
@@ -73,21 +85,52 @@ public Executor positionedExecutor(long pos) {
7385

7486
public void updatePriorityFromLevel(long pos, int level) {
7587
this.executor.execute(() -> {
76-
if (this.getPriorityFromMap(pos) == level) return;
77-
final long stamp = this.prioritiesLock.writeLock();
78-
try {
79-
if (level < MAX_LEVEL) {
80-
this.prioritiesFromLevel.put(pos, level);
81-
} else {
82-
this.prioritiesFromLevel.remove(pos);
83-
}
84-
} finally {
85-
this.prioritiesLock.unlockWrite(stamp);
86-
}
87-
updatePriorityInternal(pos);
88+
updatePriorityFromLevel0(pos, level);
8889
});
8990
}
9091

92+
private void updatePriorityFromLevel0(long pos, int level) {
93+
if (this.getPriorityFromMap(pos) == level) return;
94+
final long stamp = this.prioritiesLock.writeLock();
95+
try {
96+
if (level < MAX_LEVEL) {
97+
this.prioritiesFromLevel.put(pos, level);
98+
} else {
99+
this.prioritiesFromLevel.remove(pos);
100+
}
101+
} finally {
102+
this.prioritiesLock.unlockWrite(stamp);
103+
}
104+
updatePriorityInternal(pos);
105+
}
106+
107+
public void updatePriorityFromLevelOnMain(long pos, int level) {
108+
if (this.consolidatingLevelUpdates) {
109+
this.consolidatedLevelUpdates.add(() -> updatePriorityFromLevel0(pos, level));
110+
} else {
111+
updatePriorityFromLevel(pos, level);
112+
}
113+
}
114+
115+
public void setConsolidatingLevelUpdates(boolean value) {
116+
this.consolidatingLevelUpdates = value;
117+
if (!value) {
118+
if (!this.consolidatedLevelUpdates.isEmpty()) {
119+
Queue<Runnable> runnables = this.consolidatedLevelUpdates;
120+
this.consolidatedLevelUpdates = new ArrayDeque<>();
121+
this.executor.execute(() -> {
122+
for (Runnable runnable : runnables) {
123+
try {
124+
runnable.run();
125+
} catch (Throwable t) {
126+
t.printStackTrace();
127+
}
128+
}
129+
});
130+
}
131+
}
132+
}
133+
91134
private void updatePriorityInternal(long pos) {
92135
final int priority = getPriority(pos);
93136
final FreeableTaskList locks = this.pos2Tasks.get(pos);
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.ishland.c2me.base.mixin.scheduler;
2+
3+
import com.ishland.c2me.base.common.scheduler.IVanillaChunkManager;
4+
import com.ishland.c2me.base.common.scheduler.SchedulingManager;
5+
import com.llamalad7.mixinextras.injector.wrapoperation.Operation;
6+
import com.llamalad7.mixinextras.injector.wrapoperation.WrapOperation;
7+
import net.minecraft.server.world.ChunkLevelManager;
8+
import net.minecraft.server.world.ServerChunkLoadingManager;
9+
import net.minecraft.server.world.ServerChunkManager;
10+
import org.spongepowered.asm.mixin.Final;
11+
import org.spongepowered.asm.mixin.Mixin;
12+
import org.spongepowered.asm.mixin.Shadow;
13+
import org.spongepowered.asm.mixin.injection.At;
14+
15+
@Mixin(ServerChunkManager.class)
16+
public class MixinServerChunkManager {
17+
18+
@Shadow @Final public ServerChunkLoadingManager chunkLoadingManager;
19+
20+
@WrapOperation(method = "updateChunks", at = @At(value = "INVOKE", target = "Lnet/minecraft/server/world/ChunkLevelManager;update(Lnet/minecraft/server/world/ServerChunkLoadingManager;)Z"))
21+
private boolean consolidatePriorityUpdates(ChunkLevelManager instance, ServerChunkLoadingManager chunkLoadingManager, Operation<Boolean> original) {
22+
SchedulingManager schedulingManager = ((IVanillaChunkManager) this.chunkLoadingManager).c2me$getSchedulingManager();
23+
schedulingManager.setConsolidatingLevelUpdates(true);
24+
try {
25+
return original.call(instance, chunkLoadingManager);
26+
} finally {
27+
schedulingManager.setConsolidatingLevelUpdates(false);
28+
}
29+
}
30+
31+
}

c2me-base/src/main/java/com/ishland/c2me/base/mixin/scheduler/MixinThreadedAnvilChunkStorage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class MixinThreadedAnvilChunkStorage implements IVanillaChunkManager {
2222

2323
@Inject(method = "setLevel", at = @At("RETURN"))
2424
private void onUpdateLevel(long pos, int level, ChunkHolder holder, int i, CallbackInfoReturnable<ChunkHolder> cir) {
25-
this.c2me$schedulingManager.updatePriorityFromLevel(pos, level);
25+
this.c2me$schedulingManager.updatePriorityFromLevelOnMain(pos, level);
2626
}
2727

2828
}

c2me-base/src/main/resources/c2me-base.mixins.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
"access.fapi.IArrayBackedEvent",
5959
"instrumentation.MixinServerChunkManager",
6060
"report.MixinDedicatedServerWatchdog",
61+
"scheduler.MixinServerChunkManager",
6162
"scheduler.MixinThreadedAnvilChunkStorage",
6263
"theinterface.MixinStorageIoWorker",
6364
"util.log4j2shutdownhookisnomore.MixinMain",

c2me-rewrites-chunk-system/src/main/java/com/ishland/c2me/rewrites/chunksystem/common/Config.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,14 @@ Whether to allow POIs (Point of Interest) to be unloaded
8080
""")
8181
.getBoolean(true, false);
8282

83-
public static final boolean lowMemoryMode = new ConfigSystem.ConfigAccessor()
84-
.key("chunkSystem.lowMemoryMode")
83+
public static final boolean useLegacyScheduling = new ConfigSystem.ConfigAccessor()
84+
.key("chunkSystem.useLegacyScheduling")
8585
.comment("""
86-
Whether to enable low memory mode
86+
Whether to use legacy scheduling for neighbor chunks
8787
88-
This will attempt to aggressively unload unneeded chunks, saving memory at the cost of additional
89-
overhead when generating new chunks.
88+
Enabling this restores the behavior of always loading in neighbor chunks when a chunk is loaded.
89+
90+
This is currently deprecated and will be removed in the future.
9091
""")
9192
.getBoolean(false, false);
9293

c2me-rewrites-chunk-system/src/main/java/com/ishland/c2me/rewrites/chunksystem/common/NewChunkHolderVanillaInterface.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public void updateDeferredStatus(NewChunkStatus status) {
182182
}
183183

184184
private void triggerDeferredLoad(NewChunkStatus requestedStatus) {
185-
if (!Config.lowMemoryMode) return;
185+
if (Config.useLegacyScheduling) return;
186186
synchronized (this) {
187187
if (this.loadedDeferredStatus != null && this.loadedDeferredStatus.ordinal() >= requestedStatus.ordinal()) {
188188
return; // nothing to do

c2me-rewrites-chunk-system/src/main/java/com/ishland/c2me/rewrites/chunksystem/common/TheChunkSystem.java

Lines changed: 11 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,55 @@
11
package com.ishland.c2me.rewrites.chunksystem.common;
22

3-
import com.ishland.c2me.base.common.GlobalExecutors;
43
import com.ishland.c2me.base.common.scheduler.IVanillaChunkManager;
54
import com.ishland.c2me.base.common.scheduler.SchedulingManager;
65
import com.ishland.c2me.base.mixin.access.IThreadedAnvilChunkStorage;
76
import com.ishland.c2me.base.mixin.access.IVersionedChunkStorage;
8-
import com.ishland.flowsched.scheduler.DaemonizedStatusAdvancingScheduler;
7+
import com.ishland.c2me.rewrites.chunksystem.common.structs.ChunkSystemExecutors;
98
import com.ishland.flowsched.scheduler.ExceptionHandlingAction;
109
import com.ishland.flowsched.scheduler.ItemHolder;
1110
import com.ishland.flowsched.scheduler.ItemStatus;
1211
import com.ishland.flowsched.scheduler.KeyStatusPair;
12+
import com.ishland.flowsched.scheduler.StatusAdvancingScheduler;
1313
import com.ishland.flowsched.util.Assertions;
14-
import io.netty.util.internal.PlatformDependent;
1514
import io.reactivex.rxjava3.core.Scheduler;
16-
import io.reactivex.rxjava3.schedulers.Schedulers;
1715
import it.unimi.dsi.fastutil.longs.Long2IntMap;
1816
import it.unimi.dsi.fastutil.longs.Long2IntMaps;
1917
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
2018
import net.minecraft.server.MinecraftServer;
2119
import net.minecraft.server.WorldGenerationProgressListener;
2220
import net.minecraft.server.world.ChunkHolder;
2321
import net.minecraft.server.world.ServerChunkLoadingManager;
24-
import net.minecraft.util.Util;
2522
import net.minecraft.util.collection.BoundedRegionArray;
26-
import net.minecraft.util.crash.CrashException;
27-
import net.minecraft.util.crash.ReportType;
2823
import net.minecraft.util.math.ChunkPos;
2924
import org.slf4j.Logger;
3025
import org.slf4j.LoggerFactory;
3126

32-
import java.nio.file.Path;
33-
import java.util.Queue;
3427
import java.util.concurrent.Executor;
35-
import java.util.concurrent.ThreadFactory;
3628

37-
public class TheChunkSystem extends DaemonizedStatusAdvancingScheduler<ChunkPos, ChunkState, ChunkLoadingContext, NewChunkHolderVanillaInterface> {
29+
public class TheChunkSystem extends StatusAdvancingScheduler<ChunkPos, ChunkState, ChunkLoadingContext, NewChunkHolderVanillaInterface> {
3830

3931
private final Logger LOGGER;
4032

4133
private final Long2IntMap managedTickets = Long2IntMaps.synchronize(new Long2IntOpenHashMap());
4234
private final SchedulingManager schedulingManager;
43-
private final Executor backingBackgroundExecutor = GlobalExecutors.prioritizedScheduler.executor(15);
44-
private Queue<Runnable> backgroundTaskQueue = PlatformDependent.newSpscQueue();
45-
private final Executor backgroundExecutor = command -> {
46-
if (Thread.currentThread() != this.thread) {
47-
command.run();
48-
} else {
49-
backgroundTaskQueue.add(command);
50-
}
51-
};
52-
private final Scheduler backgroundScheduler = Schedulers.from(this.backgroundExecutor);
5335
private final ServerChunkLoadingManager tacs;
5436

55-
public TheChunkSystem(ThreadFactory threadFactory, ServerChunkLoadingManager tacs) {
56-
super(threadFactory, TheSpeedyObjectFactory.INSTANCE);
37+
public TheChunkSystem(ServerChunkLoadingManager tacs) {
38+
super(TheSpeedyObjectFactory.INSTANCE);
5739
this.tacs = tacs;
5840
this.schedulingManager = ((IVanillaChunkManager) tacs).c2me$getSchedulingManager();
5941
this.LOGGER = LoggerFactory.getLogger("Chunk System of %s".formatted(((IThreadedAnvilChunkStorage) tacs).getWorld().getRegistryKey().getValue()));
6042
managedTickets.defaultReturnValue(NewChunkStatus.vanillaLevelToStatus.length - 1);
61-
this.thread.start();
6243
}
6344

6445
@Override
6546
protected Executor getBackgroundExecutor() {
66-
return this.backgroundExecutor;
47+
return ChunkSystemExecutors.consolidatingBackgroundExecutor;
6748
}
6849

6950
@Override
7051
protected Scheduler getSchedulerBackedByBackgroundExecutor() {
71-
return this.backgroundScheduler;
52+
return ChunkSystemExecutors.consolidatingBackgroundScheduler;
7253
}
7354

7455
@Override
@@ -149,7 +130,7 @@ protected void onItemDowngrade(ItemHolder<ChunkPos, ChunkState, ChunkLoadingCont
149130
}
150131

151132
public ChunkHolder vanillaIf$setLevel(long pos, int level) {
152-
Assertions.assertTrue(!Thread.holdsLock(this.managedTickets));
133+
assert !Thread.holdsLock(this.managedTickets);
153134
synchronized (this.managedTickets) {
154135
final int oldLevel = this.managedTickets.put(pos, level);
155136
NewChunkStatus oldStatus = c2me$getDeferredStatusFromVanillaLevel(oldLevel);
@@ -170,7 +151,7 @@ protected void onItemDowngrade(ItemHolder<ChunkPos, ChunkState, ChunkLoadingCont
170151
Assertions.assertTrue(holder != null, "Holder should be managed by the vanilla interface");
171152
assert holder != null;
172153
vanillaHolder = holder.getUserData().get();
173-
if (Config.lowMemoryMode) {
154+
if (!Config.useLegacyScheduling) {
174155
vanillaHolder.updateDeferredStatus(NewChunkStatus.fromVanillaLevel(level));
175156
}
176157

@@ -187,7 +168,7 @@ protected void onItemDowngrade(ItemHolder<ChunkPos, ChunkState, ChunkLoadingCont
187168
} else {
188169
vanillaHolder = null;
189170
}
190-
if (Config.lowMemoryMode && vanillaHolder != null) {
171+
if (!Config.useLegacyScheduling && vanillaHolder != null) {
191172
vanillaHolder.updateDeferredStatus(NewChunkStatus.fromVanillaLevel(level));
192173
}
193174
if (newStatus != this.getUnloadedStatus() && vanillaHolder != null) {
@@ -200,7 +181,7 @@ protected void onItemDowngrade(ItemHolder<ChunkPos, ChunkState, ChunkLoadingCont
200181

201182
private static NewChunkStatus c2me$getDeferredStatusFromVanillaLevel(int level) {
202183
NewChunkStatus status = NewChunkStatus.fromVanillaLevel(level);
203-
if (Config.lowMemoryMode) {
184+
if (!Config.useLegacyScheduling) {
204185
if (status == NewChunkStatus.NEW) {
205186
return status;
206187
} else if (status.ordinal() < NewChunkStatus.SERVER_ACCESSIBLE.ordinal()) {
@@ -216,24 +197,4 @@ protected void onItemDowngrade(ItemHolder<ChunkPos, ChunkState, ChunkLoadingCont
216197
public int vanillaIf$getManagedLevel(long pos) {
217198
return this.managedTickets.get(pos);
218199
}
219-
220-
@Override
221-
public boolean tick() {
222-
boolean tick = super.tick();
223-
if (!this.backgroundTaskQueue.isEmpty()) {
224-
Queue<Runnable> queue = this.backgroundTaskQueue;
225-
this.backgroundTaskQueue = PlatformDependent.newSpscQueue();
226-
this.backingBackgroundExecutor.execute(() -> {
227-
Runnable runnable;
228-
while ((runnable = queue.poll()) != null) {
229-
try {
230-
runnable.run();
231-
} catch (Throwable t) {
232-
t.printStackTrace();
233-
}
234-
}
235-
});
236-
}
237-
return tick;
238-
}
239200
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.ishland.c2me.rewrites.chunksystem.common.structs;
2+
3+
import com.ishland.c2me.base.common.GlobalExecutors;
4+
import io.reactivex.rxjava3.core.Scheduler;
5+
import io.reactivex.rxjava3.schedulers.Schedulers;
6+
7+
import java.util.ArrayDeque;
8+
import java.util.Queue;
9+
import java.util.concurrent.Executor;
10+
11+
public class ChunkSystemExecutors {
12+
13+
public static final ThreadLocal<Queue<Runnable>> CONSOLIDATING_QUEUE = new ThreadLocal<>();
14+
15+
public static final Executor backingBackgroundExecutor = GlobalExecutors.prioritizedScheduler.executor(15);
16+
public static final Scheduler backgroundScheduler = Schedulers.from(backingBackgroundExecutor);
17+
public static final Executor consolidatingBackgroundExecutor = command -> {
18+
Queue<Runnable> runnables = CONSOLIDATING_QUEUE.get();
19+
if (runnables == null) { // first entry
20+
consolidatingRoot(command);
21+
CONSOLIDATING_QUEUE.remove(); // get() initielizes threadlocal
22+
return;
23+
}
24+
runnables.add(command);
25+
};
26+
public static final Scheduler consolidatingBackgroundScheduler = Schedulers.from(consolidatingBackgroundExecutor);
27+
28+
private static void consolidatingRoot(Runnable initialCommand) {
29+
backingBackgroundExecutor.execute(() -> {
30+
Queue<Runnable> runnables = CONSOLIDATING_QUEUE.get();
31+
if (runnables != null) {
32+
new Throwable("CONSOLIDATING_QUEUE leak").printStackTrace();
33+
try {
34+
initialCommand.run();
35+
} catch (Throwable t) {
36+
t.printStackTrace();
37+
}
38+
return;
39+
}
40+
41+
CONSOLIDATING_QUEUE.set(runnables = new ArrayDeque<>());
42+
runnables.add(initialCommand);
43+
try {
44+
while (!runnables.isEmpty()) {
45+
try {
46+
runnables.remove().run();
47+
} catch (Throwable t) {
48+
t.printStackTrace();
49+
}
50+
}
51+
} finally {
52+
if (!runnables.isEmpty()) {
53+
new Throwable("runnable leak").printStackTrace();
54+
}
55+
CONSOLIDATING_QUEUE.remove();
56+
}
57+
});
58+
}
59+
60+
}

0 commit comments

Comments
 (0)