diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 49b55aa495..aa975fa9af 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -18,8 +18,8 @@ import { MessageChannel, MessageChannelEvent, type MessageChannelOptions, + type ParticipantId, Message as SdsMessage, - type SenderId, SyncMessage } from "@waku/sds"; import { Logger } from "@waku/utils"; @@ -36,9 +36,11 @@ import { RetryManager } from "./retry_manager.js"; const log = new Logger("sdk:reliable-channel"); const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds +const DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS = 10 * 1000; // 10 seconds when repairs pending const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds const DEFAULT_MAX_RETRY_ATTEMPTS = 10; const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000; +const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000; const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [ @@ -78,6 +80,7 @@ export type ReliableChannelOptions = MessageChannelOptions & { /** * How often store queries are done to retrieve missing messages. + * Only applies when retrievalStrategy includes Store ('both' or 'store-only'). * * @default 10,000 (10 seconds) */ @@ -111,6 +114,17 @@ export type ReliableChannelOptions = MessageChannelOptions & { * @default 1000 (1 second) */ processTaskMinElapseMs?: number; + + /** + * Strategy for retrieving missing messages. + * - 'both': Use SDS-R peer repair and Store queries in parallel (default) + * - 'sds-r-only': Only use SDS-R peer repair + * - 'store-only': Only use Store queries (legacy behavior) + * - 'none': No automatic retrieval + * + * @default 'both' + */ + retrievalStrategy?: "both" | "sds-r-only" | "store-only" | "none"; }; /** @@ -145,11 +159,17 @@ export class ReliableChannel< private syncTimeout: ReturnType | undefined; private sweepInBufInterval: ReturnType | undefined; private readonly sweepInBufIntervalMs: number; + private sweepRepairInterval: ReturnType | undefined; private processTaskTimeout: ReturnType | undefined; private readonly retryManager: RetryManager | undefined; private readonly missingMessageRetriever?: MissingMessageRetriever; private readonly queryOnConnect?: QueryOnConnect; private readonly processTaskMinElapseMs: number; + private readonly retrievalStrategy: + | "both" + | "sds-r-only" + | "store-only" + | "none"; private _started: boolean; private constructor( @@ -214,7 +234,10 @@ export class ReliableChannel< this.processTaskMinElapseMs = options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS; - if (this._retrieve) { + this.retrievalStrategy = options?.retrievalStrategy ?? "both"; + + // Only enable Store retrieval based on strategy + if (this._retrieve && this.shouldUseStore()) { this.missingMessageRetriever = new MissingMessageRetriever( this.decoder, options?.retrieveFrequencyMs, @@ -264,12 +287,20 @@ export class ReliableChannel< public static async create( node: IWaku, channelId: ChannelId, - senderId: SenderId, + senderId: ParticipantId, encoder: IEncoder, decoder: IDecoder, options?: ReliableChannelOptions ): Promise> { - const sdsMessageChannel = new MessageChannel(channelId, senderId, options); + // Enable SDS-R repair only if retrieval strategy uses it + const retrievalStrategy = options?.retrievalStrategy ?? "both"; + const enableRepair = + retrievalStrategy === "both" || retrievalStrategy === "sds-r-only"; + + const sdsMessageChannel = new MessageChannel(channelId, senderId, { + ...options, + enableRepair + }); const messageChannel = new ReliableChannel( node, sdsMessageChannel, @@ -418,6 +449,7 @@ export class ReliableChannel< // missing messages or the status of previous outgoing messages this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint); + // Remove from Store retriever if message was retrieved this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId); if (sdsMessage.content && sdsMessage.content.length > 0) { @@ -478,6 +510,12 @@ export class ReliableChannel< this.setupEventListeners(); this.restartSync(); this.startSweepIncomingBufferLoop(); + + // Only start repair sweep if SDS-R is enabled + if (this.shouldUseSdsR()) { + this.startRepairSweepLoop(); + } + if (this._retrieve) { this.missingMessageRetriever?.start(); this.queryOnConnect?.start(); @@ -490,6 +528,7 @@ export class ReliableChannel< this._started = false; this.stopSync(); this.stopSweepIncomingBufferLoop(); + this.stopRepairSweepLoop(); this.missingMessageRetriever?.stop(); this.queryOnConnect?.stop(); // TODO unsubscribe @@ -512,18 +551,60 @@ export class ReliableChannel< if (this.sweepInBufInterval) clearInterval(this.sweepInBufInterval); } + private startRepairSweepLoop(): void { + this.stopRepairSweepLoop(); + this.sweepRepairInterval = setInterval(() => { + void this.messageChannel + .sweepRepairIncomingBuffer(async (message) => { + // Rebroadcast the repair message + const wakuMessage = { payload: message.encode() }; + const result = await this._send(this.encoder, wakuMessage); + return result.failures.length === 0; + }) + .catch((err) => { + log.error("error encountered when sweeping repair buffer", err); + }); + }, DEFAULT_SWEEP_REPAIR_INTERVAL_MS); + } + + private stopRepairSweepLoop(): void { + if (this.sweepRepairInterval) clearInterval(this.sweepRepairInterval); + } + + private shouldUseStore(): boolean { + return ( + this.retrievalStrategy === "both" || + this.retrievalStrategy === "store-only" + ); + } + + private shouldUseSdsR(): boolean { + return ( + this.retrievalStrategy === "both" || + this.retrievalStrategy === "sds-r-only" + ); + } + private restartSync(multiplier: number = 1): void { if (this.syncTimeout) { clearTimeout(this.syncTimeout); } if (this.syncMinIntervalMs) { - const timeoutMs = this.random() * this.syncMinIntervalMs * multiplier; + // Adaptive sync: use shorter interval when repairs are pending + const hasPendingRepairs = + this.shouldUseSdsR() && this.messageChannel.hasPendingRepairRequests(); + const baseInterval = hasPendingRepairs + ? DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS + : this.syncMinIntervalMs; + + const timeoutMs = this.random() * baseInterval * multiplier; this.syncTimeout = setTimeout(() => { void this.sendSyncMessage(); // Always restart a sync, no matter whether the message was sent. - // Set a multiplier so we wait a bit longer to not hog the conversation - void this.restartSync(2); + // Use smaller multiplier when repairs pending to send more frequently + const nextMultiplier = hasPendingRepairs ? 1.2 : 2; + void this.restartSync(nextMultiplier); }, timeoutMs); } } @@ -669,7 +750,13 @@ export class ReliableChannel< MessageChannelEvent.InMessageMissing, (event) => { for (const { messageId, retrievalHint } of event.detail) { - if (retrievalHint && this.missingMessageRetriever) { + // Store retrieval (for 'both' and 'store-only' strategies) + // SDS-R repair happens automatically via RepairManager for 'both' and 'sds-r-only' + if ( + this.shouldUseStore() && + retrievalHint && + this.missingMessageRetriever + ) { this.missingMessageRetriever.addMissingMessage( messageId, retrievalHint diff --git a/packages/sds/src/message_channel/events.ts b/packages/sds/src/message_channel/events.ts index ecc2a55edc..aa088d720b 100644 --- a/packages/sds/src/message_channel/events.ts +++ b/packages/sds/src/message_channel/events.ts @@ -12,10 +12,8 @@ export enum MessageChannelEvent { InMessageLost = "sds:in:message-irretrievably-lost", ErrorTask = "sds:error-task", // SDS-R Repair Events - RepairRequestQueued = "sds:repair:request-queued", RepairRequestSent = "sds:repair:request-sent", RepairRequestReceived = "sds:repair:request-received", - RepairResponseQueued = "sds:repair:response-queued", RepairResponseSent = "sds:repair:response-sent" } @@ -33,10 +31,6 @@ export type MessageChannelEvents = { [MessageChannelEvent.OutSyncSent]: CustomEvent; [MessageChannelEvent.InSyncReceived]: CustomEvent; [MessageChannelEvent.ErrorTask]: CustomEvent; - [MessageChannelEvent.RepairRequestQueued]: CustomEvent<{ - messageId: MessageId; - tReq: number; - }>; [MessageChannelEvent.RepairRequestSent]: CustomEvent<{ messageIds: MessageId[]; carrierMessageId: MessageId; @@ -45,10 +39,6 @@ export type MessageChannelEvents = { messageIds: MessageId[]; fromSenderId?: ParticipantId; }>; - [MessageChannelEvent.RepairResponseQueued]: CustomEvent<{ - messageId: MessageId; - tResp: number; - }>; [MessageChannelEvent.RepairResponseSent]: CustomEvent<{ messageId: MessageId; }>; diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index f72ba52579..f554e31f5f 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -128,13 +128,7 @@ export class MessageChannel extends TypedEventEmitter { // Only construct RepairManager if repair is enabled (default: true) if (options.enableRepair ?? true) { - this.repairManager = new RepairManager( - senderId, - options.repairConfig, - (event: string, detail: unknown) => { - this.safeSendEvent(event as MessageChannelEvent, { detail }); - } - ); + this.repairManager = new RepairManager(senderId, options.repairConfig); } } @@ -142,6 +136,14 @@ export class MessageChannel extends TypedEventEmitter { return bytesToHex(sha256(payload)); } + /** + * Check if there are pending repair requests that need to be sent. + * Useful for adaptive sync intervals - increase frequency when repairs pending. + */ + public hasPendingRepairRequests(currentTime = Date.now()): boolean { + return this.repairManager?.hasRequestsReady(currentTime) ?? false; + } + /** * Processes all queued tasks sequentially to ensure proper message ordering. * diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 4207483165..1a8a85f43e 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -20,11 +20,6 @@ const log = new Logger("sds:repair:manager"); */ const PARTICIPANTS_PER_RESPONSE_GROUP = 128; -/** - * Event emitter callback for repair events - */ -export type RepairEventEmitter = (event: string, detail: unknown) => void; - /** * Configuration for SDS-R repair protocol */ @@ -58,16 +53,10 @@ export class RepairManager { private readonly config: Required; private readonly outgoingBuffer: OutgoingRepairBuffer; private readonly incomingBuffer: IncomingRepairBuffer; - private readonly eventEmitter?: RepairEventEmitter; - public constructor( - participantId: ParticipantId, - config: RepairConfig = {}, - eventEmitter?: RepairEventEmitter - ) { + public constructor(participantId: ParticipantId, config: RepairConfig = {}) { this.participantId = participantId; this.config = { ...DEFAULT_REPAIR_CONFIG, ...config }; - this.eventEmitter = eventEmitter; this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize); this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize); @@ -142,19 +131,13 @@ export class RepairManager { // Calculate when to request this repair const tReq = this.calculateTReq(entry.messageId, currentTime); - // Add to outgoing buffer - only log and emit event if actually added + // Add to outgoing buffer - only log if actually added const wasAdded = this.outgoingBuffer.add(entry, tReq); if (wasAdded) { log.info( `Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}` ); - - // Emit event - this.eventEmitter?.("RepairRequestQueued", { - messageId: entry.messageId, - tReq - }); } } } @@ -238,19 +221,13 @@ export class RepairManager { currentTime ); - // Add to incoming buffer - only log and emit event if actually added + // Add to incoming buffer - only log if actually added const wasAdded = this.incomingBuffer.add(request, tResp); if (wasAdded) { log.info( `Will respond to repair request for ${request.messageId} at T_resp=${tResp}` ); - - // Emit event - this.eventEmitter?.("RepairResponseQueued", { - messageId: request.messageId, - tResp - }); } } } @@ -328,4 +305,12 @@ export class RepairManager { `Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants` ); } + + /** + * Check if there are repair requests ready to be sent + */ + public hasRequestsReady(currentTime = Date.now()): boolean { + const items = this.outgoingBuffer.getItems(); + return items.length > 0 && items[0].tReq <= currentTime; + } }