Skip to content

Commit b83c97f

Browse files
committed
refactor: replace CompletionStage with Completable in status
1 parent a9c2ab7 commit b83c97f

File tree

11 files changed

+200
-153
lines changed

11 files changed

+200
-153
lines changed

c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/mixin/MixinServerAccessibleChunkSending.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
import com.ishland.flowsched.scheduler.Cancellable;
1414
import com.ishland.flowsched.scheduler.ItemHolder;
1515
import com.ishland.flowsched.scheduler.KeyStatusPair;
16+
import com.ishland.flowsched.util.Assertions;
17+
import io.reactivex.rxjava3.core.Completable;
18+
import io.reactivex.rxjava3.schedulers.Schedulers;
1619
import it.unimi.dsi.fastutil.shorts.ShortList;
1720
import it.unimi.dsi.fastutil.shorts.ShortListIterator;
1821
import net.minecraft.block.Block;
@@ -70,7 +73,7 @@ private static void onCLInit(CallbackInfo ci) {
7073
* @reason do chunk sending
7174
*/
7275
@Overwrite(remap = false)
73-
public CompletionStage<Void> upgradeToThis(ChunkLoadingContext context, Cancellable cancellable) {
76+
public Completable upgradeToThis(ChunkLoadingContext context, Cancellable cancellable) {
7477
ArrayList<BlockPos> blocksToRemove = new ArrayList<>();
7578
if (Config.suppressGhostMushrooms) {
7679
ServerWorld serverWorld = ((IThreadedAnvilChunkStorage) context.tacs()).getWorld();
@@ -97,22 +100,26 @@ public CompletionStage<Void> upgradeToThis(ChunkLoadingContext context, Cancella
97100
}
98101
}
99102
}
100-
return CompletableFuture.runAsync(() -> {
101-
try (var ignored = ThreadInstrumentation.getCurrent().begin(new ChunkTaskWork(context, (ServerAccessibleChunkSending) (Object) this, true))) {
102-
if (Config.suppressGhostMushrooms) {
103-
ServerWorld serverWorld = ((IThreadedAnvilChunkStorage) context.tacs()).getWorld();
104-
ChunkState state = context.holder().getItem().get();
105-
Chunk chunk = state.chunk();
106-
for (BlockPos blockPos : blocksToRemove) {
107-
serverWorld.setBlockState(blockPos, Blocks.AIR.getDefaultState(), Block.NO_REDRAW | Block.FORCE_STATE);
108-
}
109-
for (BlockPos blockPos2 : ImmutableList.copyOf(chunk.getBlockEntityPositions())) {
110-
chunk.getBlockEntity(blockPos2);
103+
return Completable
104+
.fromRunnable(() -> {
105+
Assertions.assertTrue(((IThreadedAnvilChunkStorage) context.tacs()).getMainThreadExecutor().isOnThread());
106+
107+
try (var ignored = ThreadInstrumentation.getCurrent().begin(new ChunkTaskWork(context, (ServerAccessibleChunkSending) (Object) this, true))) {
108+
if (Config.suppressGhostMushrooms) {
109+
ServerWorld serverWorld = ((IThreadedAnvilChunkStorage) context.tacs()).getWorld();
110+
ChunkState state = context.holder().getItem().get();
111+
Chunk chunk = state.chunk();
112+
for (BlockPos blockPos : blocksToRemove) {
113+
serverWorld.setBlockState(blockPos, Blocks.AIR.getDefaultState(), Block.NO_REDRAW | Block.FORCE_STATE);
114+
}
115+
for (BlockPos blockPos2 : ImmutableList.copyOf(chunk.getBlockEntityPositions())) {
116+
chunk.getBlockEntity(blockPos2);
117+
}
118+
}
119+
sendChunkToPlayer(context.tacs(), context.holder());
111120
}
112-
}
113-
sendChunkToPlayer(context.tacs(), context.holder());
114-
}
115-
}, ((IThreadedAnvilChunkStorage) context.tacs()).getMainThreadExecutor());
121+
})
122+
.subscribeOn(Schedulers.from(((IThreadedAnvilChunkStorage) context.tacs()).getMainThreadExecutor()));
116123
}
117124

118125
@Unique

c2me-rewrites-chunk-system/src/main/java/com/ishland/c2me/rewrites/chunksystem/common/NewChunkStatus.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@
1212
import com.ishland.flowsched.scheduler.ItemHolder;
1313
import com.ishland.flowsched.scheduler.ItemStatus;
1414
import com.ishland.flowsched.scheduler.KeyStatusPair;
15+
import io.reactivex.rxjava3.core.Completable;
1516
import net.minecraft.server.world.ChunkLevelType;
1617
import net.minecraft.server.world.ChunkLevels;
1718
import net.minecraft.util.math.ChunkPos;
1819
import net.minecraft.util.math.MathHelper;
1920
import net.minecraft.world.chunk.ChunkStatus;
2021

2122
import java.util.ArrayList;
22-
import java.util.concurrent.CompletionStage;
2323
import java.util.stream.IntStream;
2424

2525
/**
@@ -45,12 +45,12 @@ public abstract class NewChunkStatus implements ItemStatus<ChunkPos, ChunkState,
4545
ArrayList<NewChunkStatus> statuses = new ArrayList<>();
4646
NEW = new NewChunkStatus(statuses.size(), ChunkStatus.EMPTY) {
4747
@Override
48-
public CompletionStage<Void> upgradeToThis(ChunkLoadingContext context, Cancellable cancellable) {
48+
public Completable upgradeToThis(ChunkLoadingContext context, Cancellable cancellable) {
4949
throw new UnsupportedOperationException();
5050
}
5151

5252
@Override
53-
public CompletionStage<Void> downgradeFromThis(ChunkLoadingContext context, Cancellable cancellable) {
53+
public Completable downgradeFromThis(ChunkLoadingContext context, Cancellable cancellable) {
5454
throw new UnsupportedOperationException();
5555
}
5656

c2me-rewrites-chunk-system/src/main/java/com/ishland/c2me/rewrites/chunksystem/common/statuses/Deferred.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
import com.ishland.c2me.rewrites.chunksystem.common.ChunkLoadingContext;
44
import com.ishland.c2me.rewrites.chunksystem.common.NewChunkStatus;
55
import com.ishland.flowsched.scheduler.Cancellable;
6+
import io.reactivex.rxjava3.core.Completable;
67
import net.minecraft.world.chunk.ChunkStatus;
78

89
import java.util.concurrent.CompletableFuture;
9-
import java.util.concurrent.CompletionStage;
1010

1111
public class Deferred extends NewChunkStatus {
1212

@@ -15,13 +15,13 @@ public Deferred(int ordinal) {
1515
}
1616

1717
@Override
18-
public CompletionStage<Void> upgradeToThis(ChunkLoadingContext context, Cancellable cancellable) {
19-
return CompletableFuture.completedFuture(null);
18+
public Completable upgradeToThis(ChunkLoadingContext context, Cancellable cancellable) {
19+
return Completable.complete();
2020
}
2121

2222
@Override
23-
public CompletionStage<Void> downgradeFromThis(ChunkLoadingContext context, Cancellable cancellable) {
24-
return CompletableFuture.completedFuture(null);
23+
public Completable downgradeFromThis(ChunkLoadingContext context, Cancellable cancellable) {
24+
return Completable.complete();
2525
}
2626

2727
@Override

c2me-rewrites-chunk-system/src/main/java/com/ishland/c2me/rewrites/chunksystem/common/statuses/ReadFromDisk.java

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.ishland.c2me.rewrites.chunksystem.common.statuses;
22

3+
import com.ibm.asyncutil.util.Either;
34
import com.ishland.c2me.base.common.GlobalExecutors;
45
import com.ishland.c2me.base.common.config.ModStatuses;
56
import com.ishland.c2me.base.common.registry.SerializerAccess;
@@ -22,12 +23,12 @@
2223
import com.ishland.flowsched.scheduler.Cancellable;
2324
import com.ishland.flowsched.scheduler.ItemHolder;
2425
import com.ishland.flowsched.scheduler.KeyStatusPair;
26+
import com.ishland.flowsched.util.Assertions;
2527
import io.reactivex.rxjava3.annotations.NonNull;
2628
import io.reactivex.rxjava3.core.Completable;
2729
import io.reactivex.rxjava3.core.Single;
2830
import io.reactivex.rxjava3.schedulers.Schedulers;
2931
import net.minecraft.nbt.NbtCompound;
30-
import net.minecraft.registry.RegistryKeys;
3132
import net.minecraft.server.MinecraftServer;
3233
import net.minecraft.server.world.ServerWorld;
3334
import net.minecraft.util.math.ChunkPos;
@@ -59,13 +60,13 @@ public ReadFromDisk(int ordinal) {
5960
}
6061

6162
@Override
62-
public CompletionStage<Void> upgradeToThis(ChunkLoadingContext context, Cancellable cancellable) {
63+
public Completable upgradeToThis(ChunkLoadingContext context, Cancellable cancellable) {
6364
final Single<ProtoChunk> single = invokeVanillaLoad(context)
6465
.retryWhen(RxJavaUtils.retryWithExponentialBackoff(5, 100));
6566
return finalizeLoading(context, single);
6667
}
6768

68-
protected @NotNull CompletionStage<Void> finalizeLoading(ChunkLoadingContext context, Single<ProtoChunk> single) {
69+
protected @NotNull Completable finalizeLoading(ChunkLoadingContext context, Single<ProtoChunk> single) {
6970
return single
7071
.doOnError(throwable -> {
7172
MinecraftServer server = ((IThreadedAnvilChunkStorage) context.tacs()).getWorld().getServer();
@@ -85,14 +86,15 @@ public CompletionStage<Void> upgradeToThis(ChunkLoadingContext context, Cancella
8586
}
8687
})
8788
.ignoreElement()
88-
.cache()
89-
.toCompletionStage(null);
89+
.cache();
9090
}
9191

9292
protected @NonNull Single<ProtoChunk> invokeVanillaLoad(ChunkLoadingContext context) {
9393
return invokeInitialChunkRead(context)
9494
.observeOn(Schedulers.from(((IThreadedAnvilChunkStorage) context.tacs()).getMainThreadExecutor()))
9595
.map(chunkSerializer -> {
96+
Assertions.assertTrue(((IThreadedAnvilChunkStorage) context.tacs()).getMainThreadExecutor().isOnThread());
97+
9698
try (var ignored = ThreadInstrumentation.getCurrent().begin(new ChunkTaskWork(context, this, true))) {
9799
if (chunkSerializer.isPresent()) {
98100
return chunkSerializer.get().convert(
@@ -153,14 +155,17 @@ public CompletionStage<Void> upgradeToThis(ChunkLoadingContext context, Cancella
153155
}
154156

155157
@Override
156-
public CompletionStage<Void> downgradeFromThis(ChunkLoadingContext context, Cancellable cancellable) {
158+
public Completable downgradeFromThis(ChunkLoadingContext context, Cancellable cancellable) {
157159
final AtomicBoolean loadedToWorld = new AtomicBoolean(false);
158-
return syncWithLightEngine(context).thenApplyAsync(unused -> {
160+
return Completable.defer(() -> Completable.fromCompletionStage(syncWithLightEngine(context)))
161+
.observeOn(Schedulers.from(((IThreadedAnvilChunkStorage) context.tacs()).getMainThreadExecutor()))
162+
.andThen(Completable.defer(() -> {
163+
Assertions.assertTrue(((IThreadedAnvilChunkStorage) context.tacs()).getMainThreadExecutor().isOnThread());
164+
159165
try (var ignored = ThreadInstrumentation.getCurrent().begin(new ChunkTaskWork(context, this, false))) {
160166
if (context.holder().getTargetStatus().ordinal() > this.ordinal()) { // saving cancelled
161-
// LOGGER.info("Cancelling unload of {}", context.holder().getKey());
162167
cancellable.cancel();
163-
return CompletableFuture.<Void>failedFuture(new CancellationException());
168+
return Completable.error(new CancellationException());
164169
}
165170
final ChunkState chunkState = context.holder().getItem().get();
166171
Chunk chunk = chunkState.chunk();
@@ -175,14 +180,14 @@ public CompletionStage<Void> downgradeFromThis(ChunkLoadingContext context, Canc
175180
LifecycleEventInvoker.invokeChunkUnload(((IThreadedAnvilChunkStorage) context.tacs()).getWorld(), worldChunk);
176181
}
177182

178-
CompletionStage<Void> asyncSaveFuture;
183+
Completable asyncSaveFuture;
179184
if ((context.holder().getFlags() & ItemHolder.FLAG_BROKEN) != 0 && chunk instanceof ProtoChunk) { // do not save broken ProtoChunks
180185
LOGGER.warn("Not saving partially generated broken chunk {}", context.holder().getKey());
181-
asyncSaveFuture = CompletableFuture.completedStage((Void) null);
186+
asyncSaveFuture = Completable.complete();
182187
} else if (chunk instanceof WorldChunk && !chunkState.reachedStatus().isAtLeast(ChunkStatus.FULL)) {
183188
// do not save WorldChunks that doesn't reach full status: Vanilla behavior
184189
// If saved, block entities will be lost
185-
asyncSaveFuture = CompletableFuture.completedStage((Void) null);
190+
asyncSaveFuture = Completable.complete();
186191
} else {
187192
asyncSaveFuture = asyncSave(context, chunk);
188193
}
@@ -201,29 +206,30 @@ public CompletionStage<Void> downgradeFromThis(ChunkLoadingContext context, Canc
201206

202207
return asyncSaveFuture;
203208
}
204-
}, ((IThreadedAnvilChunkStorage) context.tacs()).getMainThreadExecutor())
205-
.thenCompose(Function.identity());
209+
}));
206210
}
207211

208-
private CompletionStage<Void> asyncSave(ChunkLoadingContext context, Chunk chunk) {
212+
private Completable asyncSave(ChunkLoadingContext context, Chunk chunk) {
209213
((IThreadedAnvilChunkStorage) context.tacs()).getPointOfInterestStorage().saveChunk(chunk.getPos());
210214
if (!chunk.tryMarkSaved()) {
211-
return CompletableFuture.completedStage(null);
215+
return Completable.complete();
212216
} else {
213217
ChunkPos chunkPos = chunk.getPos();
214218

215219
SerializedChunk serializer = SerializedChunk.fromChunk(((IThreadedAnvilChunkStorage) context.tacs()).getWorld(), chunk);
216-
return CompletableFuture.supplyAsync(() -> {
220+
return Single
221+
.<Either<NbtCompound, byte[]>>fromCallable(() -> {
217222
try (var ignored = ThreadInstrumentation.getCurrent().begin(new ChunkTaskWork(context, this, false))) {
218223
return SerializerAccess.getSerializer().serialize(serializer);
219224
}
220-
}, GlobalExecutors.prioritizedScheduler.executor(16) /* boost priority as we are serializing an unloaded chunk */)
221-
.thenCompose((either) -> {
225+
})
226+
.subscribeOn(Schedulers.from(GlobalExecutors.prioritizedScheduler.executor(16) /* boost priority as we are serializing an unloaded chunk */))
227+
.flatMapCompletable(either -> {
222228
if (either.left().isPresent()) {
223229
NbtCompound nbtCompound = either.left().get();
224-
return context.tacs().setNbt(chunkPos, () -> nbtCompound);
230+
return Completable.fromCompletionStage(context.tacs().setNbt(chunkPos, () -> nbtCompound));
225231
} else {
226-
return ((IDirectStorage) ((IVersionedChunkStorage) context.tacs()).getWorker()).setRawChunkData(chunkPos, either.right().get());
232+
return Completable.fromCompletionStage(((IDirectStorage) ((IVersionedChunkStorage) context.tacs()).getWorker()).setRawChunkData(chunkPos, either.right().get()));
227233
}
228234
});
229235
}

c2me-rewrites-chunk-system/src/main/java/com/ishland/c2me/rewrites/chunksystem/common/statuses/ReadFromDiskAsync.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.ishland.flowsched.scheduler.ItemHolder;
1313
import com.ishland.flowsched.scheduler.KeyStatusPair;
1414
import io.reactivex.rxjava3.annotations.NonNull;
15+
import io.reactivex.rxjava3.core.Completable;
1516
import io.reactivex.rxjava3.core.Single;
1617
import io.reactivex.rxjava3.schedulers.Schedulers;
1718
import it.unimi.dsi.fastutil.Pair;
@@ -21,8 +22,6 @@
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

24-
import java.util.concurrent.CompletionStage;
25-
2625
public class ReadFromDiskAsync extends ReadFromDisk {
2726

2827
private static final Logger LOGGER = LoggerFactory.getLogger("ReadFromDiskAsync");
@@ -32,7 +31,7 @@ public ReadFromDiskAsync(int ordinal) {
3231
}
3332

3433
@Override
35-
public CompletionStage<Void> upgradeToThis(ChunkLoadingContext context, Cancellable cancellable) {
34+
public Completable upgradeToThis(ChunkLoadingContext context, Cancellable cancellable) {
3635
final Single<ProtoChunk> single = invokeAsyncLoad(context)
3736
.retryWhen(RxJavaUtils.retryWithExponentialBackoff(3, 200))
3837
.onErrorResumeNext(throwable -> {

0 commit comments

Comments
 (0)