From 9e838dfddc8f8be150de31d94b695010cd23ea62 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Mon, 24 Apr 2023 06:51:27 +0200 Subject: [PATCH] Lock-free BlockingIdentifierGenerator LoHi --- .../id/impl/BlockingIdentifierGenerator.java | 89 ++++++++++++------- 1 file changed, 59 insertions(+), 30 deletions(-) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java index d15de1779a..18109aa8e7 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java @@ -17,6 +17,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; @@ -56,22 +57,55 @@ public abstract class BlockingIdentifierGenerator implements ReactiveIdentifierG //to reason about what the current state is and what the CombinerExecutor is //supposed to work on. private static class GeneratorState { - private int loValue; - private long hiValue; + + private static final class LoHi { + + private static final AtomicIntegerFieldUpdater LO_UPDATER = AtomicIntegerFieldUpdater.newUpdater(LoHi.class, "lo"); + private final long hi; + private volatile long lo; + + LoHi(long hi) { + this.hi = hi; + this.lo = 1; + } + + public long next(int blockSize) { + if (lo >= blockSize) { + return -1; + } + final long nextLo = LO_UPDATER.getAndIncrement(this); + if (nextLo < blockSize) { + return hi + nextLo; + } + return -1; + } + } + + private volatile LoHi loHi; + + public long hi(long hi) { + loHi = new LoHi(hi); + return hi; + } + + public long next(int blockSize) { + final LoHi loHi = this.loHi; + if (loHi == null) { + return -1; + } + return loHi.next(blockSize); + } } //Critical section: needs to be accessed exclusively via the CombinerExecutor //when there's contention; direct invocation is allowed in the fast path. - private synchronized long next() { - return state.loValue > 0 && state.loValue < getBlockSize() - ? state.hiValue + state.loValue++ - : -1; //flag value indicating that we need to hit db + private long next() { + return state.next(getBlockSize()); } //Critical section: needs to be accessed exclusively via the CombinerExecutor - private synchronized long next(long hi) { - state.hiValue = hi; - state.loValue = 1; + private long next(long hi) { + state.hi(hi); return hi; } @@ -90,8 +124,7 @@ public CompletionStage generate(ReactiveConnectionSupplier connectionSuppl //if it were to happen we should be better off with direct execution rather than using //the co-operative executor: if ( getBlockSize() <= 1 ) { - return nextHiValue( connectionSupplier ) - .thenApply( i -> next( i ) ); + return nextHiValue( connectionSupplier ).thenApply( i -> next( i ) ); } final CompletableFuture resultForThisEventLoop = new CompletableFuture<>(); @@ -108,8 +141,7 @@ public CompletionStage generate(ReactiveConnectionSupplier connectionSuppl } else { context.runOnContext( ( v ) -> resultForThisEventLoop.complete( id ) ); } - } - else { + } else { if ( t != null ) { resultForThisEventLoop.completeExceptionally( t ); } else { @@ -137,23 +169,21 @@ public Task execute(GeneratorState state) { // We don't need to update or initialize the hi // value in the table, so just increment the lo // value and return the next id in the block - completedFuture( local ) - .whenComplete( this::acceptAsReturnValue ); + completedFuture( local ).whenComplete( this::acceptAsReturnValue ); return null; } else { - nextHiValue( connectionSupplier ) - .whenComplete( (newlyGeneratedHi, throwable) -> { - if ( throwable != null ) { - result.completeExceptionally( throwable ); - } else { - //We ignore the state argument as we actually use the field directly - //for convenience, but they are the same object. - executor.submit( stateIgnored -> { - result.complete( next( newlyGeneratedHi ) ); - return null; - }); - } - } ); + nextHiValue( connectionSupplier ).whenComplete( (newlyGeneratedHi, throwable) -> { + if ( throwable != null ) { + result.completeExceptionally( throwable ); + } else { + //We ignore the state argument as we actually use the field directly + //for convenience, but they are the same object. + executor.submit( stateIgnored -> { + result.complete( next( newlyGeneratedHi ) ); + return null; + }); + } + } ); return null; } } @@ -161,8 +191,7 @@ public Task execute(GeneratorState state) { private void acceptAsReturnValue(final Long aLong, final Throwable throwable) { if ( throwable != null ) { result.completeExceptionally( throwable ); - } - else { + } else { result.complete( aLong ); } }