HBASE-29645 Reduce synchronization in AsyncBufferedMutatorImpl #7363
base: master
Conversation
So the intention here is to move the table.batch call outside the synchronization block. As you said, table.batch should already be asynchronous, so it is a bit surprising that moving it outside the synchronization can greatly increase the performance.
In general, all RPC requests like locating a region should be asynchronous calls. Do you have more details on what makes the table.batch call block for a long time?
The implementation looks good, but we still need to think more carefully about correctness. IIRC when running ITBLL against branch-3, we found a data loss issue in AsyncBufferedMutatorImpl, even though the synchronization is very simple...
Thanks.
if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
  periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
    boolean shouldFlush = false;
    synchronized (AsyncBufferedMutatorImpl.this) {
This should also be changed to lock.lock()?
  }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
}
// Preallocate to avoid potentially multiple resizes during addAll if we can.
if (this.mutations instanceof ArrayList && this.futures instanceof ArrayList) {
These two fields are private fields, so just declare them as ArrayList?
One pattern we see is that caller threads invoking mutator.mutate() are blocked by long call chains through netty doing IO for region location lookup. The theory as to why: CompletableFuture has two modes of operation. With synchronous chaining (thenApply, thenAccept, whenComplete) the callback executes in the same thread that completes the future, while only asynchronous chaining (thenApplyAsync, thenAcceptAsync, whenCompleteAsync) guarantees execution of the lambdas in a separate thread context. HBase async client code may chain with thenX() rather than thenXAsync(). If some upstream code, like region (re)location, completes a future synchronously, the downstream pipeline executes immediately in the current thread context and blocks the caller. The change to ABM guarantees it won't impact callers to mutate() if this happens, although the fact that this happens suggests that elsewhere in the async client, changes from thenX() to thenXAsync() are warranted, as a follow-up issue.
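To make the distinction concrete, here is a small self-contained sketch (plain JDK code, not HBase code) showing where synchronously and asynchronously chained callbacks actually execute; the class name and thread names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChainingDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService completer = Executors.newSingleThreadExecutor(r -> new Thread(r, "completer"));

    // Synchronous chaining on an already-completed future (e.g. a cached region location):
    // the callback runs immediately in the *calling* thread.
    CompletableFuture<String> cached = CompletableFuture.completedFuture("cached-location");
    cached.thenApply(loc -> {
      System.out.println("thenApply ran on " + Thread.currentThread().getName()); // main
      return loc;
    });

    // Asynchronous chaining: the callback is always handed to an executor (the common pool
    // by default), so the caller's thread is never borrowed for downstream work.
    cached.thenApplyAsync(loc -> {
      System.out.println("thenApplyAsync ran on " + Thread.currentThread().getName()); // pool thread
      return loc;
    }).join();

    // Synchronous chaining on a not-yet-completed future: the callback runs in whichever
    // thread eventually calls complete(), here the "completer" thread.
    CompletableFuture<String> pending = new CompletableFuture<>();
    pending.thenAccept(v ->
      System.out.println("thenAccept ran on " + Thread.currentThread().getName())); // completer
    completer.submit(() -> pending.complete("done")).get();

    completer.shutdown();
  }
}
```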
f4d8007 addresses spotbugs findings and implements review feedback.
@Apache9 What would you suggest here? I can add more test coverage. Something like this seems straightforward to implement and will not increase the running time of the test suite too much:
Step 1: Generate a test data set of configurable size, default 100K rows.
This will provide test coverage we don't have now, but it does not validate correctness beyond the happy path.
🎊 +1 overall
This message was automatically generated.
> In general, all RPC requests like locating a region should be asynchronous calls. Do you have more details on what makes the table.batch call block for a long time?

I assume we're not waiting for any RPC responses on the internalFlush thread, but with large enough buffers, large enough numbers of mutations, and high enough concurrency on incoming mutations, it seems even AsyncBatchRpcRetryingCaller#groupAndSend can take long enough (milliseconds?) to decrease overall throughput. If the region location is in the cache, then the future completes synchronously in AsyncNonMetaRegionLocator#getRegionLocationsInternal, which allows more work to happen under the synchronized block, which prevents further mutations from being accepted.
hbase/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java, lines 504 to 513 in d0b9478:
private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
  byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
  // AFTER should be convert to CURRENT before calling this method
  assert !locateType.equals(RegionLocateType.AFTER);
  TableCache tableCache = getTableCache(tableName);
  if (!reload) {
    RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
    if (isGood(locs, replicaId)) {
      return CompletableFuture.completedFuture(locs);
    }
When the number of mutations could be >100,000 and the number of region locations could be >10,000, and most of those locations are in the cache, groupAndSend of those Multi RPCs yields a non-trivial amount of work.
// Preallocate to avoid potentially multiple resizes during addAll
this.mutations.ensureCapacity(this.mutations.size() + mutations.size());
this.futures.ensureCapacity(this.futures.size() + futures.size());
If they are ArrayList, do we need to call ensureCapacity, or do the internals of addAll in the JDK already do that? https://github.com/openjdk/jdk8u/blob/5cffbcb0344f2cf16682a09519894ba705182241/jdk/src/share/classes/java/util/ArrayList.java#L582-L585
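Per the linked JDK 8 source, addAll(Collection) grows the backing array once (to size + numNew) before copying, so an explicit ensureCapacity immediately before a single addAll should be redundant. A tiny illustrative snippet (names invented, not HBase code) of the two equivalent shapes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EnsureCapacityDemo {
  public static void main(String[] args) {
    List<String> incoming = Arrays.asList("m1", "m2", "m3");

    // With an explicit pre-size before a single bulk add...
    ArrayList<String> withEnsure = new ArrayList<>();
    withEnsure.ensureCapacity(withEnsure.size() + incoming.size());
    withEnsure.addAll(incoming);

    // ...and without: addAll performs the same single grow internally before copying.
    ArrayList<String> without = new ArrayList<>();
    without.addAll(incoming);

    System.out.println(withEnsure.equals(without)); // true, same contents either way
  }
}
```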
So maybe we should check the depth of the stack trace? In netty there are some tricks around this area: if the future is completed synchronously all the time and creates a very deep call stack, it will force scheduling an asynchronous task to prevent stack overflow and also reduce the blocking execution time.
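For illustration, the pattern being described looks roughly like the sketch below. The names and the depth limit are invented; this is not netty's actual implementation, just the general idea of bounding how deep synchronous completion may recurse before falling back to an executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public final class BoundedSyncCompletion {
  private static final int MAX_INLINE_DEPTH = 8; // assumed limit; netty's is configurable
  private static final ThreadLocal<Integer> DEPTH = ThreadLocal.withInitial(() -> 0);

  public static <T> void complete(CompletableFuture<T> future, T value, Executor executor) {
    int depth = DEPTH.get();
    if (depth < MAX_INLINE_DEPTH) {
      DEPTH.set(depth + 1);
      try {
        // Shallow enough: complete inline, so synchronously chained callbacks run here.
        future.complete(value);
      } finally {
        DEPTH.set(depth);
      }
    } else {
      // Too deep: schedule the completion (and its callbacks) asynchronously instead,
      // bounding both stack growth and the time this thread spends executing callbacks.
      executor.execute(() -> future.complete(value));
    }
  }

  private BoundedSyncCompletion() {
  }
}
```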
We need to consider the implementation carefully by hand, and after merging this PR we should run ITBLL for several rounds before cutting the next release.
private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);

private final int INITIAL_CAPACITY = 100;
This should be static.
  internalFlush(FlushType.MANUAL);
}

protected void internalFlush(FlushType trigger) {
Maybe a better choice is to inline this method into the caller and make the lock protect the check-and-replace logic, i.e., if we think we should flush, swap the mutations and futures into local variables and recreate them, set a local boolean variable, maybe called shouldSend, to true, and then check this boolean outside the lock protection to send the mutate request out.
In this way we do not need the double check or the flush type check, and the logic will be easier to understand. The only problem is the test which overrides the internalFlush method. We could try to find other ways to implement it.
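For clarity, a rough sketch of that suggested shape follows. The class, field, and helper names (BufferedMutatorSketch, shouldSend, sendBatch, bufferedSize, and so on) are illustrative assumptions, not the actual patch; the point is only that the lock covers the check-and-swap while the batch is sent after the lock is released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.hbase.client.Mutation;

class BufferedMutatorSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private List<Mutation> mutations = new ArrayList<>();
  private List<CompletableFuture<Void>> futures = new ArrayList<>();
  private long bufferedSize;
  private final long writeBufferSize;
  private final int maxMutations;

  BufferedMutatorSketch(long writeBufferSize, int maxMutations) {
    this.writeBufferSize = writeBufferSize;
    this.maxMutations = maxMutations;
  }

  void mutateAndMaybeFlush(List<? extends Mutation> newMutations,
      List<CompletableFuture<Void>> newFutures, long heapSize) {
    boolean shouldSend = false;
    List<Mutation> toSend = null;
    List<CompletableFuture<Void>> toComplete = null;
    lock.lock();
    try {
      mutations.addAll(newMutations);
      futures.addAll(newFutures);
      bufferedSize += heapSize;
      if (bufferedSize >= writeBufferSize || (maxMutations > 0 && mutations.size() >= maxMutations)) {
        // Swap the shared buffers to locals and reset them while still holding the lock.
        toSend = mutations;
        toComplete = futures;
        mutations = new ArrayList<>();
        futures = new ArrayList<>();
        bufferedSize = 0;
        shouldSend = true;
      }
    } finally {
      lock.unlock();
    }
    if (shouldSend) {
      // Issue the batch outside the lock so concurrent mutate() callers keep making progress.
      sendBatch(toSend, toComplete);
    }
  }

  private void sendBatch(List<Mutation> toSend, List<CompletableFuture<Void>> toComplete) {
    // Placeholder for the table.batch(toSend) call and wiring its results into toComplete.
  }
}
```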
This patch modifies the AsyncBufferedMutatorImpl class to improve its performance under concurrent usage. While AsyncTable#batch() is largely asynchronous in nature, it can exhibit blocking behavior during its preparation phase, for instance while looking up region locations. In the original implementation of AsyncBufferedMutatorImpl, calls to AsyncTable#batch() occur within a synchronized block, potentially causing severe contention and stalling other threads trying to buffer their mutations. The original implementation relied on coarse-grained synchronized blocks for multi-threading safety, so when one thread triggered a buffer flush (either because the buffer was full or a periodic timer fired), all other threads attempting to add mutations via the mutate method would be blocked until the table.batch() call completed, which could take a surprisingly long time.

The new implementation replaces the broad synchronized blocks with a ReentrantLock. This lock is acquired only for the brief period needed to safely copy the current batch of mutations and futures into local variables and swap in a new internal buffer. Immediately after this quick operation, the lock is released. The batch() call is then executed outside of the locked section. This allows other threads to continue adding new mutations concurrently while the flushing of the previous batch proceeds independently. The client has already opted in to asynchronous and potentially interleaved commit of the mutations submitted to AsyncBufferedMutator, by definition. The minimization of critical section scope minimizes thread contention and significantly boosts throughput under load. Other related profiler-driven efficiency changes are also included, such as elimination of stream API and array resizing hotspots identified by the profiler.

To validate the performance improvement of these changes, a JMH benchmark, AsyncBufferedMutatorBenchmark, was created to measure the performance of the mutate method under various conditions. It focuses specifically on the overhead and concurrency management of AsyncBufferedMutatorImpl itself, not the underlying network communication. To achieve this, it uses the Mockito framework to create a mock AsyncTable that instantly returns completed futures, isolating the mutator's buffering logic for measurement. It runs tests with 1, 10, and 100 threads to simulate no, medium, and high levels of concurrency. It uses a low value (100) for maxMutations to force frequent flushes based on the number of mutations, and a very high value (100,000) to ensure flushes are rare in that measurement case. The benchmark measures the average time per operation in microseconds, where a lower score indicates better performance and higher throughput.

With a single thread and no contention, the performance of both implementations is nearly identical. The minor variations are negligible and show that the new locking mechanism does not introduce any performance regression in the non-concurrent case. For example, with a 10MB buffer and high maxMutations, the NEW implementation scored 0.167 us/op while the OLD scored 0.169 us/op, a statistically insignificant difference. When the test is run with 10 threads, a noticeable gap appears. In the scenario designed to cause frequent flushes (maxMutations = 100), the NEW implementation is approximately 12 times faster than the OLD one (14.250 us/op for NEW vs. 172.463 us/op for OLD). This is because the OLD implementation forces threads to wait while flushes occur, and flushes incur a synthetic thread sleep of 1 ms to simulate occasional unexpected blocking behavior in AsyncTable#batch(), whereas the NEW implementation allows them to proceed without contention. The most significant results come from the 100-thread tests, which simulate high contention. In the frequent flush scenario (maxMutations = 100) the NEW implementation is 114 times faster in the synthetic benchmark scenario (16.123 us/op for NEW vs. 1847.567 us/op for OLD). Note that blocking IO observed in a real client, e.g. for region location lookups, can produce a much more significant impact. With the OLD code, 100 threads are constantly competing for a lock that is held for a long duration, leading to a contention storm. The NEW code's reduced locking scope almost entirely eliminates this bottleneck.

OS: Apple Silicon (aarch64) M1 Max / 64 GB
JVM: openjdk version "17.0.11" 2024-04-16 LTS / OpenJDK 64-Bit Server VM Zulu17.50+19-CA (build 17.0.11+9-LTS, mixed mode, sharing)
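For readers who want a feel for the harness described above, here is a minimal sketch of what such a JMH benchmark could look like. It is not the AsyncBufferedMutatorBenchmark added by the patch: the newMutator helper is a placeholder (the AsyncBufferedMutatorImpl constructor is not reproduced here), while the mocked AsyncTable, the maxMutations parameters, and the thread counts mirror the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class AsyncBufferedMutatorBenchmarkSketch {

  private static final byte[] CF = Bytes.toBytes("f");
  private static final byte[] Q = Bytes.toBytes("q");
  private static final byte[] VALUE = new byte[64];

  // Low value forces frequent flushes; high value makes flushes rare.
  @Param({ "100", "100000" })
  int maxMutations;

  AsyncBufferedMutator mutator;

  @Setup(Level.Trial)
  @SuppressWarnings({ "unchecked", "rawtypes" })
  public void setUp() {
    // Mock AsyncTable whose batch() returns already-completed futures, so only the
    // mutator's own buffering and locking cost is measured, not any network work.
    AsyncTable table = Mockito.mock(AsyncTable.class);
    Mockito.when(table.batch(Mockito.anyList())).thenAnswer(invocation -> {
      List<?> actions = invocation.getArgument(0);
      List<CompletableFuture<Object>> results = new ArrayList<>(actions.size());
      for (int i = 0; i < actions.size(); i++) {
        results.add(CompletableFuture.completedFuture(null));
      }
      return results;
    });
    // 10 MB write buffer, parameterized maxMutations.
    mutator = newMutator(table, 10L * 1024 * 1024, maxMutations);
  }

  @Benchmark
  @Threads(100) // the real benchmark also runs with 1 and 10 threads
  public CompletableFuture<Void> mutate() {
    return mutator.mutate(new Put(Bytes.toBytes("row")).addColumn(CF, Q, VALUE));
  }

  private static AsyncBufferedMutator newMutator(AsyncTable<?> table, long bufferSize,
      int maxMutations) {
    // Placeholder: construction of AsyncBufferedMutatorImpl around the mocked table is
    // omitted here since its constructor is internal to the patch.
    throw new UnsupportedOperationException("sketch only");
  }
}
```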