diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index fa62bfb9ae..67e0b73ae3 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -17,7 +17,37 @@ export const DEFAULT_MAX_LENGTH = 10_000; * If an array of items longer than `maxLength` is pushed, dropping will happen * at next push. */ -export class MemLocalHistory { +export interface ILocalHistory { + length: number; + push(...items: ContentMessage[]): number; + some( + predicate: ( + value: ContentMessage, + index: number, + array: ContentMessage[] + ) => unknown, + thisArg?: any + ): boolean; + slice(start?: number, end?: number): ContentMessage[]; + find( + predicate: ( + value: ContentMessage, + index: number, + obj: ContentMessage[] + ) => unknown, + thisArg?: any + ): ContentMessage | undefined; + findIndex( + predicate: ( + value: ContentMessage, + index: number, + obj: ContentMessage[] + ) => unknown, + thisArg?: any + ): number; +} + +export class MemLocalHistory implements ILocalHistory { private items: ContentMessage[] = []; /** diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 36919bbce2..db7b966330 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -7,7 +7,6 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js"; import { MessageChannelEvent, MessageChannelEvents } from "./events.js"; -import { MemLocalHistory } from "./mem_local_history.js"; import { ChannelId, ContentMessage, @@ -21,6 +20,7 @@ import { ParticipantId, SyncMessage } from "./message.js"; +import { PersistentHistory } from "./persistent_history.js"; import { RepairConfig, RepairManager } from "./repair/repair.js"; export const DEFAULT_BLOOM_FILTER_OPTIONS = { @@ -106,7 +106,7 @@ export class MessageChannel extends TypedEventEmitter { channelId: ChannelId, senderId: ParticipantId, options: MessageChannelOptions = {}, - localHistory: ILocalHistory = new MemLocalHistory() + localHistory?: ILocalHistory ) { super(); this.channelId = channelId; @@ -117,7 +117,8 @@ export class MessageChannel extends TypedEventEmitter { this.outgoingBuffer = []; this.possibleAcks = new Map(); this.incomingBuffer = []; - this.localHistory = localHistory; + this.localHistory = + localHistory ?? new PersistentHistory({ channelId: this.channelId }); this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes diff --git a/packages/sds/src/message_channel/persistent_history.spec.ts b/packages/sds/src/message_channel/persistent_history.spec.ts new file mode 100644 index 0000000000..941473dd94 --- /dev/null +++ b/packages/sds/src/message_channel/persistent_history.spec.ts @@ -0,0 +1,62 @@ +import { expect } from "chai"; + +import { ContentMessage } from "./message.js"; +import { HistoryStorage, PersistentHistory } from "./persistent_history.js"; + +class MemoryStorage implements HistoryStorage { + private readonly store = new Map(); + + public getItem(key: string): string | null { + return this.store.get(key) ?? null; + } + + public setItem(key: string, value: string): void { + this.store.set(key, value); + } + + public removeItem(key: string): void { + this.store.delete(key); + } +} + +const channelId = "channel-1"; + +const createMessage = (id: string, timestamp: number): ContentMessage => { + return new ContentMessage( + id, + channelId, + "sender", + [], + BigInt(timestamp), + undefined, + new Uint8Array([timestamp]), + undefined + ); +}; + +describe("PersistentHistory", () => { + it("persists and restores messages", () => { + const storage = new MemoryStorage(); + const history = new PersistentHistory({ channelId, storage }); + + history.push(createMessage("msg-1", 1)); + history.push(createMessage("msg-2", 2)); + + const restored = new PersistentHistory({ channelId, storage }); + + expect(restored.length).to.equal(2); + expect(restored.slice(0).map((msg) => msg.messageId)).to.deep.equal([ + "msg-1", + "msg-2" + ]); + }); + + it("behaves like memory history when storage is unavailable", () => { + const history = new PersistentHistory({ channelId, storage: undefined }); + + history.push(createMessage("msg-3", 3)); + + expect(history.length).to.equal(1); + expect(history.slice(0)[0].messageId).to.equal("msg-3"); + }); +}); diff --git a/packages/sds/src/message_channel/persistent_history.ts b/packages/sds/src/message_channel/persistent_history.ts new file mode 100644 index 0000000000..d68cae183e --- /dev/null +++ b/packages/sds/src/message_channel/persistent_history.ts @@ -0,0 +1,196 @@ +import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; + +import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; +import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; + +export interface HistoryStorage { + getItem(key: string): string | null; + setItem(key: string, value: string): void; + removeItem(key: string): void; +} + +export interface PersistentHistoryOptions { + channelId: ChannelId; + storage?: HistoryStorage; +} + +type StoredHistoryEntry = { + messageId: string; + retrievalHint?: string; +}; + +type StoredContentMessage = { + messageId: string; + channelId: string; + senderId: string; + lamportTimestamp: string; + causalHistory: StoredHistoryEntry[]; + bloomFilter?: string; + content: string; + retrievalHint?: string; +}; + +const HISTORY_STORAGE_PREFIX = "waku:sds:history:"; + +/** + * Persists the SDS local history in a browser/localStorage compatible backend. + * + * If no storage backend is available, this behaves like {@link MemLocalHistory}. + */ +export class PersistentHistory implements ILocalHistory { + private readonly storage?: HistoryStorage; + private readonly storageKey: string; + private readonly memory: MemLocalHistory; + + public constructor(options: PersistentHistoryOptions) { + this.memory = new MemLocalHistory(); + this.storage = options.storage || localStorage; + this.storageKey = `${HISTORY_STORAGE_PREFIX}${options.channelId}`; + this.load(); + } + + public get length(): number { + return this.memory.length; + } + + public push(...items: ContentMessage[]): number { + const length = this.memory.push(...items); + this.save(); + return length; + } + + public some( + predicate: ( + value: ContentMessage, + index: number, + array: ContentMessage[] + ) => unknown, + thisArg?: any + ): boolean { + return this.memory.some(predicate, thisArg); + } + + public slice(start?: number, end?: number): ContentMessage[] { + return this.memory.slice(start, end); + } + + public find( + predicate: ( + value: ContentMessage, + index: number, + obj: ContentMessage[] + ) => unknown, + thisArg?: any + ): ContentMessage | undefined { + return this.memory.find(predicate, thisArg); + } + + public findIndex( + predicate: ( + value: ContentMessage, + index: number, + obj: ContentMessage[] + ) => unknown, + thisArg?: any + ): number { + return this.memory.findIndex(predicate, thisArg); + } + + private save(): void { + if (!this.storage) { + return; + } + + const payload = JSON.stringify( + this.memory.slice(0).map(serializeContentMessage) + ); + this.storage.setItem(this.storageKey, payload); + } + + private load(): void { + if (!this.storage) { + return; + } + + try { + const raw = this.storage.getItem(this.storageKey); + if (!raw) { + return; + } + + const stored = JSON.parse(raw) as StoredContentMessage[]; + const messages = stored + .map(deserializeContentMessage) + .filter((message): message is ContentMessage => Boolean(message)); + if (messages.length) { + this.memory.push(...messages); + } + } catch { + this.storage.removeItem(this.storageKey); + } + } +} + +const serializeHistoryEntry = (entry: HistoryEntry): StoredHistoryEntry => ({ + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? bytesToHex(entry.retrievalHint) + : undefined +}); + +const deserializeHistoryEntry = (entry: StoredHistoryEntry): HistoryEntry => ({ + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? hexToBytes(entry.retrievalHint) + : undefined +}); + +const serializeContentMessage = ( + message: ContentMessage +): StoredContentMessage => ({ + messageId: message.messageId, + channelId: message.channelId, + senderId: message.senderId, + lamportTimestamp: message.lamportTimestamp.toString(), + causalHistory: message.causalHistory.map(serializeHistoryEntry), + bloomFilter: toHex(message.bloomFilter), + content: bytesToHex(new Uint8Array(message.content)), + retrievalHint: toHex(message.retrievalHint) +}); + +const deserializeContentMessage = ( + record: StoredContentMessage +): ContentMessage | undefined => { + try { + const content = hexToBytes(record.content); + return new ContentMessage( + record.messageId, + record.channelId, + record.senderId, + record.causalHistory.map(deserializeHistoryEntry), + BigInt(record.lamportTimestamp), + fromHex(record.bloomFilter), + content, + [], + fromHex(record.retrievalHint) + ); + } catch { + return undefined; + } +}; + +const toHex = ( + data?: Uint8Array | Uint8Array +): string | undefined => { + if (!data || data.length === 0) { + return undefined; + } + return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data)); +}; + +const fromHex = (value?: string): Uint8Array | undefined => { + if (!value) { + return undefined; + } + return hexToBytes(value); +};