Skip to content

Commit 9e3828d

Browse files
authored
fix(wal): filter inner trim record for range get (#3000)
Signed-off-by: Robin Han <[email protected]>
1 parent fdb40ea commit 9e3828d

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/DefaultReader.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import static com.automq.stream.s3.wal.impl.object.ObjectUtils.floorAlignOffset;
4848
import static com.automq.stream.s3.wal.impl.object.ObjectUtils.genObjectPathV1;
4949

50+
@SuppressWarnings("checkstyle:cyclomaticComplexity")
5051
@EventLoopSafe
5152
public class DefaultReader {
5253
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultReader.class);
@@ -190,7 +191,11 @@ private void doRunBatchGet0(BatchReadTask readTask) {
190191
long nextRecordOffset = finalNextGetOffset;
191192
int lastReadableBytes = buf.readableBytes();
192193
while (buf.readableBytes() > 0 && nextRecordOffset < readTask.endOffset.offset()) {
193-
batches.add(ObjectUtils.decodeRecordBuf(buf));
194+
StreamRecordBatch batch = ObjectUtils.decodeRecordBuf(buf);
195+
boolean isTriggerTrimRecord = batch.getCount() == 0 && batch.getStreamId() == -1L && batch.getEpoch() == -1L;
196+
if (!isTriggerTrimRecord) {
197+
batches.add(batch);
198+
}
194199
nextRecordOffset += lastReadableBytes - buf.readableBytes();
195200
lastReadableBytes = buf.readableBytes();
196201
}

s3stream/src/test/java/com/automq/stream/s3/wal/impl/object/ObjectWALServiceTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,44 @@ public void testGet_batch() throws Exception {
119119
}
120120
}
121121

122+
@Test
123+
public void testGet_batchSkipTrim() throws Exception {
124+
ObjectWALConfig config = ObjectWALConfig.builder()
125+
.withEpoch(1L)
126+
.withMaxBytesInBatch(1024)
127+
.withBatchInterval(1000)
128+
.build();
129+
ObjectWALService wal = new ObjectWALService(time, objectStorage, config);
130+
acquire(config);
131+
wal.start();
132+
133+
List<CompletableFuture<AppendResult>> appendCfList = new ArrayList<>();
134+
for (int i = 0; i < 8; i++) {
135+
appendCfList.add(wal.append(TraceContext.DEFAULT,
136+
new StreamRecordBatch(233L, 10, 100L + i, 1, generateByteBuf(256))));
137+
// ensure objects are flushed/uploaded
138+
((DefaultWriter) (wal.writer)).flush().join();
139+
if (i == 4) {
140+
// write a trim marker by trimming to the 4th record's offset (this will produce a trim record object)
141+
wal.trim(appendCfList.get(3).get().recordOffset()).get();
142+
}
143+
}
144+
145+
146+
// query across a range that spans objects including the trim marker
147+
List<StreamRecordBatch> records = wal.get(
148+
DefaultRecordOffset.of(appendCfList.get(4).get().recordOffset()),
149+
DefaultRecordOffset.of(wal.confirmOffset())
150+
).get();
151+
152+
assertEquals(4, records.size());
153+
for (int i = 0; i < records.size(); i++) {
154+
assertEquals(104L + i, records.get(i).getBaseOffset());
155+
}
156+
wal.shutdownGracefully();
157+
}
158+
159+
122160
@Test
123161
public void testTrim() throws Exception {
124162
ObjectWALConfig config = ObjectWALConfig.builder().withEpoch(1L).withMaxBytesInBatch(1024).withBatchInterval(1000).build();

0 commit comments

Comments
 (0)