Skip to content

Commit 9d82564

Browse files
committed
fix: fix build errors, simplify parallel operation
1 parent 2cc49e1 commit 9d82564

File tree

4 files changed

+18
-107
lines changed

4 files changed

+18
-107
lines changed

packages/sdk/src/reliable_channel/reliable_channel.ts

Lines changed: 13 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ const DEFAULT_MAX_RETRY_ATTEMPTS = 10;
4242
const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000;
4343
const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds
4444
const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000;
45-
const DEFAULT_SDSR_FALLBACK_TIMEOUT_MS = 120 * 1000; // 2 minutes
4645

4746
const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
4847
LightPushError.ENCODE_FAILED,
@@ -118,22 +117,14 @@ export type ReliableChannelOptions = MessageChannelOptions & {
118117

119118
/**
120119
* Strategy for retrieving missing messages.
121-
* - 'both': Use SDS-R peer repair and Store queries (default)
120+
* - 'both': Use SDS-R peer repair and Store queries in parallel (default)
122121
* - 'sds-r-only': Only use SDS-R peer repair
123122
* - 'store-only': Only use Store queries (legacy behavior)
124123
* - 'none': No automatic retrieval
125124
*
126125
* @default 'both'
127126
*/
128127
retrievalStrategy?: "both" | "sds-r-only" | "store-only" | "none";
129-
130-
/**
131-
* How long to wait for SDS-R repair before falling back to Store.
132-
* Only applies when retrievalStrategy is 'both'.
133-
*
134-
* @default 120,000 (2 minutes - matches SDS-R T_max)
135-
*/
136-
sdsrFallbackTimeoutMs?: number;
137128
};
138129

139130
/**
@@ -179,11 +170,6 @@ export class ReliableChannel<
179170
| "sds-r-only"
180171
| "store-only"
181172
| "none";
182-
private readonly sdsrFallbackTimeoutMs: number;
183-
private readonly missingMessageTimeouts: Map<
184-
string,
185-
ReturnType<typeof setTimeout>
186-
>;
187173
private _started: boolean;
188174

189175
private constructor(
@@ -249,9 +235,6 @@ export class ReliableChannel<
249235
options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS;
250236

251237
this.retrievalStrategy = options?.retrievalStrategy ?? "both";
252-
this.sdsrFallbackTimeoutMs =
253-
options?.sdsrFallbackTimeoutMs ?? DEFAULT_SDSR_FALLBACK_TIMEOUT_MS;
254-
this.missingMessageTimeouts = new Map();
255238

256239
// Only enable Store retrieval based on strategy
257240
if (this._retrieve && this.shouldUseStore()) {
@@ -458,13 +441,7 @@ export class ReliableChannel<
458441
// missing messages or the status of previous outgoing messages
459442
this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint);
460443

461-
// Cancel Store fallback timeout if message was retrieved
462-
const timeout = this.missingMessageTimeouts.get(sdsMessage.messageId);
463-
if (timeout) {
464-
clearTimeout(timeout);
465-
this.missingMessageTimeouts.delete(sdsMessage.messageId);
466-
}
467-
444+
// Remove from Store retriever if message was retrieved
468445
this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId);
469446

470447
if (sdsMessage.content && sdsMessage.content.length > 0) {
@@ -544,7 +521,6 @@ export class ReliableChannel<
544521
this.stopSync();
545522
this.stopSweepIncomingBufferLoop();
546523
this.stopRepairSweepLoop();
547-
this.clearMissingMessageTimeouts();
548524
this.missingMessageRetriever?.stop();
549525
this.queryOnConnect?.stop();
550526
// TODO unsubscribe
@@ -587,13 +563,6 @@ export class ReliableChannel<
587563
if (this.sweepRepairInterval) clearInterval(this.sweepRepairInterval);
588564
}
589565

590-
private clearMissingMessageTimeouts(): void {
591-
for (const timeout of this.missingMessageTimeouts.values()) {
592-
clearTimeout(timeout);
593-
}
594-
this.missingMessageTimeouts.clear();
595-
}
596-
597566
private shouldUseStore(): boolean {
598567
return (
599568
this.retrievalStrategy === "both" ||
@@ -773,35 +742,28 @@ export class ReliableChannel<
773742
MessageChannelEvent.InMessageMissing,
774743
(event) => {
775744
for (const { messageId, retrievalHint } of event.detail) {
776-
// Coordinated retrieval strategy
745+
// Parallel retrieval strategy
777746
if (this.retrievalStrategy === "both") {
778-
// SDS-R will attempt first, schedule Store fallback
779-
// Note: missingMessageRetriever only exists if Store protocol is available
747+
// Both SDS-R and Store work in parallel
748+
// SDS-R automatically handles repair via RepairManager
749+
// Store retrieval starts immediately
780750
if (retrievalHint && this.missingMessageRetriever) {
781-
const timeout = setTimeout(() => {
782-
// Still missing after SDS-R timeout, try Store
783-
log.info(
784-
`Message ${messageId} not retrieved via SDS-R, falling back to Store`
785-
);
786-
this.missingMessageRetriever?.addMissingMessage(
787-
messageId,
788-
retrievalHint
789-
);
790-
this.missingMessageTimeouts.delete(messageId);
791-
}, this.sdsrFallbackTimeoutMs);
792-
793-
this.missingMessageTimeouts.set(messageId, timeout);
751+
this.missingMessageRetriever.addMissingMessage(
752+
messageId,
753+
retrievalHint
754+
);
794755
}
795756
} else if (this.retrievalStrategy === "store-only") {
796-
// Immediate Store retrieval
757+
// Immediate Store retrieval only
797758
if (retrievalHint && this.missingMessageRetriever) {
798759
this.missingMessageRetriever.addMissingMessage(
799760
messageId,
800761
retrievalHint
801762
);
802763
}
803764
}
804-
// For 'sds-r-only' and 'none', SDS-R handles it or nothing happens
765+
// For 'sds-r-only', only SDS-R repair manager operates
766+
// For 'none', nothing happens
805767
}
806768
}
807769
);

packages/sds/src/message_channel/events.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ export enum MessageChannelEvent {
1212
InMessageLost = "sds:in:message-irretrievably-lost",
1313
ErrorTask = "sds:error-task",
1414
// SDS-R Repair Events
15-
RepairRequestQueued = "sds:repair:request-queued",
1615
RepairRequestSent = "sds:repair:request-sent",
1716
RepairRequestReceived = "sds:repair:request-received",
18-
RepairResponseQueued = "sds:repair:response-queued",
1917
RepairResponseSent = "sds:repair:response-sent"
2018
}
2119

@@ -33,10 +31,6 @@ export type MessageChannelEvents = {
3331
[MessageChannelEvent.OutSyncSent]: CustomEvent<Message>;
3432
[MessageChannelEvent.InSyncReceived]: CustomEvent<Message>;
3533
[MessageChannelEvent.ErrorTask]: CustomEvent<unknown>;
36-
[MessageChannelEvent.RepairRequestQueued]: CustomEvent<{
37-
messageId: MessageId;
38-
tReq: number;
39-
}>;
4034
[MessageChannelEvent.RepairRequestSent]: CustomEvent<{
4135
messageIds: MessageId[];
4236
carrierMessageId: MessageId;
@@ -45,10 +39,6 @@ export type MessageChannelEvents = {
4539
messageIds: MessageId[];
4640
fromSenderId?: ParticipantId;
4741
}>;
48-
[MessageChannelEvent.RepairResponseQueued]: CustomEvent<{
49-
messageId: MessageId;
50-
tResp: number;
51-
}>;
5242
[MessageChannelEvent.RepairResponseSent]: CustomEvent<{
5343
messageId: MessageId;
5444
}>;

packages/sds/src/message_channel/message_channel.ts

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
128128

129129
// Only construct RepairManager if repair is enabled (default: true)
130130
if (options.enableRepair ?? true) {
131-
this.repairManager = new RepairManager(
132-
senderId,
133-
options.repairConfig,
134-
(event: string, detail: unknown) => {
135-
this.safeSendEvent(event as MessageChannelEvent, { detail });
136-
}
137-
);
131+
this.repairManager = new RepairManager(senderId, options.repairConfig);
138132
}
139133
}
140134

@@ -147,26 +141,14 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
147141
* Useful for adaptive sync intervals - increase frequency when repairs pending.
148142
*/
149143
public hasPendingRepairRequests(currentTime = Date.now()): boolean {
150-
if (!this.repairManager.isEnabled) {
144+
if (!this.repairManager) {
151145
return false;
152146
}
153147

154148
const nextRequestTime = this.repairManager.getNextRequestTime();
155149
return nextRequestTime !== undefined && nextRequestTime <= currentTime;
156150
}
157151

158-
/**
159-
* Get repair statistics for monitoring/debugging.
160-
*/
161-
public getRepairStats(): {
162-
pendingRequests: number;
163-
pendingResponses: number;
164-
nextRequestTime?: number;
165-
nextResponseTime?: number;
166-
} {
167-
return this.repairManager.getStats();
168-
}
169-
170152
/**
171153
* Processes all queued tasks sequentially to ensure proper message ordering.
172154
*

packages/sds/src/message_channel/repair/repair.ts

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,6 @@ const log = new Logger("sds:repair:manager");
2020
*/
2121
const PARTICIPANTS_PER_RESPONSE_GROUP = 128;
2222

23-
/**
24-
* Event emitter callback for repair events
25-
*/
26-
export type RepairEventEmitter = (event: string, detail: unknown) => void;
27-
2823
/**
2924
* Configuration for SDS-R repair protocol
3025
*/
@@ -58,16 +53,10 @@ export class RepairManager {
5853
private readonly config: Required<RepairConfig>;
5954
private readonly outgoingBuffer: OutgoingRepairBuffer;
6055
private readonly incomingBuffer: IncomingRepairBuffer;
61-
private readonly eventEmitter?: RepairEventEmitter;
6256

63-
public constructor(
64-
participantId: ParticipantId,
65-
config: RepairConfig = {},
66-
eventEmitter?: RepairEventEmitter
67-
) {
57+
public constructor(participantId: ParticipantId, config: RepairConfig = {}) {
6858
this.participantId = participantId;
6959
this.config = { ...DEFAULT_REPAIR_CONFIG, ...config };
70-
this.eventEmitter = eventEmitter;
7160

7261
this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize);
7362
this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize);
@@ -142,19 +131,13 @@ export class RepairManager {
142131
// Calculate when to request this repair
143132
const tReq = this.calculateTReq(entry.messageId, currentTime);
144133

145-
// Add to outgoing buffer - only log and emit event if actually added
134+
// Add to outgoing buffer - only log if actually added
146135
const wasAdded = this.outgoingBuffer.add(entry, tReq);
147136

148137
if (wasAdded) {
149138
log.info(
150139
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
151140
);
152-
153-
// Emit event
154-
this.eventEmitter?.("RepairRequestQueued", {
155-
messageId: entry.messageId,
156-
tReq
157-
});
158141
}
159142
}
160143
}
@@ -238,19 +221,13 @@ export class RepairManager {
238221
currentTime
239222
);
240223

241-
// Add to incoming buffer - only log and emit event if actually added
224+
// Add to incoming buffer - only log if actually added
242225
const wasAdded = this.incomingBuffer.add(request, tResp);
243226

244227
if (wasAdded) {
245228
log.info(
246229
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
247230
);
248-
249-
// Emit event
250-
this.eventEmitter?.("RepairResponseQueued", {
251-
messageId: request.messageId,
252-
tResp
253-
});
254231
}
255232
}
256233
}

0 commit comments

Comments
 (0)