@@ -36,10 +36,13 @@ import { RetryManager } from "./retry_manager.js";
3636const log = new Logger ( "sdk:reliable-channel" ) ;
3737
3838const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000 ; // 30 seconds
39+ const DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS = 10 * 1000 ; // 10 seconds when repairs pending
3940const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000 ; // 30 seconds
4041const DEFAULT_MAX_RETRY_ATTEMPTS = 10 ;
4142const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000 ;
43+ const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000 ; // 10 seconds
4244const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000 ;
45+ const DEFAULT_SDSR_FALLBACK_TIMEOUT_MS = 120 * 1000 ; // 2 minutes
4346
4447const IRRECOVERABLE_SENDING_ERRORS : LightPushError [ ] = [
4548 LightPushError . ENCODE_FAILED ,
@@ -78,6 +81,7 @@ export type ReliableChannelOptions = MessageChannelOptions & {
7881
7982 /**
8083 * How often store queries are done to retrieve missing messages.
84+ * Only applies when retrievalStrategy includes Store ('both' or 'store-only').
8185 *
8286 * @default 10,000 (10 seconds)
8387 */
@@ -111,6 +115,25 @@ export type ReliableChannelOptions = MessageChannelOptions & {
111115 * @default 1000 (1 second)
112116 */
113117 processTaskMinElapseMs ?: number ;
118+
119+ /**
120+ * Strategy for retrieving missing messages.
121+ * - 'both': Use SDS-R peer repair and Store queries (default)
122+ * - 'sds-r-only': Only use SDS-R peer repair
123+ * - 'store-only': Only use Store queries (legacy behavior)
124+ * - 'none': No automatic retrieval
125+ *
126+ * @default 'both'
127+ */
128+ retrievalStrategy ?: "both" | "sds-r-only" | "store-only" | "none" ;
129+
130+ /**
131+ * How long to wait for SDS-R repair before falling back to Store.
132+ * Only applies when retrievalStrategy is 'both'.
133+ *
134+ * @default 120,000 (2 minutes - matches SDS-R T_max)
135+ */
136+ sdsrFallbackTimeoutMs ?: number ;
114137} ;
115138
116139/**
@@ -145,11 +168,22 @@ export class ReliableChannel<
145168 private syncTimeout : ReturnType < typeof setTimeout > | undefined ;
146169 private sweepInBufInterval : ReturnType < typeof setInterval > | undefined ;
147170 private readonly sweepInBufIntervalMs : number ;
171+ private sweepRepairInterval : ReturnType < typeof setInterval > | undefined ;
148172 private processTaskTimeout : ReturnType < typeof setTimeout > | undefined ;
149173 private readonly retryManager : RetryManager | undefined ;
150174 private readonly missingMessageRetriever ?: MissingMessageRetriever < T > ;
151175 private readonly queryOnConnect ?: QueryOnConnect < T > ;
152176 private readonly processTaskMinElapseMs : number ;
177+ private readonly retrievalStrategy :
178+ | "both"
179+ | "sds-r-only"
180+ | "store-only"
181+ | "none" ;
182+ private readonly sdsrFallbackTimeoutMs : number ;
183+ private readonly missingMessageTimeouts : Map <
184+ string ,
185+ ReturnType < typeof setTimeout >
186+ > ;
153187 private _started : boolean ;
154188
155189 private constructor (
@@ -214,7 +248,13 @@ export class ReliableChannel<
214248 this . processTaskMinElapseMs =
215249 options ?. processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS ;
216250
217- if ( this . _retrieve ) {
251+ this . retrievalStrategy = options ?. retrievalStrategy ?? "both" ;
252+ this . sdsrFallbackTimeoutMs =
253+ options ?. sdsrFallbackTimeoutMs ?? DEFAULT_SDSR_FALLBACK_TIMEOUT_MS ;
254+ this . missingMessageTimeouts = new Map ( ) ;
255+
256+ // Only enable Store retrieval based on strategy
257+ if ( this . _retrieve && this . shouldUseStore ( ) ) {
218258 this . missingMessageRetriever = new MissingMessageRetriever (
219259 this . decoder ,
220260 options ?. retrieveFrequencyMs ,
@@ -418,6 +458,13 @@ export class ReliableChannel<
418458 // missing messages or the status of previous outgoing messages
419459 this . messageChannel . pushIncomingMessage ( sdsMessage , retrievalHint ) ;
420460
461+ // Cancel Store fallback timeout if message was retrieved
462+ const timeout = this . missingMessageTimeouts . get ( sdsMessage . messageId ) ;
463+ if ( timeout ) {
464+ clearTimeout ( timeout ) ;
465+ this . missingMessageTimeouts . delete ( sdsMessage . messageId ) ;
466+ }
467+
421468 this . missingMessageRetriever ?. removeMissingMessage ( sdsMessage . messageId ) ;
422469
423470 if ( sdsMessage . content && sdsMessage . content . length > 0 ) {
@@ -478,6 +525,12 @@ export class ReliableChannel<
478525 this . setupEventListeners ( ) ;
479526 this . restartSync ( ) ;
480527 this . startSweepIncomingBufferLoop ( ) ;
528+
529+ // Only start repair sweep if SDS-R is enabled
530+ if ( this . shouldUseSdsR ( ) ) {
531+ this . startRepairSweepLoop ( ) ;
532+ }
533+
481534 if ( this . _retrieve ) {
482535 this . missingMessageRetriever ?. start ( ) ;
483536 this . queryOnConnect ?. start ( ) ;
@@ -490,6 +543,8 @@ export class ReliableChannel<
490543 this . _started = false ;
491544 this . stopSync ( ) ;
492545 this . stopSweepIncomingBufferLoop ( ) ;
546+ this . stopRepairSweepLoop ( ) ;
547+ this . clearMissingMessageTimeouts ( ) ;
493548 this . missingMessageRetriever ?. stop ( ) ;
494549 this . queryOnConnect ?. stop ( ) ;
495550 // TODO unsubscribe
@@ -512,18 +567,67 @@ export class ReliableChannel<
512567 if ( this . sweepInBufInterval ) clearInterval ( this . sweepInBufInterval ) ;
513568 }
514569
570+ private startRepairSweepLoop ( ) : void {
571+ this . stopRepairSweepLoop ( ) ;
572+ this . sweepRepairInterval = setInterval ( ( ) => {
573+ void this . messageChannel
574+ . sweepRepairIncomingBuffer ( async ( message ) => {
575+ // Rebroadcast the repair message
576+ const wakuMessage = { payload : message . encode ( ) } ;
577+ const result = await this . _send ( this . encoder , wakuMessage ) ;
578+ return result . failures . length === 0 ;
579+ } )
580+ . catch ( ( err ) => {
581+ log . error ( "error encountered when sweeping repair buffer" , err ) ;
582+ } ) ;
583+ } , DEFAULT_SWEEP_REPAIR_INTERVAL_MS ) ;
584+ }
585+
586+ private stopRepairSweepLoop ( ) : void {
587+ if ( this . sweepRepairInterval ) clearInterval ( this . sweepRepairInterval ) ;
588+ }
589+
590+ private clearMissingMessageTimeouts ( ) : void {
591+ for ( const timeout of this . missingMessageTimeouts . values ( ) ) {
592+ clearTimeout ( timeout ) ;
593+ }
594+ this . missingMessageTimeouts . clear ( ) ;
595+ }
596+
597+ private shouldUseStore ( ) : boolean {
598+ return (
599+ this . retrievalStrategy === "both" ||
600+ this . retrievalStrategy === "store-only"
601+ ) ;
602+ }
603+
604+ private shouldUseSdsR ( ) : boolean {
605+ return (
606+ this . retrievalStrategy === "both" ||
607+ this . retrievalStrategy === "sds-r-only"
608+ ) ;
609+ }
610+
515611 private restartSync ( multiplier : number = 1 ) : void {
516612 if ( this . syncTimeout ) {
517613 clearTimeout ( this . syncTimeout ) ;
518614 }
519615 if ( this . syncMinIntervalMs ) {
520- const timeoutMs = this . random ( ) * this . syncMinIntervalMs * multiplier ;
616+ // Adaptive sync: use shorter interval when repairs are pending
617+ const hasPendingRepairs =
618+ this . shouldUseSdsR ( ) && this . messageChannel . hasPendingRepairRequests ( ) ;
619+ const baseInterval = hasPendingRepairs
620+ ? DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS
621+ : this . syncMinIntervalMs ;
622+
623+ const timeoutMs = this . random ( ) * baseInterval * multiplier ;
521624
522625 this . syncTimeout = setTimeout ( ( ) => {
523626 void this . sendSyncMessage ( ) ;
524627 // Always restart a sync, no matter whether the message was sent.
525- // Set a multiplier so we wait a bit longer to not hog the conversation
526- void this . restartSync ( 2 ) ;
628+ // Use smaller multiplier when repairs pending to send more frequently
629+ const nextMultiplier = hasPendingRepairs ? 1.2 : 2 ;
630+ void this . restartSync ( nextMultiplier ) ;
527631 } , timeoutMs ) ;
528632 }
529633 }
@@ -669,12 +773,35 @@ export class ReliableChannel<
669773 MessageChannelEvent . InMessageMissing ,
670774 ( event ) => {
671775 for ( const { messageId, retrievalHint } of event . detail ) {
672- if ( retrievalHint && this . missingMessageRetriever ) {
673- this . missingMessageRetriever . addMissingMessage (
674- messageId ,
675- retrievalHint
676- ) ;
776+ // Coordinated retrieval strategy
777+ if ( this . retrievalStrategy === "both" ) {
778+ // SDS-R will attempt first, schedule Store fallback
779+ // Note: missingMessageRetriever only exists if Store protocol is available
780+ if ( retrievalHint && this . missingMessageRetriever ) {
781+ const timeout = setTimeout ( ( ) => {
782+ // Still missing after SDS-R timeout, try Store
783+ log . info (
784+ `Message ${ messageId } not retrieved via SDS-R, falling back to Store`
785+ ) ;
786+ this . missingMessageRetriever ?. addMissingMessage (
787+ messageId ,
788+ retrievalHint
789+ ) ;
790+ this . missingMessageTimeouts . delete ( messageId ) ;
791+ } , this . sdsrFallbackTimeoutMs ) ;
792+
793+ this . missingMessageTimeouts . set ( messageId , timeout ) ;
794+ }
795+ } else if ( this . retrievalStrategy === "store-only" ) {
796+ // Immediate Store retrieval
797+ if ( retrievalHint && this . missingMessageRetriever ) {
798+ this . missingMessageRetriever . addMissingMessage (
799+ messageId ,
800+ retrievalHint
801+ ) ;
802+ }
677803 }
804+ // For 'sds-r-only' and 'none', SDS-R handles it or nothing happens
678805 }
679806 }
680807 ) ;
0 commit comments