Skip to content

Commit

Permalink
Lock-free BlockingIdentifierGenerator LoHi
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Apr 24, 2023
1 parent 27c87c7 commit 9e838df
Showing 1 changed file with 59 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;

Expand Down Expand Up @@ -56,22 +57,55 @@ public abstract class BlockingIdentifierGenerator implements ReactiveIdentifierG
//to reason about what the current state is and what the CombinerExecutor is
//supposed to work on.
private static class GeneratorState {
private int loValue;
private long hiValue;

private static final class LoHi {

private static final AtomicIntegerFieldUpdater<LoHi> LO_UPDATER = AtomicIntegerFieldUpdater.newUpdater(LoHi.class, "lo");
private final long hi;
private volatile long lo;

LoHi(long hi) {
this.hi = hi;
this.lo = 1;
}

public long next(int blockSize) {
if (lo >= blockSize) {
return -1;
}
final long nextLo = LO_UPDATER.getAndIncrement(this);
if (nextLo < blockSize) {
return hi + nextLo;
}
return -1;
}
}

private volatile LoHi loHi;

public long hi(long hi) {
loHi = new LoHi(hi);
return hi;
}

public long next(int blockSize) {
final LoHi loHi = this.loHi;
if (loHi == null) {
return -1;
}
return loHi.next(blockSize);
}
}

//Critical section: needs to be accessed exclusively via the CombinerExecutor
//when there's contention; direct invocation is allowed in the fast path.
private synchronized long next() {
return state.loValue > 0 && state.loValue < getBlockSize()
? state.hiValue + state.loValue++
: -1; //flag value indicating that we need to hit db
private long next() {
return state.next(getBlockSize());
}

//Critical section: needs to be accessed exclusively via the CombinerExecutor
private synchronized long next(long hi) {
state.hiValue = hi;
state.loValue = 1;
private long next(long hi) {
state.hi(hi);
return hi;
}

Expand All @@ -90,8 +124,7 @@ public CompletionStage<Long> generate(ReactiveConnectionSupplier connectionSuppl
//if it were to happen we should be better off with direct execution rather than using
//the co-operative executor:
if ( getBlockSize() <= 1 ) {
return nextHiValue( connectionSupplier )
.thenApply( i -> next( i ) );
return nextHiValue( connectionSupplier ).thenApply( i -> next( i ) );
}

final CompletableFuture<Long> resultForThisEventLoop = new CompletableFuture<>();
Expand All @@ -108,8 +141,7 @@ public CompletionStage<Long> generate(ReactiveConnectionSupplier connectionSuppl
} else {
context.runOnContext( ( v ) -> resultForThisEventLoop.complete( id ) );
}
}
else {
} else {
if ( t != null ) {
resultForThisEventLoop.completeExceptionally( t );
} else {
Expand Down Expand Up @@ -137,32 +169,29 @@ public Task execute(GeneratorState state) {
// We don't need to update or initialize the hi
// value in the table, so just increment the lo
// value and return the next id in the block
completedFuture( local )
.whenComplete( this::acceptAsReturnValue );
completedFuture( local ).whenComplete( this::acceptAsReturnValue );
return null;
} else {
nextHiValue( connectionSupplier )
.whenComplete( (newlyGeneratedHi, throwable) -> {
if ( throwable != null ) {
result.completeExceptionally( throwable );
} else {
//We ignore the state argument as we actually use the field directly
//for convenience, but they are the same object.
executor.submit( stateIgnored -> {
result.complete( next( newlyGeneratedHi ) );
return null;
});
}
} );
nextHiValue( connectionSupplier ).whenComplete( (newlyGeneratedHi, throwable) -> {
if ( throwable != null ) {
result.completeExceptionally( throwable );
} else {
//We ignore the state argument as we actually use the field directly
//for convenience, but they are the same object.
executor.submit( stateIgnored -> {
result.complete( next( newlyGeneratedHi ) );
return null;
});
}
} );
return null;
}
}

private void acceptAsReturnValue(final Long aLong, final Throwable throwable) {
if ( throwable != null ) {
result.completeExceptionally( throwable );
}
else {
} else {
result.complete( aLong );
}
}
Expand Down

0 comments on commit 9e838df

Please sign in to comment.