Skip to content

Commit fd357f9

Browse files
committed
feat: integrate sds-r within reliable channels SDK
1 parent 62aae51 commit fd357f9

File tree

3 files changed

+229
-9
lines changed

3 files changed

+229
-9
lines changed

packages/sdk/src/reliable_channel/reliable_channel.ts

Lines changed: 136 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,13 @@ import { RetryManager } from "./retry_manager.js";
3636
const log = new Logger("sdk:reliable-channel");
3737

3838
const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds
39+
const DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS = 10 * 1000; // 10 seconds when repairs pending
3940
const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds
4041
const DEFAULT_MAX_RETRY_ATTEMPTS = 10;
4142
const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000; // 5 seconds
43+
const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds
4244
const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000; // 1 second
45+
const DEFAULT_SDSR_FALLBACK_TIMEOUT_MS = 120 * 1000; // 2 minutes
4346

4447
const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
4548
LightPushError.ENCODE_FAILED,
@@ -78,6 +81,7 @@ export type ReliableChannelOptions = MessageChannelOptions & {
7881

7982
/**
8083
* How often store queries are done to retrieve missing messages.
84+
* Only applies when retrievalStrategy includes Store ('both' or 'store-only').
8185
*
8286
* @default 10,000 (10 seconds)
8387
*/
@@ -111,6 +115,25 @@ export type ReliableChannelOptions = MessageChannelOptions & {
111115
* @default 1000 (1 second)
112116
*/
113117
processTaskMinElapseMs?: number;
118+
119+
/**
120+
* Strategy for retrieving missing messages.
121+
* - 'both': Use SDS-R peer repair and Store queries (default)
122+
* - 'sds-r-only': Only use SDS-R peer repair
123+
* - 'store-only': Only use Store queries (legacy behavior)
124+
* - 'none': No automatic retrieval
125+
*
126+
* @default 'both'
127+
*/
128+
retrievalStrategy?: "both" | "sds-r-only" | "store-only" | "none";
129+
130+
/**
131+
* How long to wait for SDS-R repair before falling back to Store.
132+
* Only applies when retrievalStrategy is 'both'.
133+
*
134+
* @default 120,000 (2 minutes - matches SDS-R T_max)
135+
*/
136+
sdsrFallbackTimeoutMs?: number;
114137
};
115138

116139
/**
@@ -145,11 +168,22 @@ export class ReliableChannel<
145168
private syncTimeout: ReturnType<typeof setTimeout> | undefined;
146169
private sweepInBufInterval: ReturnType<typeof setInterval> | undefined;
147170
private readonly sweepInBufIntervalMs: number;
171+
private sweepRepairInterval: ReturnType<typeof setInterval> | undefined;
148172
private processTaskTimeout: ReturnType<typeof setTimeout> | undefined;
149173
private readonly retryManager: RetryManager | undefined;
150174
private readonly missingMessageRetriever?: MissingMessageRetriever<T>;
151175
private readonly queryOnConnect?: QueryOnConnect<T>;
152176
private readonly processTaskMinElapseMs: number;
177+
private readonly retrievalStrategy:
178+
| "both"
179+
| "sds-r-only"
180+
| "store-only"
181+
| "none";
182+
private readonly sdsrFallbackTimeoutMs: number;
183+
private readonly missingMessageTimeouts: Map<
184+
string,
185+
ReturnType<typeof setTimeout>
186+
>;
153187
private _started: boolean;
154188

155189
private constructor(
@@ -214,7 +248,13 @@ export class ReliableChannel<
214248
this.processTaskMinElapseMs =
215249
options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS;
216250

217-
if (this._retrieve) {
251+
this.retrievalStrategy = options?.retrievalStrategy ?? "both";
252+
this.sdsrFallbackTimeoutMs =
253+
options?.sdsrFallbackTimeoutMs ?? DEFAULT_SDSR_FALLBACK_TIMEOUT_MS;
254+
this.missingMessageTimeouts = new Map();
255+
256+
// Only enable Store retrieval based on strategy
257+
if (this._retrieve && this.shouldUseStore()) {
218258
this.missingMessageRetriever = new MissingMessageRetriever(
219259
this.decoder,
220260
options?.retrieveFrequencyMs,
@@ -418,6 +458,13 @@ export class ReliableChannel<
418458
// missing messages or the status of previous outgoing messages
419459
this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint);
420460

461+
// Cancel Store fallback timeout if message was retrieved
462+
const timeout = this.missingMessageTimeouts.get(sdsMessage.messageId);
463+
if (timeout) {
464+
clearTimeout(timeout);
465+
this.missingMessageTimeouts.delete(sdsMessage.messageId);
466+
}
467+
421468
this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId);
422469

423470
if (sdsMessage.content && sdsMessage.content.length > 0) {
@@ -478,6 +525,12 @@ export class ReliableChannel<
478525
this.setupEventListeners();
479526
this.restartSync();
480527
this.startSweepIncomingBufferLoop();
528+
529+
// Only start repair sweep if SDS-R is enabled
530+
if (this.shouldUseSdsR()) {
531+
this.startRepairSweepLoop();
532+
}
533+
481534
if (this._retrieve) {
482535
this.missingMessageRetriever?.start();
483536
this.queryOnConnect?.start();
@@ -490,6 +543,8 @@ export class ReliableChannel<
490543
this._started = false;
491544
this.stopSync();
492545
this.stopSweepIncomingBufferLoop();
546+
this.stopRepairSweepLoop();
547+
this.clearMissingMessageTimeouts();
493548
this.missingMessageRetriever?.stop();
494549
this.queryOnConnect?.stop();
495550
// TODO unsubscribe
@@ -512,18 +567,67 @@ export class ReliableChannel<
512567
if (this.sweepInBufInterval) clearInterval(this.sweepInBufInterval);
513568
}
514569

570+
/**
 * (Re)starts the periodic sweep of the repair incoming buffer.
 *
 * Any sweep that is already scheduled is stopped first. Every
 * `DEFAULT_SWEEP_REPAIR_INTERVAL_MS` the message channel is asked to sweep;
 * each message passed to the callback is encoded and sent with this
 * channel's encoder. A rejected sweep is logged rather than propagated.
 */
private startRepairSweepLoop(): void {
  this.stopRepairSweepLoop();

  const runSweep = (): void => {
    const sweep = this.messageChannel.sweepRepairIncomingBuffer(
      async (message) => {
        // Rebroadcast the repair message; report success only when the send
        // result lists no failures.
        const outgoing = { payload: message.encode() };
        const sendResult = await this._send(this.encoder, outgoing);
        return sendResult.failures.length === 0;
      }
    );

    void sweep.catch((err) => {
      log.error("error encountered when sweeping repair buffer", err);
    });
  };

  this.sweepRepairInterval = setInterval(
    runSweep,
    DEFAULT_SWEEP_REPAIR_INTERVAL_MS
  );
}
585+
586+
/**
 * Stops the periodic repair sweep, if one is scheduled.
 *
 * The stored handle is reset afterwards so the field never keeps referring
 * to an interval that has already been cleared.
 */
private stopRepairSweepLoop(): void {
  if (this.sweepRepairInterval === undefined) return;

  clearInterval(this.sweepRepairInterval);
  this.sweepRepairInterval = undefined;
}
589+
590+
/**
 * Cancels every scheduled Store-fallback timer and empties the
 * `missingMessageTimeouts` map.
 */
private clearMissingMessageTimeouts(): void {
  this.missingMessageTimeouts.forEach((pendingFallback) => {
    clearTimeout(pendingFallback);
  });
  this.missingMessageTimeouts.clear();
}
596+
597+
/**
 * Tells whether the configured retrieval strategy includes Store queries.
 *
 * @returns `true` for the 'both' and 'store-only' strategies, `false`
 * otherwise.
 */
private shouldUseStore(): boolean {
  switch (this.retrievalStrategy) {
    case "both":
    case "store-only":
      return true;
    default:
      return false;
  }
}
603+
604+
/**
 * Tells whether the configured retrieval strategy includes SDS-R peer repair.
 *
 * @returns `true` for the 'both' and 'sds-r-only' strategies, `false`
 * otherwise.
 */
private shouldUseSdsR(): boolean {
  switch (this.retrievalStrategy) {
    case "both":
    case "sds-r-only":
      return true;
    default:
      return false;
  }
}
610+
515611
/**
 * (Re)schedules the next sync message after a randomised delay.
 *
 * Any previously scheduled sync is cancelled first. Nothing is scheduled
 * when `syncMinIntervalMs` is falsy.
 *
 * While SDS-R is enabled and the message channel reports pending repair
 * requests, the base interval is shortened to
 * `DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS`. It is clamped so that it never
 * exceeds the configured `syncMinIntervalMs`: pending repairs may only make
 * syncing more frequent, never less.
 *
 * @param multiplier - Factor applied to the randomised base interval.
 */
private restartSync(multiplier: number = 1): void {
  if (this.syncTimeout) {
    clearTimeout(this.syncTimeout);
  }
  if (!this.syncMinIntervalMs) return;

  // Adaptive sync: use a shorter interval when repairs are pending.
  const hasPendingRepairs =
    this.shouldUseSdsR() && this.messageChannel.hasPendingRepairRequests();
  const baseInterval = hasPendingRepairs
    ? Math.min(
        DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS,
        this.syncMinIntervalMs
      )
    : this.syncMinIntervalMs;

  const timeoutMs = this.random() * baseInterval * multiplier;

  this.syncTimeout = setTimeout(() => {
    void this.sendSyncMessage();
    // Always restart a sync, no matter whether the message was sent.
    // The multiplier for the next round follows the repair state observed
    // when this round was scheduled: a smaller one while repairs are pending
    // so we send more frequently, otherwise a larger one so we do not hog
    // the conversation.
    const nextMultiplier = hasPendingRepairs ? 1.2 : 2;
    this.restartSync(nextMultiplier);
  }, timeoutMs);
}
@@ -669,12 +773,35 @@ export class ReliableChannel<
669773
MessageChannelEvent.InMessageMissing,
670774
(event) => {
  // Handles messages reported missing by the message channel, routing each
  // one according to the configured retrieval strategy:
  //  - 'store-only': hand it to the Store retriever immediately.
  //  - 'both': give SDS-R `sdsrFallbackTimeoutMs` to repair it, then fall
  //    back to the Store retriever.
  //  - 'sds-r-only' / 'none': nothing to do here.
  for (const { messageId, retrievalHint } of event.detail) {
    // Both Store paths need a retrieval hint and a retriever; the retriever
    // only exists when Store retrieval is available.
    if (!retrievalHint || !this.missingMessageRetriever) continue;

    if (this.retrievalStrategy === "store-only") {
      this.missingMessageRetriever.addMissingMessage(messageId, retrievalHint);
      continue;
    }

    if (this.retrievalStrategy !== "both") continue;

    // Schedule at most one fallback per message. Replacing an existing entry
    // would orphan the earlier timer: it could no longer be cancelled, would
    // still fire, and would delete the newer entry from the map.
    if (this.missingMessageTimeouts.has(messageId)) continue;

    const timeout = setTimeout(() => {
      // Drop the bookkeeping entry first so it is removed even if the
      // retriever call throws.
      this.missingMessageTimeouts.delete(messageId);
      // Still missing after the SDS-R timeout, try Store.
      log.info(
        `Message ${messageId} not retrieved via SDS-R, falling back to Store`
      );
      this.missingMessageRetriever?.addMissingMessage(
        messageId,
        retrievalHint
      );
    }, this.sdsrFallbackTimeoutMs);

    this.missingMessageTimeouts.set(messageId, timeout);
  }
}
680807
);

packages/sds/src/message_channel/message_channel.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,31 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
142142
return bytesToHex(sha256(payload));
143143
}
144144

145+
/**
 * Reports whether a repair request is due to be sent.
 *
 * Returns `false` when the repair manager is disabled or has no scheduled
 * request; otherwise returns whether the next scheduled request time is at
 * or before `currentTime`. Useful for adaptive sync intervals: callers can
 * sync more often while repairs are due.
 *
 * @param currentTime - Reference time to compare against; defaults to
 * `Date.now()`.
 */
public hasPendingRepairRequests(currentTime = Date.now()): boolean {
  const dueAt = this.repairManager.isEnabled
    ? this.repairManager.getNextRequestTime()
    : undefined;

  if (dueAt === undefined) {
    return false;
  }
  return dueAt <= currentTime;
}
157+
158+
/**
 * Exposes the repair manager's statistics for monitoring and debugging.
 *
 * @returns Whatever `repairManager.getStats()` reports, unchanged.
 */
public getRepairStats(): {
  pendingRequests: number;
  pendingResponses: number;
  nextRequestTime?: number;
  nextResponseTime?: number;
} {
  const stats = this.repairManager.getStats();
  return stats;
}
169+
145170
/**
146171
* Processes all queued tasks sequentially to ensure proper message ordering.
147172
*

packages/sds/src/message_channel/repair/repair.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,4 +328,72 @@ export class RepairManager {
328328
`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`
329329
);
330330
}
331+
332+
/**
 * Tells whether the outgoing buffer holds at least one repair request.
 */
public hasPendingRequests(): boolean {
  const pendingCount = this.outgoingBuffer.size;
  return pendingCount > 0;
}
338+
339+
/**
340+
* Get count of pending repair requests
341+
*/
342+
public getPendingRequestCount(): number {
343+
return this.outgoingBuffer.size;
344+
}
345+
346+
/**
347+
* Get count of pending repair responses
348+
*/
349+
public getPendingResponseCount(): number {
350+
return this.incomingBuffer.size;
351+
}
352+
353+
/**
354+
* Get next scheduled repair request time (earliest T_req)
355+
*/
356+
public getNextRequestTime(): number | undefined {
357+
const items = this.outgoingBuffer.getItems();
358+
return items.length > 0 ? items[0].tReq : undefined;
359+
}
360+
361+
/**
 * Time (T_resp) of the next scheduled repair response.
 *
 * Reads the first entry returned by the incoming buffer.
 * NOTE(review): this is the earliest T_resp only if `getItems()` returns
 * entries ordered by T_resp — confirm against the buffer implementation.
 *
 * @returns The first entry's `tResp`, or `undefined` when the buffer is empty.
 */
public getNextResponseTime(): number | undefined {
  const pending = this.incomingBuffer.getItems();
  if (pending.length === 0) {
    return undefined;
  }
  return pending[0].tResp;
}
368+
369+
/**
370+
* Check if a specific message has a pending repair request
371+
*/
372+
public isPendingRequest(messageId: string): boolean {
373+
return this.outgoingBuffer.has(messageId);
374+
}
375+
376+
/**
 * Tells whether the incoming buffer holds a repair response for the given
 * message.
 *
 * @param messageId - Identifier of the message to look up.
 */
public isPendingResponse(messageId: string): boolean {
  const isQueued = this.incomingBuffer.has(messageId);
  return isQueued;
}
382+
383+
/**
384+
* Get stats for monitoring/debugging
385+
*/
386+
public getStats(): {
387+
pendingRequests: number;
388+
pendingResponses: number;
389+
nextRequestTime?: number;
390+
nextResponseTime?: number;
391+
} {
392+
return {
393+
pendingRequests: this.getPendingRequestCount(),
394+
pendingResponses: this.getPendingResponseCount(),
395+
nextRequestTime: this.getNextRequestTime(),
396+
nextResponseTime: this.getNextResponseTime()
397+
};
398+
}
331399
}

0 commit comments

Comments
 (0)