Skip to content
32 changes: 31 additions & 1 deletion packages/sds/src/message_channel/mem_local_history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,37 @@
* If an array of items longer than `maxLength` is pushed, dropping will happen
* at next push.
*/
export class MemLocalHistory {
export interface ILocalHistory {
length: number;
push(...items: ContentMessage[]): number;
some(
predicate: (
value: ContentMessage,
index: number,
array: ContentMessage[]
) => unknown,
thisArg?: any

Check warning on line 29 in packages/sds/src/message_channel/mem_local_history.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

Check warning on line 29 in packages/sds/src/message_channel/mem_local_history.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type
): boolean;
slice(start?: number, end?: number): ContentMessage[];
find(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any

Check warning on line 38 in packages/sds/src/message_channel/mem_local_history.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

Check warning on line 38 in packages/sds/src/message_channel/mem_local_history.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type
): ContentMessage | undefined;
findIndex(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any

Check warning on line 46 in packages/sds/src/message_channel/mem_local_history.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

Check warning on line 46 in packages/sds/src/message_channel/mem_local_history.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type
): number;
}

export class MemLocalHistory implements ILocalHistory {
private items: ContentMessage[] = [];

/**
Expand Down Expand Up @@ -61,7 +91,7 @@
index: number,
array: ContentMessage[]
) => unknown,
thisArg?: any

Check warning on line 94 in packages/sds/src/message_channel/mem_local_history.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

Check warning on line 94 in packages/sds/src/message_channel/mem_local_history.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type
): boolean {
return this.items.some(predicate, thisArg);
}
Expand All @@ -76,7 +106,7 @@
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any

Check warning on line 109 in packages/sds/src/message_channel/mem_local_history.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

Check warning on line 109 in packages/sds/src/message_channel/mem_local_history.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type
): ContentMessage | undefined {
return this.items.find(predicate, thisArg);
}
Expand Down
7 changes: 4 additions & 3 deletions packages/sds/src/message_channel/message_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js";

import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
import { MessageChannelEvent, MessageChannelEvents } from "./events.js";
import { MemLocalHistory } from "./mem_local_history.js";
import {
ChannelId,
ContentMessage,
Expand All @@ -21,6 +20,7 @@ import {
ParticipantId,
SyncMessage
} from "./message.js";
import { PersistentHistory } from "./persistent_history.js";
import { RepairConfig, RepairManager } from "./repair/repair.js";

export const DEFAULT_BLOOM_FILTER_OPTIONS = {
Expand Down Expand Up @@ -106,7 +106,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
channelId: ChannelId,
senderId: ParticipantId,
options: MessageChannelOptions = {},
localHistory: ILocalHistory = new MemLocalHistory()
localHistory?: ILocalHistory
) {
super();
this.channelId = channelId;
Expand All @@ -117,7 +117,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.outgoingBuffer = [];
this.possibleAcks = new Map();
this.incomingBuffer = [];
this.localHistory = localHistory;
this.localHistory =
localHistory ?? new PersistentHistory({ channelId: this.channelId });
this.causalHistorySize =
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
// TODO: this should be determined based on the bloom filter parameters and number of hashes
Expand Down
62 changes: 62 additions & 0 deletions packages/sds/src/message_channel/persistent_history.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { expect } from "chai";

import { ContentMessage } from "./message.js";
import { HistoryStorage, PersistentHistory } from "./persistent_history.js";

class MemoryStorage implements HistoryStorage {
private readonly store = new Map<string, string>();

public getItem(key: string): string | null {
return this.store.get(key) ?? null;
}

public setItem(key: string, value: string): void {
this.store.set(key, value);
}

public removeItem(key: string): void {
this.store.delete(key);
}
}

const channelId = "channel-1";

const createMessage = (id: string, timestamp: number): ContentMessage => {
return new ContentMessage(
id,
channelId,
"sender",
[],
BigInt(timestamp),
undefined,
new Uint8Array([timestamp]),
undefined
);
};

describe("PersistentHistory", () => {
it("persists and restores messages", () => {
const storage = new MemoryStorage();
const history = new PersistentHistory({ channelId, storage });

history.push(createMessage("msg-1", 1));
history.push(createMessage("msg-2", 2));

const restored = new PersistentHistory({ channelId, storage });

expect(restored.length).to.equal(2);
expect(restored.slice(0).map((msg) => msg.messageId)).to.deep.equal([
"msg-1",
"msg-2"
]);
});

it("behaves like memory history when storage is unavailable", () => {
const history = new PersistentHistory({ channelId, storage: undefined });

history.push(createMessage("msg-3", 3));

expect(history.length).to.equal(1);
expect(history.slice(0)[0].messageId).to.equal("msg-3");
});
});
196 changes: 196 additions & 0 deletions packages/sds/src/message_channel/persistent_history.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import { bytesToHex, hexToBytes } from "@noble/hashes/utils";

import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js";
import { ChannelId, ContentMessage, HistoryEntry } from "./message.js";

export interface HistoryStorage {
getItem(key: string): string | null;
setItem(key: string, value: string): void;
removeItem(key: string): void;
}

export interface PersistentHistoryOptions {
channelId: ChannelId;
storage?: HistoryStorage;
}

type StoredHistoryEntry = {
messageId: string;
retrievalHint?: string;
};

type StoredContentMessage = {
messageId: string;
channelId: string;
senderId: string;
lamportTimestamp: string;
causalHistory: StoredHistoryEntry[];
bloomFilter?: string;
content: string;
retrievalHint?: string;
};

const HISTORY_STORAGE_PREFIX = "waku:sds:history:";

/**
* Persists the SDS local history in a browser/localStorage compatible backend.
*
* If no storage backend is available, this behaves like {@link MemLocalHistory}.
*/
export class PersistentHistory implements ILocalHistory {
private readonly storage?: HistoryStorage;
private readonly storageKey: string;
private readonly memory: MemLocalHistory;

public constructor(options: PersistentHistoryOptions) {
this.memory = new MemLocalHistory();
this.storage = options.storage || localStorage;
this.storageKey = `${HISTORY_STORAGE_PREFIX}${options.channelId}`;
this.load();
}

public get length(): number {
return this.memory.length;
}

public push(...items: ContentMessage[]): number {
const length = this.memory.push(...items);
this.save();
return length;
}

public some(
predicate: (
value: ContentMessage,
index: number,
array: ContentMessage[]
) => unknown,
thisArg?: any
): boolean {
return this.memory.some(predicate, thisArg);
}

public slice(start?: number, end?: number): ContentMessage[] {
return this.memory.slice(start, end);
}

public find(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): ContentMessage | undefined {
return this.memory.find(predicate, thisArg);
}

public findIndex(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): number {
return this.memory.findIndex(predicate, thisArg);
}

private save(): void {
if (!this.storage) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hum, does it make sense for it to be constructed without storage?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we want the class to behave like MemLocalHistory if no storage is provided, right?

return;
}

const payload = JSON.stringify(
this.memory.slice(0).map(serializeContentMessage)
);
this.storage.setItem(this.storageKey, payload);
}

private load(): void {
if (!this.storage) {
return;
}

try {
const raw = this.storage.getItem(this.storageKey);
if (!raw) {
return;
}

const stored = JSON.parse(raw) as StoredContentMessage[];
const messages = stored
.map(deserializeContentMessage)
.filter((message): message is ContentMessage => Boolean(message));
if (messages.length) {
this.memory.push(...messages);
}
} catch {
this.storage.removeItem(this.storageKey);
}
}
}

const serializeHistoryEntry = (entry: HistoryEntry): StoredHistoryEntry => ({
messageId: entry.messageId,
retrievalHint: entry.retrievalHint
? bytesToHex(entry.retrievalHint)
: undefined
});

const deserializeHistoryEntry = (entry: StoredHistoryEntry): HistoryEntry => ({
messageId: entry.messageId,
retrievalHint: entry.retrievalHint
? hexToBytes(entry.retrievalHint)
: undefined
});

const serializeContentMessage = (
message: ContentMessage
): StoredContentMessage => ({
messageId: message.messageId,
channelId: message.channelId,
senderId: message.senderId,
lamportTimestamp: message.lamportTimestamp.toString(),
causalHistory: message.causalHistory.map(serializeHistoryEntry),
bloomFilter: toHex(message.bloomFilter),
content: bytesToHex(new Uint8Array(message.content)),
retrievalHint: toHex(message.retrievalHint)
});

const deserializeContentMessage = (
record: StoredContentMessage
): ContentMessage | undefined => {
try {
const content = hexToBytes(record.content);
return new ContentMessage(
record.messageId,
record.channelId,
record.senderId,
record.causalHistory.map(deserializeHistoryEntry),
BigInt(record.lamportTimestamp),
fromHex(record.bloomFilter),
content,
[],
fromHex(record.retrievalHint)
);
} catch {
return undefined;
}
};

const toHex = (
data?: Uint8Array | Uint8Array<ArrayBufferLike>
): string | undefined => {
if (!data || data.length === 0) {
return undefined;
}
return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data));
};

const fromHex = (value?: string): Uint8Array | undefined => {
if (!value) {
return undefined;
}
return hexToBytes(value);
};
Loading