Skip to content

Commit ca7a787

Browse files
SophieGuo410Sophie Guo
andauthored
Fix the dead lock issue by not calling resolveAllRemainingChunks in write (#3162)
* Fix the dead lock issue by not calling resolveAllRemainingChunks in write * refactor --------- Co-authored-by: Sophie Guo <[email protected]>
1 parent b4d01bd commit ca7a787

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

ambry-commons/src/main/java/com/github/ambry/commons/ByteBufferAsyncWritableChannel.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.nio.ByteBuffer;
2121
import java.nio.channels.ClosedChannelException;
2222
import java.util.Queue;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.Future;
2425
import java.util.concurrent.LinkedBlockingQueue;
2526
import java.util.concurrent.TimeUnit;
@@ -100,14 +101,20 @@ public Future<Long> write(ByteBuf src, Callback<Long> callback) {
100101
if (src == null) {
101102
throw new IllegalArgumentException("Source buffer cannot be null");
102103
}
104+
if (!isOpen()) {
105+
src.release();
106+
CompletableFuture<Long> failedFuture = new CompletableFuture<>();
107+
failedFuture.completeExceptionally(new ClosedChannelException());
108+
if (callback != null) {
109+
callback.onCompletion(0L, new ClosedChannelException());
110+
}
111+
return failedFuture;
112+
}
103113
ChunkData chunkData = new ChunkData(src, callback);
104114
chunks.add(chunkData);
105115
if (channelEventListener != null) {
106116
channelEventListener.onEvent(EventType.Write);
107117
}
108-
if (!isOpen()) {
109-
resolveAllRemainingChunks(new ClosedChannelException());
110-
}
111118
return chunkData.future;
112119
}
113120

ambry-commons/src/test/java/com/github/ambry/commons/ByteBufferAsyncWritableChannelTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,40 @@ public void commonCaseTest() throws Exception {
8181
assertNull("There should have been no chunk returned", channel.getNextChunk(0));
8282
}
8383

84+
@Test
85+
public void writeAfterCloseShouldFailWithClosedChannelException() throws Exception {
86+
ByteBufferAsyncWritableChannel channel = new ByteBufferAsyncWritableChannel();
87+
88+
// Close the channel first
89+
channel.close();
90+
assertFalse("Channel should be closed", channel.isOpen());
91+
92+
// Prepare a dummy ByteBuf
93+
ByteBuf src = ByteBufAllocator.DEFAULT.heapBuffer(50);
94+
src.writeBytes(new byte[]{1, 2, 3});
95+
96+
WriteCallback callback = new WriteCallback(0);
97+
98+
// Perform write
99+
Future<Long> future = channel.write(src, callback);
100+
101+
// The future should fail with ClosedChannelException
102+
try {
103+
future.get();
104+
fail("Expected write to fail due to closed channel");
105+
} catch (ExecutionException e) {
106+
Throwable root = Utils.getRootCause(e);
107+
assertTrue("Expected ClosedChannelException but got " + root, root instanceof ClosedChannelException);
108+
}
109+
110+
// The callback should also have received a ClosedChannelException
111+
assertTrue("Expected callback exception to be ClosedChannelException but got " + callback.exception,
112+
callback.exception instanceof ClosedChannelException);
113+
114+
// The ByteBuf should have been released
115+
assertEquals("Expected src ByteBuf to be released", 0, src.refCnt());
116+
}
117+
84118
@Test
85119
public void commonCaseTestForNettyByteBuf() throws Exception {
86120
for (boolean useCompositeByteBuf : Arrays.asList(false, true)) {

0 commit comments

Comments
 (0)