-
Notifications
You must be signed in to change notification settings - Fork 283
Fix ByteBuf memory leak in PutOperation when operations are aborted #3176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Fix ByteBuf memory leak in PutOperation when operations are aborted #3176
Conversation
When a PutOperation is aborted or fails before all data is processed by the ChunkFiller thread, channelReadBuf may still hold a reference to a ByteBuf that was read from the channel but not yet consumed. This causes a memory leak as the buffer is never released. Changes: - Add channelReadBuf.release() in cleanupChunks() to ensure the buffer is properly released when the operation completes or fails. - Remove synchronized modifier from PutChunk.fillFrom() as it's not needed with the ChunkFiller single threaded model - Add test case testPutOperationByteBufLeakOnAbort() to verify ByteBuf resources are properly released when operations are aborted mid-flight The fix ensures that even if the ChunkFiller thread hasn't processed all data from the channel when an operation completes/fails, the ByteBuf holding unprocessed data is properly released, preventing memory leaks.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #3176 +/- ##
============================================
+ Coverage 64.24% 69.92% +5.68%
- Complexity 10398 12814 +2416
============================================
Files 840 930 +90
Lines 71755 79006 +7251
Branches 8611 9434 +823
============================================
+ Hits 46099 55247 +9148
+ Misses 23004 20845 -2159
- Partials 2652 2914 +262 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| // At this point, if the channelReadBuf is not null it means it did not get fully read | ||
| // by the ChunkFiller in fillChunks and needs to be released. | ||
| if (channelReadBuf != null) { | ||
| channelReadBuf.release(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! This is a better clean fix with unit test than adding synchronized. thank you
After even more extensive review, we identified that not only is there a case where the read buf is leaked, it's also possible for the read buf to be used after free in rare cases where the memory is released by the network and GCed before it is retained by the ChunkFiller thread. Some of the memory leak in the previous commit is masking some of the use-after-free issues fixed in this commit. Given how intertwined they are, it wouldn't be safe to merge these changes separately.
|
After even more extensive review, we identified that not only is there a case where the read buf Some of the memory leak in the previous commit is masking some of the use-after-free issues. Getting the tests to segfault is obviously not possible, but I believe the testing approach here does |
| * @return the number of bytes transferred in this operation. | ||
| */ | ||
| synchronized int fillFrom(ByteBuf channelReadBuf) { | ||
| int fillFrom(ByteBuf channelReadBuf) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we still need this function to be synchronized here, it's here to protect race condition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets walk through to see if that's the case.
fillFrom is called in only one place, from fillChunks which itself is only called from ChunkFiller in PutManager. ChunkFiller is a runnable run by a single thread:
chunkFillerThread = Utils.newThread("ChunkFillerThread-" + suffix, new ChunkFiller(), true);
chunkFillerThread.start();
So fillChunks since it's only accessed by a single thread would only needs to be synchronized from concurrent access from error / cleanup threads. What is needed is that any objects used within the fillChunks routine which may also be concurrently accessed by those threads to be either behind a more narrowly scoped lock or declared as volatile. So lets look at that.
In PutManager.poll we have:
for (PutOperation op : putOperations) {
try {
op.poll(requestRegistrationCallback);
} catch (Exception e) {
op.setOperationExceptionAndComplete(
new RouterException("Put poll encountered unexpected error", e, RouterErrorCode.UnexpectedInternalError));
}
if (op.isOperationComplete() && putOperations.remove(op)) {
// In order to ensure that an operation is completed only once, call onComplete() only at the place where the
// operation actually gets removed from the set of operations. See comment within closePendingOperations().
onComplete(op);
}
}
So
setOperationExceptionAndCompletemay be set with a RouterException.- If
isOperationCompleteis true, thenonCompletemay be called which callscleanupChunks
Therefore we need to
a) make sure anything that happens within setOperationExceptionAndComplete is not concurrent with anything that happens in fillChunks.
b) make sure that either i) nothing concurrent happens in fillChunks after isOperationComplete is true or ii) make sure what does happen is behind a lock.
In PutManager.completePendingOperations we have:
for (PutOperation op : putOperations) {
// There is a rare scenario where the operation gets removed from this set and gets completed concurrently by
// the RequestResponseHandler thread when it is in poll() or handleResponse(). In order to avoid the completion
// from happening twice, complete it here only if the remove was successful.
if (putOperations.remove(op)) {
op.cleanupChunks();
Exception e = new RouterException("Aborted operation because Router is closed.", RouterErrorCode.RouterClosed);
routerMetrics.operationDequeuingRate.mark();
routerMetrics.operationAbortCount.inc();
routerMetrics.onPutBlobError(e, op.isEncryptionEnabled(), op.isStitchOperation());
nonBlockingRouter.completeOperation(op.getFuture(), op.getCallback(), null, e);
}
}
and completePendingOperations only runs as cleanup within the Chunkfiller thread, proving that it cannot be concurrent with fillChunks
So lets look at condition a:
void setOperationExceptionAndComplete(Exception exception) {
if (exception instanceof RouterException) {
RouterUtils.replaceOperationException(operationException, (RouterException) exception, this::getPrecedenceLevel);
} else if (exception instanceof ClosedChannelException) {
operationException.compareAndSet(null, exception);
} else {
operationException.set(exception);
}
setOperationCompleted();
}
...
void setOperationCompleted() {
operationCompleted = true;
clearReadyChunks();
}
...
private synchronized void clearReadyChunks() {
for (PutChunk chunk : putChunks) {
logger.debug("{}: Chunk {} state: {}", loggingContext, chunk.getChunkIndex(), chunk.getState());
// Only release the chunk in ready or complete mode. Filler thread will release the chunk in building mode
// and the encryption thread will release the chunk in encrypting mode.
if (chunk.isReady() || chunk.isComplete()) {
chunk.clear();
}
}
}
So for condition A we set the exception, set operation completed, and clear chunks which are provably finished. None of this will involve concurrent modification with fillChunks.
Lets looks at condition b:
fillChunks() {
// a lot of channelReadBuf and chunk modification!
}
For condition b we can either add synchronized to fillChunks (instead of fillFrom) or we can add a lock around updating the operationCompleted value (and when we need to avoid TOCTOU). The synchronized on fillChunks should cause the least amount of complexity without too large an of an overhead as most of the work in fillChunks happens within an internal loop depending on the operationCompleted value.
Summary
When a PutOperation is aborted or fails before all data is processed by the ChunkFiller thread, channelReadBuf may still hold a reference to a ByteBuf that was read from the channel but not yet consumed. This causes a memory leak as the buffer is never released.
Additionally, because the channelReadBuf is not retained after being read, there's a narrow window on failure where that memory could be released and GCed before the ChunkFiller thread creates a chunk and processes that memory. We protect against this with an explicit retain/release pattern in the fillChunks and cleanupChunks methods.
Changes:
The fix ensures that even if the ChunkFiller thread hasn't processed all data from the channel when an operation completes/fails, the ByteBuf holding unprocessed data is properly released, preventing memory leaks or use-after-free crashes.
Testing Done
Added test case testPutOperationByteBufLeakOnAbort() to verify ByteBuf resources are properly released when operations are aborted mid-flight
Added test cases in PutOperation.java to show the use-after-free condition and prove the fix resolves that gap.