Skip to content
103 changes: 95 additions & 8 deletions packages/sdk/src/reliable_channel/reliable_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
MessageChannel,
MessageChannelEvent,
type MessageChannelOptions,
type ParticipantId,
Message as SdsMessage,
type SenderId,
SyncMessage
} from "@waku/sds";
import { Logger } from "@waku/utils";
Expand All @@ -36,9 +36,11 @@
const log = new Logger("sdk:reliable-channel");

const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds
const DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS = 10 * 1000; // 10 seconds when repairs pending
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am starting the review so it may not hold, but I wonder if instead we should use the "multiplier" concept for this, instead of different values.

const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds
const DEFAULT_MAX_RETRY_ATTEMPTS = 10;
const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000;
const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds
const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000;

const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
Expand Down Expand Up @@ -78,6 +80,7 @@

/**
* How often store queries are done to retrieve missing messages.
* Only applies when retrievalStrategy includes Store ('both' or 'store-only').
*
* @default 10,000 (10 seconds)
*/
Expand Down Expand Up @@ -111,6 +114,17 @@
* @default 1000 (1 second)
*/
processTaskMinElapseMs?: number;

/**
* Strategy for retrieving missing messages.
* - 'both': Use SDS-R peer repair and Store queries in parallel (default)
* - 'sds-r-only': Only use SDS-R peer repair
* - 'store-only': Only use Store queries (legacy behavior)
* - 'none': No automatic retrieval
*
* @default 'both'
*/
retrievalStrategy?: "both" | "sds-r-only" | "store-only" | "none";
};

/**
Expand Down Expand Up @@ -145,11 +159,17 @@
private syncTimeout: ReturnType<typeof setTimeout> | undefined;
private sweepInBufInterval: ReturnType<typeof setInterval> | undefined;
private readonly sweepInBufIntervalMs: number;
private sweepRepairInterval: ReturnType<typeof setInterval> | undefined;
private processTaskTimeout: ReturnType<typeof setTimeout> | undefined;
private readonly retryManager: RetryManager | undefined;
private readonly missingMessageRetriever?: MissingMessageRetriever<T>;
private readonly queryOnConnect?: QueryOnConnect<T>;
private readonly processTaskMinElapseMs: number;
private readonly retrievalStrategy:
| "both"
| "sds-r-only"
| "store-only"
| "none";
private _started: boolean;

private constructor(
Expand Down Expand Up @@ -180,7 +200,7 @@

if (node.store) {
this._retrieve = node.store.queryGenerator.bind(node.store);
const peerManagerEvents = (node as any)?.peerManager?.events;

Check warning on line 203 in packages/sdk/src/reliable_channel/reliable_channel.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 203 in packages/sdk/src/reliable_channel/reliable_channel.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
if (
peerManagerEvents !== undefined &&
(options?.queryOnConnect ?? true)
Expand Down Expand Up @@ -214,7 +234,10 @@
this.processTaskMinElapseMs =
options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS;

if (this._retrieve) {
this.retrievalStrategy = options?.retrievalStrategy ?? "both";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are applying default twice, which is not idea. Make the retrievalStrategy a mandatory argument here, so it can be defaulted only once in create function.


// Only enable Store retrieval based on strategy
if (this._retrieve && this.shouldUseStore()) {
this.missingMessageRetriever = new MissingMessageRetriever(
this.decoder,
options?.retrieveFrequencyMs,
Expand Down Expand Up @@ -264,12 +287,20 @@
public static async create<T extends IDecodedMessage>(
node: IWaku,
channelId: ChannelId,
senderId: SenderId,
senderId: ParticipantId,
encoder: IEncoder,
decoder: IDecoder<T>,
options?: ReliableChannelOptions
): Promise<ReliableChannel<T>> {
const sdsMessageChannel = new MessageChannel(channelId, senderId, options);
// Enable SDS-R repair only if retrieval strategy uses it
const retrievalStrategy = options?.retrievalStrategy ?? "both";
const enableRepair =
retrievalStrategy === "both" || retrievalStrategy === "sds-r-only";

const sdsMessageChannel = new MessageChannel(channelId, senderId, {
...options,
enableRepair
});
const messageChannel = new ReliableChannel(
node,
sdsMessageChannel,
Expand Down Expand Up @@ -418,6 +449,7 @@
// missing messages or the status of previous outgoing messages
this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint);

// Remove from Store retriever if message was retrieved
this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId);

if (sdsMessage.content && sdsMessage.content.length > 0) {
Expand Down Expand Up @@ -478,6 +510,12 @@
this.setupEventListeners();
this.restartSync();
this.startSweepIncomingBufferLoop();

// Only start repair sweep if SDS-R is enabled
if (this.shouldUseSdsR()) {
this.startRepairSweepLoop();
}
Comment on lines +514 to +517
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it's better to put the check inside startRepairSweepLoop to be simpler and safer.


if (this._retrieve) {
this.missingMessageRetriever?.start();
this.queryOnConnect?.start();
Expand All @@ -490,6 +528,7 @@
this._started = false;
this.stopSync();
this.stopSweepIncomingBufferLoop();
this.stopRepairSweepLoop();
this.missingMessageRetriever?.stop();
this.queryOnConnect?.stop();
// TODO unsubscribe
Expand All @@ -512,18 +551,60 @@
if (this.sweepInBufInterval) clearInterval(this.sweepInBufInterval);
}

private startRepairSweepLoop(): void {
this.stopRepairSweepLoop();
this.sweepRepairInterval = setInterval(() => {
void this.messageChannel
.sweepRepairIncomingBuffer(async (message) => {
// Rebroadcast the repair message
const wakuMessage = { payload: message.encode() };
const result = await this._send(this.encoder, wakuMessage);
return result.failures.length === 0;
})
.catch((err) => {
log.error("error encountered when sweeping repair buffer", err);
});
}, DEFAULT_SWEEP_REPAIR_INTERVAL_MS);
}

private stopRepairSweepLoop(): void {
if (this.sweepRepairInterval) clearInterval(this.sweepRepairInterval);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (this.sweepRepairInterval) clearInterval(this.sweepRepairInterval);
if (this.sweepRepairInterval) {
clearInterval(this.sweepRepairInterval);
this.sweepInBufInterval = undefined;
}

It's apparently better.

}

private shouldUseStore(): boolean {
return (
this.retrievalStrategy === "both" ||
this.retrievalStrategy === "store-only"
);
}

private shouldUseSdsR(): boolean {
return (
this.retrievalStrategy === "both" ||
this.retrievalStrategy === "sds-r-only"
);
}

private restartSync(multiplier: number = 1): void {
if (this.syncTimeout) {
clearTimeout(this.syncTimeout);
}
if (this.syncMinIntervalMs) {
const timeoutMs = this.random() * this.syncMinIntervalMs * multiplier;
// Adaptive sync: use shorter interval when repairs are pending
const hasPendingRepairs =
this.shouldUseSdsR() && this.messageChannel.hasPendingRepairRequests();
const baseInterval = hasPendingRepairs
? DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS
: this.syncMinIntervalMs;
Comment on lines +596 to +598
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT about doing a multiplier of 0.3 or 0.5 instead?


const timeoutMs = this.random() * baseInterval * multiplier;

this.syncTimeout = setTimeout(() => {
void this.sendSyncMessage();
// Always restart a sync, no matter whether the message was sent.
// Set a multiplier so we wait a bit longer to not hog the conversation
void this.restartSync(2);
// Use smaller multiplier when repairs pending to send more frequently
const nextMultiplier = hasPendingRepairs ? 1.2 : 2;
void this.restartSync(nextMultiplier);
}, timeoutMs);
}
}
Expand Down Expand Up @@ -669,7 +750,13 @@
MessageChannelEvent.InMessageMissing,
(event) => {
for (const { messageId, retrievalHint } of event.detail) {
if (retrievalHint && this.missingMessageRetriever) {
// Store retrieval (for 'both' and 'store-only' strategies)
// SDS-R repair happens automatically via RepairManager for 'both' and 'sds-r-only'
if (
this.shouldUseStore() &&
retrievalHint &&
this.missingMessageRetriever
) {
Comment on lines +753 to +759
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary, as missingMessageRetriever should not be instantiated if this.shouldUseStore is false.

this.missingMessageRetriever.addMissingMessage(
messageId,
retrievalHint
Expand Down
10 changes: 0 additions & 10 deletions packages/sds/src/message_channel/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ export enum MessageChannelEvent {
InMessageLost = "sds:in:message-irretrievably-lost",
ErrorTask = "sds:error-task",
// SDS-R Repair Events
RepairRequestQueued = "sds:repair:request-queued",
RepairRequestSent = "sds:repair:request-sent",
RepairRequestReceived = "sds:repair:request-received",
RepairResponseQueued = "sds:repair:response-queued",
RepairResponseSent = "sds:repair:response-sent"
}

Expand All @@ -33,10 +31,6 @@ export type MessageChannelEvents = {
[MessageChannelEvent.OutSyncSent]: CustomEvent<Message>;
[MessageChannelEvent.InSyncReceived]: CustomEvent<Message>;
[MessageChannelEvent.ErrorTask]: CustomEvent<unknown>;
[MessageChannelEvent.RepairRequestQueued]: CustomEvent<{
messageId: MessageId;
tReq: number;
}>;
[MessageChannelEvent.RepairRequestSent]: CustomEvent<{
messageIds: MessageId[];
carrierMessageId: MessageId;
Expand All @@ -45,10 +39,6 @@ export type MessageChannelEvents = {
messageIds: MessageId[];
fromSenderId?: ParticipantId;
}>;
[MessageChannelEvent.RepairResponseQueued]: CustomEvent<{
messageId: MessageId;
tResp: number;
}>;
[MessageChannelEvent.RepairResponseSent]: CustomEvent<{
messageId: MessageId;
}>;
Expand Down
16 changes: 9 additions & 7 deletions packages/sds/src/message_channel/message_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,22 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {

// Only construct RepairManager if repair is enabled (default: true)
if (options.enableRepair ?? true) {
this.repairManager = new RepairManager(
senderId,
options.repairConfig,
(event: string, detail: unknown) => {
this.safeSendEvent(event as MessageChannelEvent, { detail });
}
);
this.repairManager = new RepairManager(senderId, options.repairConfig);
}
}

public static getMessageId(payload: Uint8Array): MessageId {
return bytesToHex(sha256(payload));
}

/**
* Check if there are pending repair requests that need to be sent.
* Useful for adaptive sync intervals - increase frequency when repairs pending.
*/
public hasPendingRepairRequests(currentTime = Date.now()): boolean {
return this.repairManager?.hasRequestsReady(currentTime) ?? false;
}

/**
* Processes all queued tasks sequentially to ensure proper message ordering.
*
Expand Down
37 changes: 11 additions & 26 deletions packages/sds/src/message_channel/repair/repair.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ const log = new Logger("sds:repair:manager");
*/
const PARTICIPANTS_PER_RESPONSE_GROUP = 128;

/**
* Event emitter callback for repair events
*/
export type RepairEventEmitter = (event: string, detail: unknown) => void;

/**
* Configuration for SDS-R repair protocol
*/
Expand Down Expand Up @@ -58,16 +53,10 @@ export class RepairManager {
private readonly config: Required<RepairConfig>;
private readonly outgoingBuffer: OutgoingRepairBuffer;
private readonly incomingBuffer: IncomingRepairBuffer;
private readonly eventEmitter?: RepairEventEmitter;

public constructor(
participantId: ParticipantId,
config: RepairConfig = {},
eventEmitter?: RepairEventEmitter
) {
public constructor(participantId: ParticipantId, config: RepairConfig = {}) {
this.participantId = participantId;
this.config = { ...DEFAULT_REPAIR_CONFIG, ...config };
this.eventEmitter = eventEmitter;

this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize);
this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize);
Expand Down Expand Up @@ -142,19 +131,13 @@ export class RepairManager {
// Calculate when to request this repair
const tReq = this.calculateTReq(entry.messageId, currentTime);

// Add to outgoing buffer - only log and emit event if actually added
// Add to outgoing buffer - only log if actually added
const wasAdded = this.outgoingBuffer.add(entry, tReq);

if (wasAdded) {
log.info(
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
);

// Emit event
this.eventEmitter?.("RepairRequestQueued", {
messageId: entry.messageId,
tReq
});
}
}
}
Expand Down Expand Up @@ -238,19 +221,13 @@ export class RepairManager {
currentTime
);

// Add to incoming buffer - only log and emit event if actually added
// Add to incoming buffer - only log if actually added
const wasAdded = this.incomingBuffer.add(request, tResp);

if (wasAdded) {
log.info(
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
);

// Emit event
this.eventEmitter?.("RepairResponseQueued", {
messageId: request.messageId,
tResp
});
}
}
}
Expand Down Expand Up @@ -328,4 +305,12 @@ export class RepairManager {
`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`
);
}

/**
* Check if there are repair requests ready to be sent
*/
public hasRequestsReady(currentTime = Date.now()): boolean {
const items = this.outgoingBuffer.getItems();
return items.length > 0 && items[0].tReq <= currentTime;
}
}
Loading