Skip to content

Commit 6eab590

Browse files
j-tylerJustin Marsh
andauthored
Fix potential memory leak in GetBlobOperation (#3163)
Since chunkAsyncWriteCallback is passed to asyncWritableChannel we don't have guarantees on if it is run across a single or multiple threads. In the concurrent case, it had a get-then-increment pattern which can cause chunkIndexToBufWaitingForRelease.remove(currentNumChunk) to skip a a chunk and cause a memory leak. The added test will fail if the change to chunkAsyncWriteCallback is removed. Co-authored-by: Justin Marsh <[email protected]>
1 parent ca7a787 commit 6eab590

File tree

2 files changed

+141
-2
lines changed

2 files changed

+141
-2
lines changed

ambry-router/src/main/java/com/github/ambry/router/GetBlobOperation.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,13 +520,12 @@ public void onCompletion(Long result, Exception exception) {
520520
if (exception != null) {
521521
setOperationException(exception);
522522
}
523-
int currentNumChunk = numChunksWrittenOut.get();
523+
int currentNumChunk = numChunksWrittenOut.getAndIncrement();
524524
ByteBuf byteBuf = chunkIndexToBufWaitingForRelease.remove(currentNumChunk);
525525
if (byteBuf != null) {
526526
ReferenceCountUtil.safeRelease(byteBuf);
527527
}
528528
lastChunkWrittenDoneTime.set(SystemTime.getInstance().milliseconds());
529-
numChunksWrittenOut.incrementAndGet();
530529
routerCallback.onPollReady();
531530
}
532531
};

ambry-router/src/test/java/com/github/ambry/router/GetBlobOperationTest.java

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import java.io.DataInputStream;
7878
import java.io.IOException;
7979
import java.nio.ByteBuffer;
80+
import java.nio.channels.ClosedChannelException;
8081
import java.nio.charset.StandardCharsets;
8182
import java.util.ArrayList;
8283
import java.util.Arrays;
@@ -2155,6 +2156,145 @@ public void run() {
21552156
((RouterException) operationException).getErrorCode());
21562157
}
21572158

2159+
@Test
2160+
public void testChunkAsyncWriteCallbackMemoryLeak() throws Exception {
2161+
// Three chunks ensure multiple callbacks are queued so we can force the concurrent completion ordering below.
2162+
blobSize = maxChunkSize * 3;
2163+
doPut();
2164+
2165+
final AtomicReference<Exception> callbackException = new AtomicReference<>();
2166+
final CountDownLatch readCompleteLatch = new CountDownLatch(1);
2167+
// Custom channel records callbacks so we can trigger two of them in parallel and mimic the original race.
2168+
final ConcurrentCallbackChannel channel = new ConcurrentCallbackChannel();
2169+
2170+
Callback<GetBlobResult> callback = new Callback<GetBlobResult>() {
2171+
@Override
2172+
public void onCompletion(GetBlobResult result, Exception exception) {
2173+
if (exception != null) {
2174+
callbackException.set(exception);
2175+
readCompleteLatch.countDown();
2176+
} else {
2177+
try {
2178+
// Capture the data stream on our test channel; the inner callback fires once the operation thinks the
2179+
// write is complete. The test channel will delay/coordinate the callbacks to exercise the leak scenario.
2180+
result.getBlobDataChannel().readInto(channel, new Callback<Long>() {
2181+
@Override
2182+
public void onCompletion(Long bytesRead, Exception exception) {
2183+
if (exception != null) {
2184+
callbackException.set(exception);
2185+
}
2186+
readCompleteLatch.countDown();
2187+
}
2188+
});
2189+
} catch (Exception e) {
2190+
callbackException.set(e);
2191+
readCompleteLatch.countDown();
2192+
}
2193+
}
2194+
}
2195+
};
2196+
2197+
createOperationAndComplete(callback);
2198+
Assert.assertTrue("Timeout waiting for read to complete", readCompleteLatch.await(5, TimeUnit.SECONDS));
2199+
2200+
if (callbackException.get() != null) {
2201+
throw callbackException.get();
2202+
}
2203+
2204+
//Thread.sleep(500);
2205+
channel.close();
2206+
}
2207+
2208+
private static class ConcurrentCallbackChannel implements AsyncWritableChannel {
2209+
// Track callbacks by chunk index so that we can defer invoking them until both the first and second chunk arrive.
2210+
private final Map<Integer, Callback<Long>> callbacks = new HashMap<>();
2211+
private int chunkIndex = 0;
2212+
private volatile boolean open = true;
2213+
2214+
@Override
2215+
public Future<Long> write(ByteBuffer src, Callback<Long> callback) {
2216+
return write(Unpooled.wrappedBuffer(src), callback);
2217+
}
2218+
2219+
@Override
2220+
public Future<Long> write(ByteBuf src, Callback<Long> callback) {
2221+
FutureResult<Long> future = new FutureResult<>();
2222+
if (!open) {
2223+
Exception e = new ClosedChannelException();
2224+
future.done(0L, e);
2225+
if (callback != null) {
2226+
callback.onCompletion(0L, e);
2227+
}
2228+
return future;
2229+
}
2230+
2231+
int index = chunkIndex++;
2232+
long size = src.readableBytes();
2233+
callbacks.put(index, callback);
2234+
2235+
if (index == 1) {
2236+
// When the second chunk appears we concurrently fire the first two callbacks to recreate the race on
2237+
// numChunksWrittenOut incrementing inside the operation.
2238+
invokeConcurrently(callbacks.get(0), callbacks.get(1), size);
2239+
} else if (index == 2) {
2240+
// The final chunk completes immediately so the test focuses on the first two callbacks racing.
2241+
callback.onCompletion(size, null);
2242+
}
2243+
2244+
future.done(size, null);
2245+
return future;
2246+
}
2247+
2248+
private void invokeConcurrently(Callback<Long> cb1, Callback<Long> cb2, long size) {
2249+
CountDownLatch start = new CountDownLatch(1);
2250+
CountDownLatch done = new CountDownLatch(2);
2251+
2252+
Runnable task1 = () -> {
2253+
try {
2254+
start.await();
2255+
if (cb1 != null) cb1.onCompletion(size, null);
2256+
} catch (InterruptedException e) {
2257+
Thread.currentThread().interrupt();
2258+
} finally {
2259+
done.countDown();
2260+
}
2261+
};
2262+
2263+
Runnable task2 = () -> {
2264+
try {
2265+
start.await();
2266+
if (cb2 != null) cb2.onCompletion(size, null);
2267+
} catch (InterruptedException e) {
2268+
Thread.currentThread().interrupt();
2269+
} finally {
2270+
done.countDown();
2271+
}
2272+
};
2273+
2274+
// Launch both callbacks and release them at once so they race with the shared state in the production code.
2275+
new Thread(task1).start();
2276+
new Thread(task2).start();
2277+
start.countDown();
2278+
2279+
try {
2280+
done.await(2, TimeUnit.SECONDS);
2281+
} catch (InterruptedException e) {
2282+
Thread.currentThread().interrupt();
2283+
}
2284+
}
2285+
2286+
@Override
2287+
public boolean isOpen() {
2288+
return open;
2289+
}
2290+
2291+
@Override
2292+
public void close() {
2293+
open = false;
2294+
callbacks.clear();
2295+
}
2296+
}
2297+
21582298
/**
21592299
* Test range requests on a single chunk blob.
21602300
* @param resolveRangeOnEmptyBlob the value of {@link GetBlobOptions#resolveRangeOnEmptyBlob()} to set.

0 commit comments

Comments
 (0)