diff --git a/packages/proto/src/generated/sds_message.ts b/packages/proto/src/generated/sds_message.ts index eba12d4acd..86474c41a7 100644 --- a/packages/proto/src/generated/sds_message.ts +++ b/packages/proto/src/generated/sds_message.ts @@ -13,6 +13,7 @@ import type { Uint8ArrayList } from 'uint8arraylist' export interface HistoryEntry { messageId: string retrievalHint?: Uint8Array + senderId?: string } export namespace HistoryEntry { @@ -35,6 +36,11 @@ export namespace HistoryEntry { w.bytes(obj.retrievalHint) } + if (obj.senderId != null) { + w.uint32(26) + w.string(obj.senderId) + } + if (opts.lengthDelimited !== false) { w.ldelim() } @@ -57,6 +63,10 @@ export namespace HistoryEntry { obj.retrievalHint = reader.bytes() break } + case 3: { + obj.senderId = reader.string() + break + } default: { reader.skipType(tag & 7) break @@ -87,6 +97,7 @@ export interface SdsMessage { lamportTimestamp?: bigint causalHistory: HistoryEntry[] bloomFilter?: Uint8Array + repairRequest: HistoryEntry[] content?: Uint8Array } @@ -132,6 +143,13 @@ export namespace SdsMessage { w.bytes(obj.bloomFilter) } + if (obj.repairRequest != null) { + for (const value of obj.repairRequest) { + w.uint32(106) + HistoryEntry.codec().encode(value, w) + } + } + if (obj.content != null) { w.uint32(162) w.bytes(obj.content) @@ -145,7 +163,8 @@ export namespace SdsMessage { senderId: '', messageId: '', channelId: '', - causalHistory: [] + causalHistory: [], + repairRequest: [] } const end = length == null ? reader.len : reader.pos + length @@ -184,6 +203,16 @@ export namespace SdsMessage { obj.bloomFilter = reader.bytes() break } + case 13: { + if (opts.limits?.repairRequest != null && obj.repairRequest.length === opts.limits.repairRequest) { + throw new MaxLengthError('Decode error - map field "repairRequest" had too many elements') + } + + obj.repairRequest.push(HistoryEntry.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.repairRequest$ + })) + break + } case 20: { obj.content = reader.bytes() break diff --git a/packages/proto/src/lib/sds_message.proto b/packages/proto/src/lib/sds_message.proto index c38e99b084..50ca08d716 100644 --- a/packages/proto/src/lib/sds_message.proto +++ b/packages/proto/src/lib/sds_message.proto @@ -3,6 +3,8 @@ syntax = "proto3"; message HistoryEntry { string message_id = 1; // Unique identifier of the SDS message, as defined in `Message` optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash + + optional string sender_id = 3; // Participant ID of original message sender. Only populated if using optional SDS Repair extension } message SdsMessage { @@ -12,5 +14,8 @@ message SdsMessage { optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel + + repeated HistoryEntry repair_request = 13; // Capped list of history entries missing from sender's causal history. Only populated if using the optional SDS Repair extension. + optional bytes content = 20; // Actual content of the message } diff --git a/packages/sds/src/index.ts b/packages/sds/src/index.ts index 3c1fb30cb1..14960d36b4 100644 --- a/packages/sds/src/index.ts +++ b/packages/sds/src/index.ts @@ -14,8 +14,14 @@ export { type HistoryEntry, type ChannelId, type MessageChannelEvents, - type SenderId, + type ParticipantId, type MessageId } from "./message_channel/index.js"; +/** + * @deprecated Use ParticipantId instead. SenderId has been renamed to ParticipantId + * to better reflect that it represents a channel participant, not just a message sender. + */ +export type { ParticipantId as SenderId } from "./message_channel/index.js"; + export { BloomFilter }; diff --git a/packages/sds/src/message_channel/events.ts b/packages/sds/src/message_channel/events.ts index 75318df210..ecc2a55edc 100644 --- a/packages/sds/src/message_channel/events.ts +++ b/packages/sds/src/message_channel/events.ts @@ -1,4 +1,4 @@ -import { HistoryEntry, Message, MessageId } from "./message.js"; +import { HistoryEntry, Message, MessageId, ParticipantId } from "./message.js"; export enum MessageChannelEvent { OutMessageSent = "sds:out:message-sent", @@ -10,7 +10,13 @@ export enum MessageChannelEvent { OutSyncSent = "sds:out:sync-sent", InSyncReceived = "sds:in:sync-received", InMessageLost = "sds:in:message-irretrievably-lost", - ErrorTask = "sds:error-task" + ErrorTask = "sds:error-task", + // SDS-R Repair Events + RepairRequestQueued = "sds:repair:request-queued", + RepairRequestSent = "sds:repair:request-sent", + RepairRequestReceived = "sds:repair:request-received", + RepairResponseQueued = "sds:repair:response-queued", + RepairResponseSent = "sds:repair:response-sent" } export type MessageChannelEvents = { @@ -26,5 +32,24 @@ export type MessageChannelEvents = { [MessageChannelEvent.InMessageLost]: CustomEvent; [MessageChannelEvent.OutSyncSent]: CustomEvent; [MessageChannelEvent.InSyncReceived]: CustomEvent; - [MessageChannelEvent.ErrorTask]: CustomEvent; + [MessageChannelEvent.ErrorTask]: CustomEvent; + [MessageChannelEvent.RepairRequestQueued]: CustomEvent<{ + messageId: MessageId; + tReq: number; + }>; + [MessageChannelEvent.RepairRequestSent]: CustomEvent<{ + messageIds: MessageId[]; + carrierMessageId: MessageId; + }>; + [MessageChannelEvent.RepairRequestReceived]: CustomEvent<{ + messageIds: MessageId[]; + fromSenderId?: ParticipantId; + }>; + [MessageChannelEvent.RepairResponseQueued]: CustomEvent<{ + messageId: MessageId; + tResp: number; + }>; + [MessageChannelEvent.RepairResponseSent]: CustomEvent<{ + messageId: MessageId; + }>; }; diff --git a/packages/sds/src/message_channel/index.ts b/packages/sds/src/message_channel/index.ts index 53c37ae388..7a8e279c2a 100644 --- a/packages/sds/src/message_channel/index.ts +++ b/packages/sds/src/message_channel/index.ts @@ -8,7 +8,7 @@ export { HistoryEntry, Message, MessageId, - SenderId, + ParticipantId, SyncMessage, isContentMessage, isEphemeralMessage, diff --git a/packages/sds/src/message_channel/message.spec.ts b/packages/sds/src/message_channel/message.spec.ts index 680bf5cdb5..860529a9e5 100644 --- a/packages/sds/src/message_channel/message.spec.ts +++ b/packages/sds/src/message_channel/message.spec.ts @@ -44,6 +44,7 @@ describe("Message serialization", () => { [{ messageId: depMessageId, retrievalHint: depRetrievalHint }], 0n, undefined, + undefined, undefined ); @@ -54,6 +55,39 @@ describe("Message serialization", () => { { messageId: depMessageId, retrievalHint: depRetrievalHint } ]); }); + + it("Repair Request", () => { + const repairMessageId = "missing-message"; + const repairRetrievalHint = utf8ToBytes("missing-retrieval"); + const repairSenderId = "original-sender"; + const message = new Message( + "123", + "my-channel", + "me", + [], + 0n, + undefined, + undefined, + [ + { + messageId: repairMessageId, + retrievalHint: repairRetrievalHint, + senderId: repairSenderId + } + ] + ); + + const bytes = message.encode(); + const decMessage = Message.decode(bytes); + + expect(decMessage!.repairRequest).to.deep.equal([ + { + messageId: repairMessageId, + retrievalHint: repairRetrievalHint, + senderId: repairSenderId + } + ]); + }); }); describe("ContentMessage comparison with < operator", () => { diff --git a/packages/sds/src/message_channel/message.ts b/packages/sds/src/message_channel/message.ts index 78b99f9006..6bea3e938e 100644 --- a/packages/sds/src/message_channel/message.ts +++ b/packages/sds/src/message_channel/message.ts @@ -4,19 +4,20 @@ import { Logger } from "@waku/utils"; export type MessageId = string; export type HistoryEntry = proto_sds_message.HistoryEntry; export type ChannelId = string; -export type SenderId = string; +export type ParticipantId = string; const log = new Logger("sds:message"); export class Message implements proto_sds_message.SdsMessage { public constructor( - public messageId: string, + public messageId: MessageId, public channelId: string, - public senderId: string, + public senderId: ParticipantId, public causalHistory: proto_sds_message.HistoryEntry[], public lamportTimestamp?: bigint | undefined, public bloomFilter?: Uint8Array | undefined, public content?: Uint8Array | undefined, + public repairRequest: proto_sds_message.HistoryEntry[] = [], /** * Not encoded, set after it is sent, used to include in follow-up messages */ @@ -38,7 +39,8 @@ export class Message implements proto_sds_message.SdsMessage { causalHistory, lamportTimestamp, bloomFilter, - content + content, + repairRequest } = proto_sds_message.SdsMessage.decode(data); if (testContentMessage({ lamportTimestamp, content })) { @@ -49,7 +51,8 @@ export class Message implements proto_sds_message.SdsMessage { causalHistory, lamportTimestamp!, bloomFilter, - content! + content!, + repairRequest ); } @@ -61,7 +64,8 @@ export class Message implements proto_sds_message.SdsMessage { causalHistory, undefined, bloomFilter, - content! + content!, + repairRequest ); } @@ -73,7 +77,8 @@ export class Message implements proto_sds_message.SdsMessage { causalHistory, lamportTimestamp!, bloomFilter, - undefined + undefined, + repairRequest ); } log.error( @@ -90,13 +95,14 @@ export class Message implements proto_sds_message.SdsMessage { export class SyncMessage extends Message { public constructor( - public messageId: string, + public messageId: MessageId, public channelId: string, - public senderId: string, + public senderId: ParticipantId, public causalHistory: proto_sds_message.HistoryEntry[], public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: undefined, + public repairRequest: proto_sds_message.HistoryEntry[] = [], /** * Not encoded, set after it is sent, used to include in follow-up messages */ @@ -110,6 +116,7 @@ export class SyncMessage extends Message { lamportTimestamp, bloomFilter, content, + repairRequest, retrievalHint ); } @@ -134,13 +141,14 @@ export function isSyncMessage( export class EphemeralMessage extends Message { public constructor( - public messageId: string, + public messageId: MessageId, public channelId: string, - public senderId: string, + public senderId: ParticipantId, public causalHistory: proto_sds_message.HistoryEntry[], public lamportTimestamp: undefined, public bloomFilter: Uint8Array | undefined, public content: Uint8Array, + public repairRequest: proto_sds_message.HistoryEntry[] = [], /** * Not encoded, set after it is sent, used to include in follow-up messages */ @@ -157,6 +165,7 @@ export class EphemeralMessage extends Message { lamportTimestamp, bloomFilter, content, + repairRequest, retrievalHint ); } @@ -182,13 +191,14 @@ function testEphemeralMessage(message: { export class ContentMessage extends Message { public constructor( - public messageId: string, + public messageId: MessageId, public channelId: string, - public senderId: string, + public senderId: ParticipantId, public causalHistory: proto_sds_message.HistoryEntry[], public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: Uint8Array, + public repairRequest: proto_sds_message.HistoryEntry[] = [], /** * Not encoded, set after it is sent, used to include in follow-up messages */ @@ -205,6 +215,7 @@ export class ContentMessage extends Message { lamportTimestamp, bloomFilter, content, + repairRequest, retrievalHint ); } diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 91184f04d8..ea1629250c 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -162,7 +162,8 @@ describe("MessageChannel", function () { .slice(-causalHistorySize - 1, -1) .map((message) => ({ messageId: MessageChannel.getMessageId(utf8ToBytes(message)), - retrievalHint: undefined + retrievalHint: undefined, + senderId: "alice" })); expect(causalHistory).to.deep.equal(expectedCausalHistory); }); @@ -298,6 +299,7 @@ describe("MessageChannel", function () { 1n, undefined, payload, + undefined, testRetrievalHint ), testRetrievalHint diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 3df21f160a..f72ba52579 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -18,15 +18,21 @@ import { isSyncMessage, Message, MessageId, - SenderId, + ParticipantId, SyncMessage } from "./message.js"; +import { RepairConfig, RepairManager } from "./repair/repair.js"; export const DEFAULT_BLOOM_FILTER_OPTIONS = { capacity: 10000, errorRate: 0.001 }; +/** + * Maximum number of repair requests to include in a single message + */ +const MAX_REPAIR_REQUESTS_PER_MESSAGE = 3; + const DEFAULT_CAUSAL_HISTORY_SIZE = 200; const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2; @@ -46,6 +52,15 @@ export interface MessageChannelOptions { * How many possible acks does it take to consider it a definitive ack. */ possibleAcksThreshold?: number; + /** + * Whether to enable SDS-R repair protocol. + * @default true + */ + enableRepair?: boolean; + /** + * SDS-R repair configuration. Only used if enableRepair is true. + */ + repairConfig?: RepairConfig; } export type ILocalHistory = Pick< @@ -55,7 +70,7 @@ export type ILocalHistory = Pick< export class MessageChannel extends TypedEventEmitter { public readonly channelId: ChannelId; - public readonly senderId: SenderId; + public readonly senderId: ParticipantId; private lamportTimestamp: bigint; private filter: DefaultBloomFilter; private outgoingBuffer: ContentMessage[]; @@ -66,6 +81,7 @@ export class MessageChannel extends TypedEventEmitter { private readonly causalHistorySize: number; private readonly possibleAcksThreshold: number; private readonly timeoutForLostMessagesMs?: number; + private readonly repairManager?: RepairManager; private tasks: Task[] = []; private handlers: Handlers = { @@ -88,7 +104,7 @@ export class MessageChannel extends TypedEventEmitter { public constructor( channelId: ChannelId, - senderId: SenderId, + senderId: ParticipantId, options: MessageChannelOptions = {}, localHistory: ILocalHistory = new MemLocalHistory() ) { @@ -109,6 +125,17 @@ export class MessageChannel extends TypedEventEmitter { options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD; this.timeReceived = new Map(); this.timeoutForLostMessagesMs = options.timeoutForLostMessagesMs; + + // Only construct RepairManager if repair is enabled (default: true) + if (options.enableRepair ?? true) { + this.repairManager = new RepairManager( + senderId, + options.repairConfig, + (event: string, detail: unknown) => { + this.safeSendEvent(event as MessageChannelEvent, { detail }); + } + ); + } } public static getMessageId(payload: Uint8Array): MessageId { @@ -272,9 +299,7 @@ export class MessageChannel extends TypedEventEmitter { ); const missingDependencies = message.causalHistory.filter( (messageHistoryEntry) => - !this.localHistory.some( - ({ messageId }) => messageId === messageHistoryEntry.messageId - ) + !this.isMessageAvailable(messageHistoryEntry.messageId) ); if (missingDependencies.length === 0) { if (isContentMessage(message) && this.deliverMessage(message)) { @@ -355,6 +380,44 @@ export class MessageChannel extends TypedEventEmitter { ); } + /** + * Sweep repair incoming buffer and rebroadcast messages ready for repair. + * Per SDS-R spec: periodically check for repair responses that are due. + * + * @param callback - callback to rebroadcast the message + * @returns Promise that resolves when all ready repairs have been sent + */ + public async sweepRepairIncomingBuffer( + callback?: (message: Message) => Promise + ): Promise { + const repairsToSend = + this.repairManager?.sweepIncomingBuffer(this.localHistory) ?? []; + + if (callback) { + for (const message of repairsToSend) { + try { + await callback(message); + log.info( + this.senderId, + "repair message rebroadcast", + message.messageId + ); + + // Emit RepairResponseSent event + this.safeSendEvent(MessageChannelEvent.RepairResponseSent, { + detail: { + messageId: message.messageId + } + }); + } catch (error) { + log.error("Failed to rebroadcast repair message:", error); + } + } + } + + return repairsToSend; + } + /** * Send a sync message to the SDS channel. * @@ -369,6 +432,12 @@ export class MessageChannel extends TypedEventEmitter { callback?: (message: SyncMessage) => Promise ): Promise { this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp); + + // Get repair requests to include in sync message (SDS-R) + const repairRequests = + this.repairManager?.getRepairRequests(MAX_REPAIR_REQUESTS_PER_MESSAGE) ?? + []; + const message = new SyncMessage( // does not need to be secure randomness `sync-${Math.random().toString(36).substring(2)}`, @@ -376,18 +445,22 @@ export class MessageChannel extends TypedEventEmitter { this.senderId, this.localHistory .slice(-this.causalHistorySize) - .map(({ messageId, retrievalHint }) => { - return { messageId, retrievalHint }; + .map(({ messageId, retrievalHint, senderId }) => { + return { messageId, retrievalHint, senderId }; }), this.lamportTimestamp, this.filter.toBytes(), - undefined + undefined, + repairRequests ); - if (!message.causalHistory || message.causalHistory.length === 0) { + if ( + (!message.causalHistory || message.causalHistory.length === 0) && + repairRequests.length === 0 + ) { log.info( this.senderId, - "no causal history in sync message, aborting sending" + "no causal history and no repair requests in sync message, aborting sending" ); return false; } @@ -399,6 +472,17 @@ export class MessageChannel extends TypedEventEmitter { this.safeSendEvent(MessageChannelEvent.OutSyncSent, { detail: message }); + + // Emit RepairRequestSent event if repair requests were included + if (repairRequests.length > 0) { + this.safeSendEvent(MessageChannelEvent.RepairRequestSent, { + detail: { + messageIds: repairRequests.map((r) => r.messageId), + carrierMessageId: message.messageId + } + }); + } + return true; } catch (error) { log.error( @@ -464,6 +548,26 @@ export class MessageChannel extends TypedEventEmitter { detail: message }); } + + // SDS-R: Handle received message in repair manager + this.repairManager?.markMessageReceived(message.messageId); + + // SDS-R: Process incoming repair requests + if (message.repairRequest && message.repairRequest.length > 0) { + // Emit RepairRequestReceived event + this.safeSendEvent(MessageChannelEvent.RepairRequestReceived, { + detail: { + messageIds: message.repairRequest.map((r) => r.messageId), + fromSenderId: message.senderId + } + }); + + this.repairManager?.processIncomingRepairRequests( + message.repairRequest, + this.localHistory + ); + } + this.reviewAckStatus(message); if (isContentMessage(message)) { this.filter.insert(message.messageId); @@ -471,9 +575,7 @@ export class MessageChannel extends TypedEventEmitter { const missingDependencies = message.causalHistory.filter( (messageHistoryEntry) => - !this.localHistory.some( - ({ messageId }) => messageId === messageHistoryEntry.messageId - ) + !this.isMessageAvailable(messageHistoryEntry.messageId) ); if (missingDependencies.length > 0) { @@ -487,6 +589,9 @@ export class MessageChannel extends TypedEventEmitter { missingDependencies.map((ch) => ch.messageId) ); + // SDS-R: Track missing dependencies in repair manager + this.repairManager?.markDependenciesMissing(missingDependencies); + this.safeSendEvent(MessageChannelEvent.InMessageMissing, { detail: Array.from(missingDependencies) }); @@ -549,18 +654,26 @@ export class MessageChannel extends TypedEventEmitter { // It's a new message if (!message) { log.info(this.senderId, "sending new message", messageId); + + // Get repair requests to include in the message (SDS-R) + const repairRequests = + this.repairManager?.getRepairRequests( + MAX_REPAIR_REQUESTS_PER_MESSAGE + ) ?? []; + message = new ContentMessage( messageId, this.channelId, this.senderId, this.localHistory .slice(-this.causalHistorySize) - .map(({ messageId, retrievalHint }) => { - return { messageId, retrievalHint }; + .map(({ messageId, retrievalHint, senderId }) => { + return { messageId, retrievalHint, senderId }; }), this.lamportTimestamp, this.filter.toBytes(), - payload + payload, + repairRequests ); this.outgoingBuffer.push(message); @@ -616,6 +729,26 @@ export class MessageChannel extends TypedEventEmitter { } } + /** + * Check if a message is available (either in localHistory or incomingBuffer) + * This prevents treating messages as "missing" when they've already been received + * but are waiting in the incoming buffer for their dependencies. + * + * @param messageId - The ID of the message to check + * @private + */ + private isMessageAvailable(messageId: MessageId): boolean { + // Check if in local history + if (this.localHistory.some((m) => m.messageId === messageId)) { + return true; + } + // Check if in incoming buffer (already received, waiting for dependencies) + if (this.incomingBuffer.some((m) => m.messageId === messageId)) { + return true; + } + return false; + } + /** * Return true if the message was "delivered" * @@ -657,6 +790,7 @@ export class MessageChannel extends TypedEventEmitter { } this.localHistory.push(message); + return true; } diff --git a/packages/sds/src/message_channel/repair/buffers.spec.ts b/packages/sds/src/message_channel/repair/buffers.spec.ts new file mode 100644 index 0000000000..484d6118cf --- /dev/null +++ b/packages/sds/src/message_channel/repair/buffers.spec.ts @@ -0,0 +1,239 @@ +import { expect } from "chai"; + +import type { HistoryEntry } from "../message.js"; + +import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; + +describe("OutgoingRepairBuffer", () => { + let buffer: OutgoingRepairBuffer; + + beforeEach(() => { + buffer = new OutgoingRepairBuffer(3); // Small buffer for testing + }); + + it("should add entries and maintain sorted order", () => { + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; + + buffer.add(entry2, 2000); + buffer.add(entry1, 1000); + buffer.add(entry3, 3000); + + const items = buffer.getItems(); + expect(items).to.have.lengthOf(3); + expect(items[0].tReq).to.equal(1000); + expect(items[1].tReq).to.equal(2000); + expect(items[2].tReq).to.equal(3000); + }); + + it("should not update T_req if message already exists", () => { + const entry: HistoryEntry = { messageId: "msg1" }; + + buffer.add(entry, 1000); + buffer.add(entry, 2000); // Try to add again with different T_req + + const items = buffer.getItems(); + expect(items).to.have.lengthOf(1); + expect(items[0].tReq).to.equal(1000); // Should keep original + }); + + it("should evict furthest entry when buffer is full", () => { + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; + const entry4: HistoryEntry = { messageId: "msg4" }; + + buffer.add(entry2, 2000); + buffer.add(entry1, 1000); + buffer.add(entry3, 3000); + buffer.add(entry4, 1500); // Should evict msg3 (furthest T_req = 3000) + + const items = buffer.getItems(); + expect(items).to.have.lengthOf(3); + expect(buffer.has("msg3")).to.be.false; // msg3 should be evicted (furthest T_req) + expect(buffer.has("msg1")).to.be.true; + expect(buffer.has("msg2")).to.be.true; + expect(buffer.has("msg4")).to.be.true; + }); + + it("should get eligible entries based on current time", () => { + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; + + buffer.add(entry1, 1000); + buffer.add(entry2, 2000); + buffer.add(entry3, 3000); + + const eligible = buffer.getEligible(1500, 3); + expect(eligible).to.have.lengthOf(1); + expect(eligible[0].messageId).to.equal("msg1"); + }); + + it("should get multiple eligible entries at later time", () => { + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; + + // Create new buffer for second test since getEligible marks entries as requested + const buffer2 = new OutgoingRepairBuffer(3); + buffer2.add(entry1, 1000); + buffer2.add(entry2, 2000); + buffer2.add(entry3, 3000); + + const eligible = buffer2.getEligible(2500, 3); + expect(eligible).to.have.lengthOf(2); + expect(eligible[0].messageId).to.equal("msg1"); + expect(eligible[1].messageId).to.equal("msg2"); + }); + + it("should respect maxRequests limit", () => { + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; + + buffer.add(entry1, 1000); + buffer.add(entry2, 2000); + buffer.add(entry3, 3000); + + const eligible = buffer.getEligible(5000, 2); // All are eligible but limit to 2 + expect(eligible).to.have.lengthOf(2); + expect(eligible[0].messageId).to.equal("msg1"); + expect(eligible[1].messageId).to.equal("msg2"); + }); + + it("should remove entries", () => { + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + + buffer.add(entry1, 1000); + buffer.add(entry2, 2000); + + expect(buffer.size).to.equal(2); + buffer.remove("msg1"); + expect(buffer.size).to.equal(1); + expect(buffer.has("msg1")).to.be.false; + expect(buffer.has("msg2")).to.be.true; + }); + + it("should handle retrieval hint and sender_id", () => { + const hint = new Uint8Array([1, 2, 3]); + const entry: HistoryEntry = { + messageId: "msg1", + retrievalHint: hint, + senderId: "sender1" + }; + + buffer.add(entry, 1000); + const all = buffer.getAll(); + expect(all[0].retrievalHint).to.deep.equal(hint); + expect(all[0].senderId).to.equal("sender1"); + }); +}); + +describe("IncomingRepairBuffer", () => { + let buffer: IncomingRepairBuffer; + + beforeEach(() => { + buffer = new IncomingRepairBuffer(3); // Small buffer for testing + }); + + it("should add entries and maintain sorted order", () => { + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; + + buffer.add(entry2, 2000); + buffer.add(entry1, 1000); + buffer.add(entry3, 3000); + + const items = buffer.getItems(); + expect(items).to.have.lengthOf(3); + expect(items[0].tResp).to.equal(1000); + expect(items[1].tResp).to.equal(2000); + expect(items[2].tResp).to.equal(3000); + }); + + it("should ignore duplicate entries", () => { + const entry: HistoryEntry = { messageId: "msg1" }; + + buffer.add(entry, 1000); + buffer.add(entry, 500); // Try to add again with earlier T_resp + + const items = buffer.getItems(); + expect(items).to.have.lengthOf(1); + expect(items[0].tResp).to.equal(1000); // Should keep original + }); + + it("should evict furthest entry when buffer is full", () => { + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; + const entry4: HistoryEntry = { messageId: "msg4" }; + + buffer.add(entry1, 1000); + buffer.add(entry2, 2000); + buffer.add(entry3, 3000); + buffer.add(entry4, 1500); // Should evict msg3 (furthest T_resp) + + const items = buffer.getItems(); + expect(items).to.have.lengthOf(3); + expect(buffer.has("msg3")).to.be.false; // msg3 should be evicted + expect(buffer.has("msg1")).to.be.true; + expect(buffer.has("msg2")).to.be.true; + expect(buffer.has("msg4")).to.be.true; + }); + + it("should get and remove ready entries", () => { + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; + + buffer.add(entry1, 1000); + buffer.add(entry2, 2000); + buffer.add(entry3, 3000); + + const ready = buffer.getReady(1500); + expect(ready).to.have.lengthOf(1); + expect(ready[0].messageId).to.equal("msg1"); + + // Entry should be removed from buffer + expect(buffer.size).to.equal(2); + expect(buffer.has("msg1")).to.be.false; + + const ready2 = buffer.getReady(2500); + expect(ready2).to.have.lengthOf(1); + expect(ready2[0].messageId).to.equal("msg2"); + + expect(buffer.size).to.equal(1); + expect(buffer.has("msg2")).to.be.false; + expect(buffer.has("msg3")).to.be.true; + }); + + it("should remove entries", () => { + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + + buffer.add(entry1, 1000); + buffer.add(entry2, 2000); + + expect(buffer.size).to.equal(2); + buffer.remove("msg1"); + expect(buffer.size).to.equal(1); + expect(buffer.has("msg1")).to.be.false; + expect(buffer.has("msg2")).to.be.true; + }); + + it("should clear all entries", () => { + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + + buffer.add(entry1, 1000); + buffer.add(entry2, 2000); + + expect(buffer.size).to.equal(2); + buffer.clear(); + expect(buffer.size).to.equal(0); + }); +}); diff --git a/packages/sds/src/message_channel/repair/buffers.ts b/packages/sds/src/message_channel/repair/buffers.ts new file mode 100644 index 0000000000..518fa7acb2 --- /dev/null +++ b/packages/sds/src/message_channel/repair/buffers.ts @@ -0,0 +1,277 @@ +import { Logger } from "@waku/utils"; + +import type { HistoryEntry, MessageId } from "../message.js"; + +const log = new Logger("sds:repair:buffers"); + +/** + * Entry in the outgoing repair buffer with request timing + */ +interface OutgoingBufferEntry { + entry: HistoryEntry; + tReq: number; // Timestamp when this repair request should be sent + requested: boolean; // Whether this repair has already been requested by the local node +} + +/** + * Entry in the incoming repair buffer with response timing + */ +interface IncomingBufferEntry { + entry: HistoryEntry; + tResp: number; // Timestamp when we should respond with this repair +} + +/** + * Buffer for outgoing repair requests (messages we need) + * Maintains a sorted array by T_req for efficient retrieval of eligible entries + */ +export class OutgoingRepairBuffer { + // Sorted array by T_req (ascending - earliest first) + private items: OutgoingBufferEntry[] = []; + private readonly maxSize: number; + + public constructor(maxSize = 1000) { + this.maxSize = maxSize; + } + + /** + * Add a missing message to the outgoing repair request buffer + * If message already exists, it is not updated (keeps original T_req) + * @returns true if the entry was added, false if it already existed + */ + public add(entry: HistoryEntry, tReq: number): boolean { + const messageId = entry.messageId; + + // Check if already exists - do NOT update T_req per spec + if (this.has(messageId)) { + log.info( + `Message ${messageId} already in outgoing buffer, keeping original T_req` + ); + return false; + } + + // Check buffer size limit + if (this.items.length >= this.maxSize) { + // Evict furthest T_req entry (last in sorted array) to preserve repairs that need to be sent the soonest + const evicted = this.items.pop()!; + log.warn( + `Buffer full, evicted furthest entry ${evicted.entry.messageId} with T_req ${evicted.tReq}` + ); + } + + // Add new entry and re-sort + const newEntry: OutgoingBufferEntry = { entry, tReq, requested: false }; + const combined = [...this.items, newEntry]; + + // Sort by T_req (ascending) + combined.sort((a, b) => a.tReq - b.tReq); + + this.items = combined; + log.info(`Added ${messageId} to outgoing buffer with T_req: ${tReq}`); + return true; + } + + /** + * Remove a message from the buffer (e.g., when received) + */ + public remove(messageId: MessageId): void { + this.items = this.items.filter( + (item) => item.entry.messageId !== messageId + ); + } + + /** + * Get eligible repair requests (where T_req <= currentTime) + * Returns up to maxRequests entries from the front of the sorted array + * Marks returned entries as requested but keeps them in buffer until received + */ + public getEligible( + currentTime: number = Date.now(), + maxRequests = 3 + ): HistoryEntry[] { + const eligible: HistoryEntry[] = []; + + // Iterate from front of sorted array (earliest T_req first) + for (const item of this.items) { + // Since array is sorted, once we hit an item with tReq > currentTime, + // all remaining items also have tReq > currentTime + if (item.tReq > currentTime) { + break; + } + + // Only return items that haven't been requested yet + if (!item.requested && eligible.length < maxRequests) { + eligible.push(item.entry); + // Mark as requested so we don't request it again + item.requested = true; + log.info( + `Repair request for ${item.entry.messageId} is eligible and marked as requested` + ); + } + + // If we've found enough eligible items, exit early + if (eligible.length >= maxRequests) { + break; + } + } + + return eligible; + } + + /** + * Check if a message is in the buffer + */ + public has(messageId: MessageId): boolean { + return this.items.some((item) => item.entry.messageId === messageId); + } + + /** + * Get the current buffer size + */ + public get size(): number { + return this.items.length; + } + + /** + * Clear all entries + */ + public clear(): void { + this.items = []; + } + + /** + * Get all entries (for testing/debugging) + */ + public getAll(): HistoryEntry[] { + return this.items.map((item) => item.entry); + } + + /** + * Get items array directly (for testing) + */ + public getItems(): OutgoingBufferEntry[] { + return [...this.items]; + } +} + +/** + * Buffer for incoming repair requests (repairs we need to send) + * Maintains a sorted array by T_resp for efficient retrieval of ready entries + */ +export class IncomingRepairBuffer { + // Sorted array by T_resp (ascending - earliest first) + private items: IncomingBufferEntry[] = []; + private readonly maxSize: number; + + public constructor(maxSize = 1000) { + this.maxSize = maxSize; + } + + /** + * Add a repair request that we can fulfill + * If message already exists, it is ignored (not updated) + * @returns true if the entry was added, false if it already existed + */ + public add(entry: HistoryEntry, tResp: number): boolean { + const messageId = entry.messageId; + + // Check if already exists - ignore per spec + if (this.has(messageId)) { + log.info(`Message ${messageId} already in incoming buffer, ignoring`); + return false; + } + + // Check buffer size limit + if (this.items.length >= this.maxSize) { + // Evict furthest T_resp entry (last in sorted array) + const evicted = this.items.pop()!; + log.warn( + `Buffer full, evicted furthest entry ${evicted.entry.messageId} with T_resp ${evicted.tResp}` + ); + } + + // Add new entry and re-sort + const newEntry: IncomingBufferEntry = { entry, tResp }; + const combined = [...this.items, newEntry]; + + // Sort by T_resp (ascending) + combined.sort((a, b) => a.tResp - b.tResp); + + this.items = combined; + log.info(`Added ${messageId} to incoming buffer with T_resp: ${tResp}`); + return true; + } + + /** + * Remove a message from the buffer + */ + public remove(messageId: MessageId): void { + this.items = this.items.filter( + (item) => item.entry.messageId !== messageId + ); + } + + /** + * Get repairs ready to be sent (where T_resp <= currentTime) + * Removes and returns ready entries + */ + public getReady(currentTime: number): HistoryEntry[] { + // Find cutoff point - first item with tResp > currentTime + // Since array is sorted, all items before this are ready + let cutoff = 0; + for (let i = 0; i < this.items.length; i++) { + if (this.items[i].tResp > currentTime) { + cutoff = i; + break; + } + // If we reach the end, all items are ready + cutoff = i + 1; + } + + // Extract ready items and log them + const ready = this.items.slice(0, cutoff).map((item) => { + log.info(`Repair for ${item.entry.messageId} is ready to be sent`); + return item.entry; + }); + + // Keep only items after cutoff + this.items = this.items.slice(cutoff); + + return ready; + } + + /** + * Check if a message is in the buffer + */ + public has(messageId: MessageId): boolean { + return this.items.some((item) => item.entry.messageId === messageId); + } + + /** + * Get the current buffer size + */ + public get size(): number { + return this.items.length; + } + + /** + * Clear all entries + */ + public clear(): void { + this.items = []; + } + + /** + * Get all entries (for testing/debugging) + */ + public getAll(): HistoryEntry[] { + return this.items.map((item) => item.entry); + } + + /** + * Get items array directly (for testing) + */ + public getItems(): IncomingBufferEntry[] { + return [...this.items]; + } +} diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts new file mode 100644 index 0000000000..4207483165 --- /dev/null +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -0,0 +1,331 @@ +import { Logger } from "@waku/utils"; + +import type { HistoryEntry, MessageId } from "../message.js"; +import { Message } from "../message.js"; +import type { ILocalHistory } from "../message_channel.js"; + +import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; +import { + bigintToNumber, + calculateXorDistance, + combinedHash, + hashString, + ParticipantId +} from "./utils.js"; + +const log = new Logger("sds:repair:manager"); + +/** + * Per SDS-R spec: One response group per 128 participants + */ +const PARTICIPANTS_PER_RESPONSE_GROUP = 128; + +/** + * Event emitter callback for repair events + */ +export type RepairEventEmitter = (event: string, detail: unknown) => void; + +/** + * Configuration for SDS-R repair protocol + */ +export interface RepairConfig { + /** Minimum wait time before requesting repair (milliseconds) */ + tMin?: number; + /** Maximum wait time for repair window (milliseconds) */ + tMax?: number; + /** Number of response groups for load distribution */ + numResponseGroups?: number; + /** Maximum buffer size for repair requests */ + bufferSize?: number; +} + +/** + * Default configuration values based on spec recommendations + */ +export const DEFAULT_REPAIR_CONFIG: Required = { + tMin: 30000, // 30 seconds + tMax: 120000, // 120 seconds + numResponseGroups: 1, // Recommendation is 1 group per PARTICIPANTS_PER_RESPONSE_GROUP participants + bufferSize: 1000 +}; + +/** + * Manager for SDS-R repair protocol + * Handles repair request/response timing and coordination + */ +export class RepairManager { + private readonly participantId: ParticipantId; + private readonly config: Required; + private readonly outgoingBuffer: OutgoingRepairBuffer; + private readonly incomingBuffer: IncomingRepairBuffer; + private readonly eventEmitter?: RepairEventEmitter; + + public constructor( + participantId: ParticipantId, + config: RepairConfig = {}, + eventEmitter?: RepairEventEmitter + ) { + this.participantId = participantId; + this.config = { ...DEFAULT_REPAIR_CONFIG, ...config }; + this.eventEmitter = eventEmitter; + + this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize); + this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize); + + log.info(`RepairManager initialized for participant ${participantId}`); + } + + /** + * Calculate T_req - when to request repair for a missing message + * Per spec: T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min + */ + public calculateTReq(messageId: MessageId, currentTime = Date.now()): number { + const hash = combinedHash(this.participantId, messageId); + const range = BigInt(this.config.tMax - this.config.tMin); + const offset = bigintToNumber(hash % range) + this.config.tMin; + return currentTime + offset; + } + + /** + * Calculate T_resp - when to respond with a repair + * Per spec: T_resp = current_time + (distance * hash(message_id)) % T_max + * where distance = participant_id XOR sender_id + */ + public calculateTResp( + senderId: ParticipantId, + messageId: MessageId, + currentTime = Date.now() + ): number { + const distance = calculateXorDistance(this.participantId, senderId); + const messageHash = hashString(messageId); + const product = distance * messageHash; + const offset = bigintToNumber(product % BigInt(this.config.tMax)); + return currentTime + offset; + } + + /** + * Determine if this participant is in the response group for a message + * Per spec: (hash(participant_id, message_id) % num_response_groups) == + * (hash(sender_id, message_id) % num_response_groups) + */ + public isInResponseGroup( + senderId: ParticipantId, + messageId: MessageId + ): boolean { + if (!senderId) { + // Cannot determine response group without sender_id + return false; + } + + const numGroups = BigInt(this.config.numResponseGroups); + if (numGroups <= BigInt(1)) { + // Single group, everyone is in it + return true; + } + + const participantGroup = + combinedHash(this.participantId, messageId) % numGroups; + const senderGroup = combinedHash(senderId, messageId) % numGroups; + + return participantGroup === senderGroup; + } + + /** + * Handle missing dependencies by adding them to outgoing repair buffer + * Called when causal dependencies are detected as missing + */ + public markDependenciesMissing( + missingEntries: HistoryEntry[], + currentTime = Date.now() + ): void { + for (const entry of missingEntries) { + // Calculate when to request this repair + const tReq = this.calculateTReq(entry.messageId, currentTime); + + // Add to outgoing buffer - only log and emit event if actually added + const wasAdded = this.outgoingBuffer.add(entry, tReq); + + if (wasAdded) { + log.info( + `Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}` + ); + + // Emit event + this.eventEmitter?.("RepairRequestQueued", { + messageId: entry.messageId, + tReq + }); + } + } + } + + /** + * Handle receipt of a message - remove from repair buffers + * Called when a message is successfully received + */ + public markMessageReceived(messageId: MessageId): void { + // Remove from both buffers as we no longer need to request or respond + const wasInOutgoing = this.outgoingBuffer.has(messageId); + const wasInIncoming = this.incomingBuffer.has(messageId); + + if (wasInOutgoing) { + this.outgoingBuffer.remove(messageId); + log.info( + `Removed ${messageId} from outgoing repair buffer after receipt` + ); + } + + if (wasInIncoming) { + this.incomingBuffer.remove(messageId); + log.info( + `Removed ${messageId} from incoming repair buffer after receipt` + ); + } + } + + /** + * Get repair requests that are eligible to be sent + * Returns up to maxRequests entries where T_req <= currentTime + */ + public getRepairRequests( + maxRequests = 3, + currentTime = Date.now() + ): HistoryEntry[] { + return this.outgoingBuffer.getEligible(currentTime, maxRequests); + } + + /** + * Process incoming repair requests from other participants + * Adds to incoming buffer if we can fulfill and are in response group + */ + public processIncomingRepairRequests( + requests: HistoryEntry[], + localHistory: ILocalHistory, + currentTime = Date.now() + ): void { + for (const request of requests) { + // Remove from our own outgoing buffer (someone else is requesting it) + this.outgoingBuffer.remove(request.messageId); + + // Check if we have this message + const message = localHistory.find( + (m) => m.messageId === request.messageId + ); + if (!message) { + log.info( + `Cannot fulfill repair for ${request.messageId} - not in local history` + ); + continue; + } + + // Check if we're in the response group + if (!request.senderId) { + log.warn( + `Cannot determine response group for ${request.messageId} - missing sender_id` + ); + continue; + } + + if (!this.isInResponseGroup(request.senderId, request.messageId)) { + log.info(`Not in response group for ${request.messageId}`); + continue; + } + + // Calculate when to respond + const tResp = this.calculateTResp( + request.senderId, + request.messageId, + currentTime + ); + + // Add to incoming buffer - only log and emit event if actually added + const wasAdded = this.incomingBuffer.add(request, tResp); + + if (wasAdded) { + log.info( + `Will respond to repair request for ${request.messageId} at T_resp=${tResp}` + ); + + // Emit event + this.eventEmitter?.("RepairResponseQueued", { + messageId: request.messageId, + tResp + }); + } + } + } + + /** + * Sweep outgoing buffer for repairs that should be requested + * Returns entries where T_req <= currentTime + */ + public sweepOutgoingBuffer( + maxRequests = 3, + currentTime = Date.now() + ): HistoryEntry[] { + return this.getRepairRequests(maxRequests, currentTime); + } + + /** + * Sweep incoming buffer for repairs ready to be sent + * Returns messages that should be rebroadcast + */ + public sweepIncomingBuffer( + localHistory: ILocalHistory, + currentTime = Date.now() + ): Message[] { + const ready = this.incomingBuffer.getReady(currentTime); + const messages: Message[] = []; + + for (const entry of ready) { + const message = localHistory.find((m) => m.messageId === entry.messageId); + if (message) { + messages.push(message); + log.info(`Sending repair for ${entry.messageId}`); + } else { + log.warn(`Message ${entry.messageId} no longer in local history`); + } + } + + return messages; + } + + /** + * Clear all buffers + */ + public clear(): void { + this.outgoingBuffer.clear(); + this.incomingBuffer.clear(); + } + + /** + * Update number of response groups (e.g., when participants change) + */ + public updateResponseGroups(numParticipants: number): void { + if ( + numParticipants < 0 || + !Number.isFinite(numParticipants) || + !Number.isInteger(numParticipants) + ) { + throw new Error( + `Invalid numParticipants: ${numParticipants}. Must be a positive integer.` + ); + } + + if (numParticipants > Number.MAX_SAFE_INTEGER) { + log.warn( + `numParticipants ${numParticipants} exceeds MAX_SAFE_INTEGER, using MAX_SAFE_INTEGER` + ); + numParticipants = Number.MAX_SAFE_INTEGER; + } + + // Per spec: num_response_groups = max(1, num_participants / PARTICIPANTS_PER_RESPONSE_GROUP) + this.config.numResponseGroups = Math.max( + 1, + Math.floor(numParticipants / PARTICIPANTS_PER_RESPONSE_GROUP) + ); + log.info( + `Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants` + ); + } +} diff --git a/packages/sds/src/message_channel/repair/utils.ts b/packages/sds/src/message_channel/repair/utils.ts new file mode 100644 index 0000000000..4206857b2d --- /dev/null +++ b/packages/sds/src/message_channel/repair/utils.ts @@ -0,0 +1,80 @@ +import { sha256 } from "@noble/hashes/sha2"; +import { bytesToHex } from "@waku/utils/bytes"; + +import type { MessageId } from "../message.js"; + +/** + * ParticipantId can be a string or converted to a numeric representation for XOR operations + */ +export type ParticipantId = string; + +/** + * Compute SHA256 hash and convert to integer for modulo operations + * Uses first 8 bytes of hash for the integer conversion + */ +export function hashToInteger(input: string): bigint { + const hashBytes = sha256(new TextEncoder().encode(input)); + // Use first 8 bytes for a 64-bit integer + const view = new DataView(hashBytes.buffer, 0, 8); + return view.getBigUint64(0, false); // big-endian +} + +/** + * Compute combined hash for (participantId, messageId) and convert to integer + * This is used for T_req calculations and response group membership + */ +export function combinedHash( + participantId: ParticipantId, + messageId: MessageId +): bigint { + const combined = `${participantId}${messageId}`; + return hashToInteger(combined); +} + +/** + * Convert ParticipantId to numeric representation for XOR operations + * TODO: Not per spec, further review needed + * The spec assumes participant IDs support XOR natively, but we're using + * SHA256 hash to ensure consistent numeric representation for string IDs + */ +export function participantIdToNumeric(participantId: ParticipantId): bigint { + return hashToInteger(participantId); +} + +/** + * Calculate XOR distance between two participant IDs + * Used for T_resp calculations where distance affects response timing + */ +export function calculateXorDistance( + participantId1: ParticipantId, + participantId2: ParticipantId +): bigint { + const numeric1 = participantIdToNumeric(participantId1); + const numeric2 = participantIdToNumeric(participantId2); + return numeric1 ^ numeric2; +} + +/** + * Helper to convert bigint to number for timing calculations + * Ensures the result fits in JavaScript's number range + */ +export function bigintToNumber(value: bigint): number { + // For timing calculations, we modulo by MAX_SAFE_INTEGER to ensure it fits + const maxSafe = BigInt(Number.MAX_SAFE_INTEGER); + return Number(value % maxSafe); +} + +/** + * Calculate hash for a single string (used for message_id in T_resp) + */ +export function hashString(input: string): bigint { + return hashToInteger(input); +} + +/** + * Convert a hash result to hex string for debugging/logging + */ +export function hashToHex(input: string): string { + const hashBytes = sha256(new TextEncoder().encode(input)); + return bytesToHex(hashBytes); +}