Skip to content

Commit e5f51d7

Browse files
authored
feat: Reliable Channel: Status Sync, overflow protection, stop TODOs (#2729)
* feat(sds): messages with lost deps are delivered This is to re-enable participation in the SDS protocol. Meaning the received message with missing dependencies becomes part of the causal history, re-enabling acknowledgements. * fix(sds): avoid overflow in message history storage * feat(reliable-channel): Emit a "Synced" Status with message counts Return a "synced" or "syncing" status on `ReliableChannel.status` that let the developer know whether messages are missing, and if so, how many. * fix: clean up subscriptions, intervals and timeouts when stopping # Conflicts: # packages/sdk/src/reliable_channel/reliable_channel.ts * chore: extract random timeout * fix rebase * revert listener changes * typo * Ensuring no inconsistency on missing message * test: streamline, stop channels * clear sync status sets when stopping channel * prevent sync status event spam * test: improve naming * try/catch for callback * encapsulate/simplify reliable channel API * sanity checks * test: ensure sync status cleanup
1 parent 84a6ea6 commit e5f51d7

19 files changed

+1210
-274
lines changed

packages/core/src/lib/stream_manager/stream_manager.spec.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ describe("StreamManager", () => {
2727
} as any as Libp2pComponents);
2828
});
2929

30+
afterEach(() => {
31+
sinon.restore();
32+
});
33+
3034
it("should return usable stream attached to connection", async () => {
3135
for (const writeStatus of ["ready", "writing"]) {
3236
const con1 = createMockConnection();

packages/sdk/src/light_push/light_push.spec.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { Libp2p, LightPushError, LightPushStatusCode } from "@waku/interfaces";
99
import { createRoutingInfo } from "@waku/utils";
1010
import { utf8ToBytes } from "@waku/utils/bytes";
1111
import { expect } from "chai";
12+
import { afterEach } from "mocha";
1213
import sinon, { SinonSpy } from "sinon";
1314

1415
import { PeerManager } from "../peer_manager/index.js";
@@ -38,6 +39,10 @@ describe("LightPush SDK", () => {
3839
lightPush = mockLightPush({ libp2p });
3940
});
4041

42+
afterEach(() => {
43+
sinon.restore();
44+
});
45+
4146
it("should fail to send if no connected peers found", async () => {
4247
const result = await lightPush.send(encoder, {
4348
payload: utf8ToBytes("test")

packages/sdk/src/light_push/retry_manager.spec.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ describe("RetryManager", () => {
4747
sinon.restore();
4848
});
4949

50-
it("should start and stop interval correctly", () => {
50+
// TODO: Skipped because the global state is not being restored and it breaks
51+
// tests of functionalities that rely on intervals
52+
it.skip("should start and stop interval correctly", () => {
5153
const setIntervalSpy = sinon.spy(global, "setInterval");
5254
const clearIntervalSpy = sinon.spy(global, "clearInterval");
5355

packages/sdk/src/query_on_connect/query_on_connect.spec.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { delay } from "@waku/utils";
1111
import { utf8ToBytes } from "@waku/utils/bytes";
1212
import { expect } from "chai";
13+
import { afterEach } from "mocha";
1314
import sinon from "sinon";
1415

1516
import {
@@ -91,6 +92,10 @@ describe("QueryOnConnect", () => {
9192
};
9293
});
9394

95+
afterEach(() => {
96+
sinon.restore();
97+
});
98+
9499
describe("constructor", () => {
95100
it("should create QueryOnConnect instance with all required parameters", () => {
96101
queryOnConnect = new QueryOnConnect(
@@ -337,6 +342,7 @@ describe("QueryOnConnect", () => {
337342
});
338343

339344
afterEach(() => {
345+
sinon.restore();
340346
mockClock.restore();
341347
});
342348

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,8 @@
11
export { ReliableChannel, ReliableChannelOptions } from "./reliable_channel.js";
22
export { ReliableChannelEvents, ReliableChannelEvent } from "./events.js";
3+
export {
4+
StatusEvent,
5+
StatusEvents,
6+
StatusDetail,
7+
ISyncStatusEvents
8+
} from "./sync_status.js";
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { Logger } from "@waku/utils";
2+
3+
const log = new Logger("sdk:random-timeout");
4+
5+
/**
6+
* Enables waiting a random time before doing an action (using `setTimeout`),
7+
* with possibility to apply a multiplier to manipulate said time.
8+
*/
9+
export class RandomTimeout {
10+
private timeout: ReturnType<typeof setTimeout> | undefined;
11+
12+
public constructor(
13+
/**
14+
* The maximum interval one would wait before the call is made, in milliseconds.
15+
*/
16+
private maxIntervalMs: number,
17+
/**
18+
* When not zero: Anytime a call is made, then a new call will be rescheduled
19+
* using this multiplier
20+
*/
21+
private multiplierOnCall: number,
22+
/**
23+
* The function to call when the timer is reached
24+
*/
25+
private callback: () => void | Promise<void>
26+
) {
27+
if (!Number.isFinite(maxIntervalMs) || maxIntervalMs < 0) {
28+
throw new Error(
29+
`maxIntervalMs must be a non-negative finite number, got: ${maxIntervalMs}`
30+
);
31+
}
32+
if (!Number.isFinite(multiplierOnCall)) {
33+
throw new Error(
34+
`multiplierOnCall must be a finite number, got: ${multiplierOnCall}`
35+
);
36+
}
37+
}
38+
39+
/**
40+
* Use to start the timer. If a timer was already set, it deletes it and
41+
* schedule a new one.
42+
* @param multiplier applied to [[maxIntervalMs]]
43+
*/
44+
public restart(multiplier: number = 1): void {
45+
this.stop();
46+
47+
if (this.maxIntervalMs) {
48+
const timeoutMs = Math.random() * this.maxIntervalMs * multiplier;
49+
50+
this.timeout = setTimeout(() => {
51+
try {
52+
void this.callback();
53+
} catch (error) {
54+
log.error("Error in RandomTimeout callback:", error);
55+
}
56+
void this.restart(this.multiplierOnCall);
57+
}, timeoutMs);
58+
}
59+
}
60+
61+
public stop(): void {
62+
if (this.timeout) {
63+
clearTimeout(this.timeout);
64+
this.timeout = undefined;
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)