Description
AutoMQ Version
1.6.0
Operating System
macOS
Installation Method
source
Hardware Configuration
No response
Other Relevant Software
No response
What Went Wrong?
Critical ByteBuf memory leak: when an exception is thrown during bulk upload preparation, the buffers allocated up to that point are never released.
Location:
s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/DefaultWriter.java, lines 321-384
The Problem:
private void uploadBulk0(Bulk bulk) {
    try {
        long startTime = time.nanoseconds();
        List<Record> records = bulk.records;
        // Order by <streamId, offset>
        records.sort(...); // CAN THROW EXCEPTION
        long firstOffset = bulk.baseOffset;
        long nextOffset = firstOffset;
        long lastRecordOffset = nextOffset;
        CompositeByteBuf dataBuffer = ByteBufAlloc.compositeByteBuffer(); // LINE 340: ALLOCATE
        for (Record record : records) {
            record.offset = nextOffset;
            lastRecordOffset = record.offset;
            ByteBuf data = record.streamRecordBatch.encoded();
            ByteBuf header = BYTE_BUF_ALLOC.byteBuffer(RECORD_HEADER_SIZE); // LINE 345: ALLOCATE
            header = WALUtil.generateHeader(data, header, 0, nextOffset); // CAN THROW EXCEPTION
            nextOffset += record.size;
            dataBuffer.addComponent(true, header);
            dataBuffer.addComponent(true, data);
        }
        // Build object buffer.
        long dataLength = dataBuffer.readableBytes();
        nextOffset = ObjectUtils.ceilAlignOffset(nextOffset); // CAN THROW EXCEPTION
        long endOffset = nextOffset;
        CompositeByteBuf objectBuffer = ByteBufAlloc.compositeByteBuffer(); // LINE 357: ALLOCATE
        WALObjectHeader header = new WALObjectHeader(...);
        objectBuffer.addComponent(true, header.marshal()); // CAN THROW EXCEPTION
        objectBuffer.addComponent(true, dataBuffer);
        // ... more code ...
        objectStorage.write(writeOptions, path, objectBuffer); // LINE 368: OWNERSHIP TRANSFERRED
    } catch (Throwable ex) { // LINE 381
        bulk.uploadCf.completeExceptionally(ex); // LINE 382: NO CLEANUP!
    }
}
Scenarios of Memory Leak
If any exception is thrown between lines 322 and 368 (before objectStorage.write() is called), the outcome depends on where it occurs:
- Lines 322-340: exception during the sort, or anywhere before dataBuffer is allocated
  - No leak (no buffers allocated yet)
- Lines 340-357: exception during data assembly (e.g., in WALUtil.generateHeader())
  - LEAK: dataBuffer and all header ByteBufs added to it, plus the header allocated in the failing iteration, which has not been added yet
- Lines 357-368: exception during object buffer assembly (e.g., in header.marshal())
  - LEAK: both dataBuffer and objectBuffer, plus all of their components
- After line 368: once objectStorage.write() has been called
  - No leak (write() takes ownership and releases the buffer on failure)
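The cleanup that the catch block is missing can be illustrated with a small, dependency-free model. This is only a sketch of the ownership pattern, not AutoMQ code: `TrackedBuffer`, `LeakSafeAssembly`, `assemble` and `write` are hypothetical stand-ins for Netty's reference-counted `ByteBuf`, `uploadBulk0()` and `objectStorage.write()`, and the failure is simulated.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a reference-counted Netty ByteBuf (not AutoMQ code). */
final class TrackedBuffer {
    static int live = 0;          // buffers allocated and not yet released
    private boolean released;

    TrackedBuffer() {
        live++;
    }

    void release() {
        if (!released) {          // idempotent, to keep the sketch simple
            released = true;
            live--;
        }
    }
}

final class LeakSafeAssembly {
    /**
     * Mirrors the shape of uploadBulk0(): allocate one buffer per record, then
     * hand everything to a writer. Until the hand-off, this method owns every
     * buffer and must release them if anything throws.
     *
     * @param failAt index of the record whose assembly fails, or -1 for no failure
     * @return true if the buffers were handed to the writer
     */
    static boolean assemble(int records, int failAt) {
        List<TrackedBuffer> owned = new ArrayList<>();
        boolean transferred = false;
        try {
            for (int i = 0; i < records; i++) {
                owned.add(new TrackedBuffer());   // like the header allocation at line 345
                if (i == failAt) {
                    throw new IllegalStateException("simulated failure during assembly");
                }
            }
            transferred = true;                   // from here on the writer owns the buffers
            write(owned);                         // like objectStorage.write() at line 368
            return true;
        } catch (Throwable ex) {
            return false;                         // the real code completes uploadCf exceptionally
        } finally {
            if (!transferred) {
                owned.forEach(TrackedBuffer::release);   // the cleanup the report says is missing
            }
        }
    }

    /** Hypothetical writer: takes ownership and releases the buffers when done. */
    static void write(List<TrackedBuffer> buffers) {
        buffers.forEach(TrackedBuffer::release);
    }

    public static void main(String[] args) {
        System.out.println("failed upload handed off: " + assemble(5, 2) + ", live buffers: " + TrackedBuffer.live);
        System.out.println("clean upload handed off: " + assemble(5, -1) + ", live buffers: " + TrackedBuffer.live);
    }
}
```

In the Netty-based code, the analogous fix would presumably track `dataBuffer`, `objectBuffer` and the in-flight `header`, and release whichever of them have not yet been added to a parent buffer or passed to `objectStorage.write()`. Releasing a `CompositeByteBuf` also releases the components it holds, so each buffer should be released through exactly one owner to avoid a double release.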
What Should Have Happened Instead?
| Expected | Actual (Bug) |
| --- | --- |
| Exception → buffers released → no leak | Exception → buffers NOT released → LEAK |
| Memory stable under failures | Memory grows with each failure |
| Broker recovers from transient errors | Broker crashes with OOM after enough failures |
Steps to Reproduce
Scenario with Network Issues
Steps
1. Set up AutoMQ with S3/MinIO:
docker-compose up -d
2. Create the topic and start the producer:
./bin/kafka-topics.sh --create --topic leak-test \
  --bootstrap-server localhost:9092 \
  --partitions 10 --replication-factor 1
3. Inject network chaos:
# Simulate intermittent S3 failures
docker exec minio killall -STOP minio # Pause MinIO briefly
sleep 2
docker exec minio killall -CONT minio # Resume
# High-throughput producer during chaos
./bin/kafka-producer-perf-test.sh \
  --topic leak-test \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput 100000 \
  --producer-props bootstrap.servers=localhost:9092
4. Monitor the memory leak:
# Watch direct memory grow
# (jcmd VM.native_memory only reports data if the broker JVM was started with -XX:NativeMemoryTracking=summary)
watch -n 1 'jcmd $(pgrep -f kafka) VM.native_memory summary | grep -A1 "Direct"'
Expected Behavior:
- When MinIO pauses, some uploadBulk0() calls fail during buffer assembly
- The exceptions are caught, but the ByteBufs are not released
- Direct memory increases steadily
- Eventually, an OutOfDirectMemoryError crashes the broker
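Direct memory growth can also be sampled from inside a JVM with the standard `BufferPoolMXBean`. The sketch below is an illustration only: `DirectMemoryProbe` and `directBytesUsed` are hypothetical names, the probe reports on the JVM it runs in (so it would have to run inside the broker or be read over JMX), and Netty may allocate direct memory through paths that this pool does not count, so treat the number as a lower bound.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

final class DirectMemoryProbe {
    /** Bytes currently held by the JVM's "direct" buffer pool, or -1 if that pool is not exposed. */
    static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directBytesUsed();
        ByteBuffer held = ByteBuffer.allocateDirect(1024 * 1024);   // 1 MiB, kept reachable below
        long after = directBytesUsed();
        System.out.println("direct pool grew by " + (after - before)
                + " bytes while holding a buffer of capacity " + held.capacity());
    }
}
```

Sampling `directBytesUsed()` periodically while the failure scenario runs would show whether the pool keeps growing after failed uploads instead of returning to a steady level.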