-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[improve][broker] Optimize fine-grained concurrency control for BucketDelayedDeliveryTracker #24739
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[improve][broker] Optimize fine-grained concurrency control for BucketDelayedDeliveryTracker #24739
Conversation
…rategy Refactor lock mechanism from StampedLock to ReentrantReadWriteLock for thread safety. Add async bucket snapshot creation and improve concurrent message handling.
…yncMergeBucketSnapshot
@codelipenghui @lhotari @coderzc @Apurva007 |
Without benchmarking, it will be hard to validate assumptions. I'd suggest adding JMH benchmarks so that performance could be compared. JMH benchmarks can be added to microbench module: https://github.com/apache/pulsar/tree/master/microbench |
OK. I will add JMH benchmarks first. |
…cketDelayedDeliveryTracker
microbench/src/main/java/org/apache/pulsar/broker/MockPersistentDispatcher.java
Outdated
Show resolved
Hide resolved
bump, this PR can be reviewed now. |
Fixes #24603
Main Issue: #24600
PIP: #xyz
Motivation
The current concurrent control model of
BucketDelayedDeliveryTracker
has serious performance bottlenecks and thread safety issues. It mainly relies on coarse-grained synchronized keywords to protect its internal state, which leads to several key problems:`All operations, whether simple read-only checks (like containsMessage) or complex I/O-intensive operations (like creating a Bucket snapshot), contend for the same object lock. This leads to operations being unnecessarily serialized, limiting overall throughput.
addMessage
orgetScheduledMessages
will block allcontainsMessage
andnextDeliveryTime
and other read-only checks.getScheduledMessages
method, which appears to be reading messages but actually includes multiple write operations such as modifying the queue, updating the bitmap, and removing mappings. When executed within a synchronized block, it holds the lock for a long time, blocking the addition of all new messages.Within the synchronized block of
addMessage
, the creation and persistence of Bucket snapshots are executed synchronously. This is a time-consuming I/O operation that blocks all subsequent message addition and read requests until the snapshot is completed, significantly increasing the message publishing delay.This PR is the first step in optimizing and enhancing the
BucketDelayedDeliveryTracker
, aiming to address the above performance bottlenecks by introducing more refined concurrency control mechanisms and asynchronizing I/O operations.Modifications
This change is mainly focused on the
addMessage
method, because it is one of the most critical bottlenecks under high throughput scenarios. Subsequent PRs will continue to optimize other methods based on this foundation, such asgetScheduledMessages
.ReentrantReadWriteLock
):Removed
StampedLock
and thesynchronized
keyword on multiple methods, unifying the use ofReentrantReadWriteLock
as the core concurrency control mechanism.This implements read-write separation:
Read operations (
containsMessage
,nextDeliveryTime
) now use a read lock, allowing multiple threads to execute concurrently without blocking each other.Write operations (
addMessage
,asyncMergeBucketSnapshot
) use a write lock, ensuring atomicity and consistency of data modifications.addMessage
internally adopts an optimized pattern of "read lock check -> write lock modification" to reduce the holding time of the write lock.Introduced a dedicated single-threaded
ExecutorService
(bucketSnapshotExecutor
) to handle time-consuming Bucket snapshot persistence tasks.Refactored the snapshot creation logic in
addMessage
:When a snapshot needs to be created, no longer wait synchronously. Instead, mark the current
MutableBucket
asbucketBeingSealed
.Immediately create a new
lastMutableBucket
to handle subsequent incoming messages, ensuring thataddMessage
calls can return quickly.Submit the actual snapshot creation and persistence task to the background
bucketSnapshotExecutor
for asynchronous execution.This change removes I/O operations from the critical path of message publishing, greatly reducing publish latency and increasing throughput.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: Denovo1998#10