BewareMyPower
Contributor

Main Issue: #24697

Motivation

The current asynchronous read APIs of the managed ledger are callback-based, so every call needs both a callback and a context object. For example,

    ReadEntriesCtx readEntriesCtx =
            ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
    cursor.asyncReadEntriesOrWait(messagesToRead,
            bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());

The callback is the dispatcher itself, and the context is a customized recyclable object. The direct cause of #24697 is that an incorrect context was passed to the callback. While investigating the issue, I first suspected that the context had been recycled twice unexpectedly. That is very unlikely, because recycle() is only called in the callback's readEntriesComplete or readEntriesFailed methods, whose implementations are both synchronized and set havePendingRead to false, while havePendingRead was true in this case.

The other possible reason is that an incorrect context was passed to the callback. When I revisited the code, I found it very hard to read because the lifetimes of the callback and the context are ambiguous.

Let's start from ManagedLedgerImpl#internalReadFromLedger, which must be reached for any read operation.

    asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);

Before the line above, ownership of the read context (ctx) had been passed to OpReadEntry. However, since the ctx field is not private, it is passed again as a separate argument here, so after this line ctx is held both by OpReadEntry and by the callee.

Both are then passed to EntryCache#asyncReadEntry, where OpReadEntry is treated as a callback, and eventually to doAsyncReadEntriesWithAcquiredPermits.

    ReadEntriesCallback wrappedCallback = new ReadEntriesCallback() {
        @Override
        public void readEntriesComplete(List<Entry> entries, Object ctx2) {
            if (!entries.isEmpty()) {
                // release permits only when entries have been handled
                AtomicInteger remainingCount = new AtomicInteger(entries.size());
                for (Entry entry : entries) {
                    ((EntryImpl) entry).onDeallocate(() -> {
                        if (remainingCount.decrementAndGet() <= 0) {
                            pendingReadsLimiter.release(handle);
                        }
                    });
                }
            } else {
                pendingReadsLimiter.release(handle);
            }
            originalCallback.readEntriesComplete(entries, ctx2);

Now the OpReadEntry is wrapped in another callback. On one hand, the wrapper is an extra heap allocation on top of the OpReadEntry, which defeats the purpose of the complicated recycler design. On the other hand, if ctx2 ever differs from ctx, the downstream caller fails unexpectedly, just like #24697.

    readMissingEntriesAsync(lh, firstEntryInRange, firstPosition.getEntryId() + i - 1,
            expectedReadCount, ctx));

Going deeper, ctx is also passed to readMissingEntriesAsync and then to pendingReadsManager.readEntries, which means ctx is now shared by 3 owners. The code of PendingReadsManager is much more complicated, and it even has a cache that holds ctx indirectly in PendingRead#listeners.

I did not look further into the code. But from the code reading above, the root cause of the complexity is that ctx is shared everywhere and hard to track.
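
The hazard described above can be reproduced in isolation. The following is a minimal sketch, not Pulsar code: `ReadCallback`, `Dispatcher`, `DispatcherCtx` and `wrap` are hypothetical stand-ins for a callback-plus-context API in which an intermediate layer forwards whatever ctx it happens to hold.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedContextHazard {
    /** Hypothetical stand-in for a callback-plus-context read API. */
    public interface ReadCallback {
        void readComplete(List<String> entries, Object ctx);
    }

    /** Hypothetical context type that the dispatcher expects to get back. */
    public static final class DispatcherCtx {
        final long epoch;
        DispatcherCtx(long epoch) { this.epoch = epoch; }
    }

    /** Hypothetical dispatcher: records an error when the ctx is not its own. */
    public static final class Dispatcher implements ReadCallback {
        final List<String> errors = new ArrayList<>();

        @Override
        public void readComplete(List<String> entries, Object ctx) {
            if (!(ctx instanceof DispatcherCtx)) {
                errors.add("unexpected ctx: " + ctx);
            }
        }
    }

    /**
     * An intermediate layer that wraps the callback. Nothing in the types ties
     * the ctx it forwards to the ctx the original caller supplied.
     */
    public static ReadCallback wrap(ReadCallback original, Object ctxHeldByThisLayer) {
        return (entries, ignored) -> original.readComplete(entries, ctxHeldByThisLayer);
    }

    /** Runs one read through the wrapper and returns the dispatcher's errors. */
    public static List<String> run(Object ctxHeldByLowerLayer) {
        Dispatcher dispatcher = new Dispatcher();
        ReadCallback wrapped = wrap(dispatcher, ctxHeldByLowerLayer);
        wrapped.readComplete(List.of("entry-0"), new DispatcherCtx(1));
        return dispatcher.errors;
    }
}
```

Both calls compile, but only the one where the lower layer holds the right ctx succeeds. The mismatch is only detected at runtime in the downstream callback.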

To solve this issue, CompletableFuture is a good fit:

  • Among multiple complete or completeExceptionally calls, only the first one takes effect
  • For the same reason, any callback registered on the future, such as one passed to whenComplete, is invoked at most once
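
The two properties above can be checked with plain `java.util.concurrent.CompletableFuture`. This is a small standalone sketch; the class and method names are made up for illustration and are not part of this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class CompleteOnceDemo {
    /** Returns how many times the whenComplete callback ran after three completion attempts. */
    public static int callbackInvocations() {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicInteger invocations = new AtomicInteger();
        future.whenComplete((value, error) -> invocations.incrementAndGet());

        future.complete("first");
        future.complete("second");
        future.completeExceptionally(new IllegalStateException("late failure"));
        return invocations.get();
    }

    /** Returns true when only the first completion attempt is accepted. */
    public static boolean onlyFirstCompletionWins() {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean first = future.complete("first");
        boolean second = future.complete("second");
        boolean third = future.completeExceptionally(new IllegalStateException("late failure"));
        return first && !second && !third && "first".equals(future.join());
    }

    public static void main(String[] args) {
        System.out.println("callback invocations: " + callbackInvocations());
        System.out.println("only first completion wins: " + onlyFirstCompletionWins());
    }
}
```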

That means, even though the public API stays callback-based, if the underlying implementations all use future-based methods, the read callback and the context clearly share the same lifetime:

    protected void asyncReadEntry(/* ... */, OpReadEntry opReadEntry) {
        IntSupplier expectedReadCount = opReadEntry.cursor::getNumberOfCursorsAtSamePositionOrBefore;
        final var future = entryCache.asyncReadEntry(/* ... */);
        future.whenComplete((entries, throwable) -> {
            if (throwable == null) {
                opReadEntry.readEntriesComplete(entries);
            } else {
                opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable));
            }
        });

In the code above, callback and ctx are never exposed (i.e. they are uniquely owned by OpReadEntry) and can only be invoked through readEntriesComplete or readEntriesFailed. This guarantees that callback and ctx always match.
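
The ownership rule can be sketched outside Pulsar as follows. `OwnedReadOp` and its nested `ReadCallback` interface are hypothetical, simplified stand-ins (not the real OpReadEntry or ReadEntriesCallback), and entries are plain strings to keep the example self-contained.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class OwnedReadOp {
    /** Hypothetical, simplified callback interface. */
    public interface ReadCallback {
        void readEntriesComplete(List<String> entries, Object ctx);
        void readEntriesFailed(Throwable error, Object ctx);
    }

    // Both fields are private and never handed to lower layers,
    // so the pair cannot get out of sync.
    private final ReadCallback callback;
    private final Object ctx;

    public OwnedReadOp(ReadCallback callback, Object ctx) {
        this.callback = callback;
        this.ctx = ctx;
    }

    public void readEntriesComplete(List<String> entries) {
        callback.readEntriesComplete(entries, ctx);
    }

    public void readEntriesFailed(Throwable error) {
        callback.readEntriesFailed(error, ctx);
    }

    /** Bridges a future-based lower layer back to the callback-based API. */
    public void completeFrom(CompletableFuture<List<String>> future) {
        future.whenComplete((entries, error) -> {
            if (error == null) {
                readEntriesComplete(entries);
            } else {
                readEntriesFailed(error);
            }
        });
    }

    /** Demo helper: returns the ctx the callback observed after a successful read. */
    public static Object ctxObservedOnSuccess(Object ctx) {
        Object[] observed = new Object[1];
        ReadCallback recorder = new ReadCallback() {
            @Override
            public void readEntriesComplete(List<String> entries, Object c) {
                observed[0] = c;
            }

            @Override
            public void readEntriesFailed(Throwable error, Object c) {
                observed[0] = error;
            }
        };
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        new OwnedReadOp(recorder, ctx).completeFrom(future);
        future.complete(List.of("entry-0"));
        return observed[0];
    }
}
```

Because the lower layer only sees the future, it has no way to supply a different ctx.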

Modifications

  • Refactor the EntryCache layer to make all APIs future-based
  • Make ctx and callback private in OpReadEntry and only allow them to be called by readEntriesComplete or readEntriesFailed

Additionally, this PR changes the behavior when an OpReadEntry is cancelled and makes recycle() private. #16399 fixed the case where the OpReadEntry was not recycled when being cancelled. However, if the context is also a recyclable object that needs to be recycled by triggering the callback with it, the ctx would not be recycled either. Cancelling a read is common because it happens whenever a consumer is removed, so the recycler's thread local queue could easily be full.

The complicated ReadEntryCallbackWrapper design is also replaced by a CompletableFuture. A unique id (readOpCount) is no longer needed to check whether the wrapper has been recycled due to timeout, because once a future has been completed exceptionally due to timeout, completing it again has no effect.
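
The timeout behavior can be sketched with the standard library alone. This example assumes `CompletableFuture#orTimeout` (Java 9+) as the timeout mechanism purely for illustration. How the timeout is actually scheduled in this PR is not shown here, and `ReadTimeoutDemo` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReadTimeoutDemo {
    /** Returns true when a completion that arrives after the timeout is ignored. */
    public static boolean lateCompletionIgnored() {
        CompletableFuture<String> read = new CompletableFuture<>();
        // Assumption: orTimeout stands in for whatever schedules the read timeout.
        read.orTimeout(50, TimeUnit.MILLISECONDS);

        // handle() turns the failure into a value, so join() waits for the
        // timeout to fire without throwing.
        Throwable failure = read.handle((value, error) -> error).join();

        // The late result from the slow read is rejected; no id check is needed.
        boolean accepted = read.complete("late entries");
        return failure instanceof TimeoutException
                && !accepted
                && read.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println("late completion ignored: " + lateCompletionIgnored());
    }
}
```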

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: BewareMyPower#53

@BewareMyPower BewareMyPower self-assigned this Sep 16, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 16, 2025
@BewareMyPower BewareMyPower added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages area/ML release/4.0.7 release/4.1.1 and removed doc-not-needed Your PR changes do not impact docs labels Sep 16, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 16, 2025
Labels

area/ML doc-not-needed Your PR changes do not impact docs release/4.1.2 type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
