Skip to content

Commit

Permalink
Fix iterator not loading all chunks (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
achim-k authored Mar 5, 2024
1 parent 1a2a35e commit f7c852b
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// -*- jsonc -*-
{
"editor.codeActionsOnSave": {
"source.fixAll.eslint": true
"source.fixAll.eslint": "explicit"
},
"editor.defaultFormatter": "esbenp.prettier-vscode",
"editor.formatOnSave": true,
Expand Down
28 changes: 13 additions & 15 deletions src/BaseIterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { IBagReader } from "./IBagReader";
import { ChunkInfo, Connection, MessageData } from "./record";
import type {
ChunkReadResult,
Decompress,
IteratorConstructorArgs,
MessageIterator,
MessageEvent,
Decompress,
MessageIterator,
} from "./types";

type HeapItem = { time: Time; offset: number; chunkReadResult: ChunkReadResult };
Expand Down Expand Up @@ -55,25 +55,23 @@ export abstract class BaseIterator implements MessageIterator {
}
}

// Load the next set of messages into the heap
protected abstract loadNext(): Promise<void>;
/**
* Load the next set of messages into the heap
* @returns False if no more messages can be loaded, True otherwise.
*/
protected abstract loadNext(): Promise<boolean>;

/**
* @returns An AsyncIterator of MessageEvents
*/
async *[Symbol.asyncIterator](): AsyncIterator<MessageEvent> {
while (true) {
if (!this.heap.front()) {
await this.loadNext();
}

// The first load may place us in the middle of a chunk. The topic messages we care
// about may already be "behind" us.
//
// When that happens, we end up with an empty heap and need to try loading one more time.
// This next load will access the next chunks with messages for our topic (or EOF).
if (!this.heap.front()) {
await this.loadNext();
// Keep on reading chunks into the heap until no more chunk can be loaded (EOF)
while (!this.heap.front()) {
const chunkLoaded = await this.loadNext();
if (!chunkLoaded) {
return;
}
}

const item = this.heap.pop();
Expand Down
104 changes: 104 additions & 0 deletions src/ForwardIterator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,108 @@ describe("ForwardIterator", () => {
const actualMessages = await consumeMessages(iterator);
expect(actualMessages).toEqual(expectedMessages.filter((msg) => msg.timestamp.nsec >= 4));
});

it("should iterate over messages in overlapping and non-overlapping chunks", async () => {
/* Chunk ordering (shown numbers are connection number, X-axis is time):
*
* 0 --------- 1 --------------- 0
* 0 --------- 2 --------------- 0
* 0 --------- 1 --------------- 0
* 0 --- 1 --- 0
*/
const { connections, chunkInfos, reader, expectedMessages } = generateFixtures({
chunks: [
{
messages: [
{
connection: 0,
time: 0,
value: 1,
},
{
connection: 1,
time: 2,
value: 1,
},
{
connection: 0,
time: 5,
value: 1,
},
],
},
{
messages: [
{
connection: 0,
time: 1,
value: 3,
},
{
connection: 2,
time: 3,
value: 3,
},
{
connection: 0,
time: 6,
value: 3,
},
],
},
{
messages: [
{
connection: 0,
time: 2,
value: 5,
},
{
connection: 1,
time: 4,
value: 5,
},
{
connection: 0,
time: 7,
value: 5,
},
],
},
{
messages: [
{
connection: 0,
time: 8,
value: 5,
},
{
connection: 1,
time: 9,
value: 5,
},
{
connection: 0,
time: 10,
value: 5,
},
],
},
],
});

const iterator = new ForwardIterator({
connections,
chunkInfos,
decompress: {},
reader,
position: { sec: 0, nsec: 0 },
topics: ["/1", "/2"],
});

const actualMessages = await consumeMessages(iterator);
expect(actualMessages).toEqual(
expectedMessages.filter((msg) => ["/1", "/2"].includes(msg.topic)),
);
});
});
7 changes: 4 additions & 3 deletions src/ForwardIterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ export class ForwardIterator extends BaseIterator {
}
}

protected override async loadNext(): Promise<void> {
protected override async loadNext(): Promise<boolean> {
const stamp = this.position;

const firstChunkInfo = this.remainingChunkInfos[0];
if (!firstChunkInfo) {
return;
return false;
}

this.remainingChunkInfos[0] = undefined;
Expand Down Expand Up @@ -76,7 +76,7 @@ export class ForwardIterator extends BaseIterator {

// End of file or no more candidates
if (chunksToLoad.length === 0) {
return;
return false;
}

// Add 1 nsec to make end 1 past the end for the next read
Expand Down Expand Up @@ -111,5 +111,6 @@ export class ForwardIterator extends BaseIterator {
}

this.cachedChunkReadResults = newCache;
return true;
}
}
9 changes: 5 additions & 4 deletions src/ReverseIterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Heap from "heap";

import { BaseIterator } from "./BaseIterator";
import { ChunkInfo } from "./record";
import { IteratorConstructorArgs, ChunkReadResult } from "./types";
import { ChunkReadResult, IteratorConstructorArgs } from "./types";

export class ReverseIterator extends BaseIterator {
private remainingChunkInfos: (ChunkInfo | undefined)[];
Expand Down Expand Up @@ -36,12 +36,12 @@ export class ReverseIterator extends BaseIterator {
}
}

protected override async loadNext(): Promise<void> {
protected override async loadNext(): Promise<boolean> {
const stamp = this.position;

const firstChunkInfo = this.remainingChunkInfos[0];
if (!firstChunkInfo) {
return;
return false;
}

this.remainingChunkInfos[0] = undefined;
Expand Down Expand Up @@ -75,7 +75,7 @@ export class ReverseIterator extends BaseIterator {

// End of file or no more candidates
if (chunksToLoad.length === 0) {
return;
return false;
}

// Subtract 1 nsec to make the next position 1 before
Expand Down Expand Up @@ -110,5 +110,6 @@ export class ReverseIterator extends BaseIterator {
}

this.cachedChunkReadResults = newCache;
return true;
}
}

0 comments on commit f7c852b

Please sign in to comment.