Merged
@@ -47,6 +47,7 @@
import static com.automq.stream.s3.wal.impl.object.ObjectUtils.floorAlignOffset;
import static com.automq.stream.s3.wal.impl.object.ObjectUtils.genObjectPathV1;

@SuppressWarnings("checkstyle:cyclomaticComplexity")
Copilot AI Nov 11, 2025
The cyclomatic complexity suppression is applied at the class level, which disables the check for all methods in the class. Consider applying this suppression more narrowly at the method level (e.g., on doRunBatchGet0 method) to maintain complexity checks for other methods.
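To illustrate the suggestion, a minimal sketch (class and method names are hypothetical, not from this PR) of moving the suppression to a single method so the checkstyle rule stays active for the rest of the class. Note that honoring `@SuppressWarnings` requires checkstyle's `SuppressWarningsHolder`/`SuppressWarningsFilter` to be configured:

```java
// Hypothetical sketch: method-level suppression of checkstyle's
// cyclomatic-complexity rule. Only `classify` is exempt from the check;
// every other method in the class remains covered.
public class SuppressionScopeExample {

    @SuppressWarnings("checkstyle:cyclomaticComplexity")
    static int classify(int v) {
        // Stand-in for a branch-heavy method such as doRunBatchGet0.
        if (v < 0) {
            return -1;
        }
        if (v == 0) {
            return 0;
        }
        return 1;
    }

    public static void main(String[] args) {
        System.out.println(classify(5)); // prints 1
    }
}
```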
@EventLoopSafe
public class DefaultReader {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultReader.class);
@@ -190,7 +191,11 @@ private void doRunBatchGet0(BatchReadTask readTask) {
long nextRecordOffset = finalNextGetOffset;
int lastReadableBytes = buf.readableBytes();
while (buf.readableBytes() > 0 && nextRecordOffset < readTask.endOffset.offset()) {
batches.add(ObjectUtils.decodeRecordBuf(buf));
StreamRecordBatch batch = ObjectUtils.decodeRecordBuf(buf);
boolean isTriggerTrimRecord = batch.getCount() == 0 && batch.getStreamId() == -1L && batch.getEpoch() == -1L;
Copilot AI Nov 11, 2025
The trim record detection logic is duplicated across the codebase. In RecoverIterator.java line 154, a similar check exists but only checks streamId == -1L && epoch == -1L, omitting the count == 0 check. Consider extracting this logic into a utility method (e.g., ObjectUtils.isTrimRecord()) to ensure consistency and maintainability across all trim record checks.

Suggested change
boolean isTriggerTrimRecord = batch.getCount() == 0 && batch.getStreamId() == -1L && batch.getEpoch() == -1L;
boolean isTriggerTrimRecord = ObjectUtils.isTrimRecord(batch);

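A sketch of the extracted predicate the comment proposes. The real `StreamRecordBatch` lives in AutoMQ's codebase; a minimal stub (an assumption, included only so the check runs standalone) mirrors the getters used in the diff:

```java
// Sketch of the suggested ObjectUtils.isTrimRecord() extraction, with a
// minimal stand-in for StreamRecordBatch so the predicate can be exercised
// outside the AutoMQ codebase.
public class TrimRecordCheck {

    // Hypothetical stub mirroring the getters the diff relies on.
    static final class StreamRecordBatch {
        final long streamId;
        final long epoch;
        final int count;

        StreamRecordBatch(long streamId, long epoch, int count) {
            this.streamId = streamId;
            this.epoch = epoch;
            this.count = count;
        }

        long getStreamId() { return streamId; }
        long getEpoch() { return epoch; }
        int getCount() { return count; }
    }

    // Single authoritative trim-record predicate, so DefaultReader and
    // RecoverIterator cannot drift apart on the detection criteria.
    static boolean isTrimRecord(StreamRecordBatch batch) {
        return batch.getCount() == 0
            && batch.getStreamId() == -1L
            && batch.getEpoch() == -1L;
    }

    public static void main(String[] args) {
        System.out.println(isTrimRecord(new StreamRecordBatch(-1L, -1L, 0)));  // prints true
        System.out.println(isTrimRecord(new StreamRecordBatch(233L, 10L, 1))); // prints false
    }
}
```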
if (!isTriggerTrimRecord) {
batches.add(batch);
}
nextRecordOffset += lastReadableBytes - buf.readableBytes();
lastReadableBytes = buf.readableBytes();
}
@@ -119,6 +119,44 @@ public void testGet_batch() throws Exception {
}
}

@Test
public void testGet_batchSkipTrim() throws Exception {
ObjectWALConfig config = ObjectWALConfig.builder()
.withEpoch(1L)
.withMaxBytesInBatch(1024)
.withBatchInterval(1000)
.build();
ObjectWALService wal = new ObjectWALService(time, objectStorage, config);
acquire(config);
wal.start();

List<CompletableFuture<AppendResult>> appendCfList = new ArrayList<>();
for (int i = 0; i < 8; i++) {
appendCfList.add(wal.append(TraceContext.DEFAULT,
new StreamRecordBatch(233L, 10, 100L + i, 1, generateByteBuf(256))));
// ensure objects are flushed/uploaded
((DefaultWriter) (wal.writer)).flush().join();
if (i == 4) {
// write a trim marker by trimming to the 4th record's offset (this will produce a trim record object)
wal.trim(appendCfList.get(3).get().recordOffset()).get();
}
}


// query across a range that spans objects including the trim marker
List<StreamRecordBatch> records = wal.get(
DefaultRecordOffset.of(appendCfList.get(4).get().recordOffset()),
DefaultRecordOffset.of(wal.confirmOffset())
).get();

assertEquals(4, records.size());
for (int i = 0; i < records.size(); i++) {
assertEquals(104L + i, records.get(i).getBaseOffset());
}
wal.shutdownGracefully();
}


@Test
public void testTrim() throws Exception {
ObjectWALConfig config = ObjectWALConfig.builder().withEpoch(1L).withMaxBytesInBatch(1024).withBatchInterval(1000).build();