Skip to content

Commit 1dc59e0

Browse files
committed
feat(reliable-channel): Emit a "Synced" Status with message counts
Return a "synced" or "syncing" status on `ReliableChannel.status` that let the developer know whether messages are missing, and if so, how many.
1 parent 3f2a6d6 commit 1dc59e0

File tree

6 files changed

+516
-1
lines changed

6 files changed

+516
-1
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,8 @@
11
export { ReliableChannel, ReliableChannelOptions } from "./reliable_channel.js";
22
export { ReliableChannelEvents, ReliableChannelEvent } from "./events.js";
3+
export {
4+
StatusEvent,
5+
StatusEvents,
6+
StatusDetail,
7+
SyncStatus
8+
} from "./sync_status.js";

packages/sdk/src/reliable_channel/reliable_channel.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import {
3232
import { ReliableChannelEvent, ReliableChannelEvents } from "./events.js";
3333
import { MissingMessageRetriever } from "./missing_message_retriever.js";
3434
import { RetryManager } from "./retry_manager.js";
35+
import { SyncStatus } from "./sync_status.js";
3536

3637
const log = new Logger("sdk:reliable-channel");
3738

@@ -226,8 +227,20 @@ export class ReliableChannel<
226227
}
227228

228229
this._started = false;
230+
231+
this.syncStatus = new SyncStatus();
229232
}
230233

234+
/**
235+
* Emit events when the channel is aware of missing message.
236+
* Note that "syncd" may mean some messages are irretrievably lost.
237+
* Check the emitted data for details.
238+
*
239+
* @emits [[StatusEvents]]
240+
*
241+
*/
242+
public syncStatus: SyncStatus;
243+
231244
public get isStarted(): boolean {
232245
return this._started;
233246
}
@@ -647,6 +660,7 @@ export class ReliableChannel<
647660
this.messageChannel.addEventListener(
648661
MessageChannelEvent.InMessageReceived,
649662
(event) => {
663+
this.syncStatus.onMessagesReceived(event.detail.messageId);
650664
// restart the timeout when a content message has been received
651665
if (isContentMessage(event.detail)) {
652666
// send a sync message faster to ack someone's else
@@ -668,6 +682,10 @@ export class ReliableChannel<
668682
this.messageChannel.addEventListener(
669683
MessageChannelEvent.InMessageMissing,
670684
(event) => {
685+
this.syncStatus.onMessagesMissing(
686+
...event.detail.map((m) => m.messageId)
687+
);
688+
671689
for (const { messageId, retrievalHint } of event.detail) {
672690
if (retrievalHint && this.missingMessageRetriever) {
673691
this.missingMessageRetriever.addMissingMessage(
@@ -679,6 +697,13 @@ export class ReliableChannel<
679697
}
680698
);
681699

700+
this.messageChannel.addEventListener(
701+
MessageChannelEvent.InMessageLost,
702+
(event) => {
703+
this.syncStatus.onMessagesLost(...event.detail.map((m) => m.messageId));
704+
}
705+
);
706+
682707
if (this.queryOnConnect) {
683708
this.queryOnConnect.addEventListener(
684709
QueryOnConnectEvent.MessagesRetrieved,
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
import { TypedEventEmitter } from "@libp2p/interface";
2+
import { createDecoder, createEncoder } from "@waku/core";
3+
import {
4+
AutoSharding,
5+
IDecodedMessage,
6+
IDecoder,
7+
IEncoder
8+
} from "@waku/interfaces";
9+
import {
10+
createRoutingInfo,
11+
delay,
12+
MockWakuEvents,
13+
MockWakuNode
14+
} from "@waku/utils";
15+
import { utf8ToBytes } from "@waku/utils/bytes";
16+
import { expect } from "chai";
17+
import { beforeEach, describe } from "mocha";
18+
19+
import { ReliableChannel, StatusDetail } from "./index.js";
20+
21+
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
22+
const TEST_NETWORK_CONFIG: AutoSharding = {
23+
clusterId: 0,
24+
numShardsInCluster: 1
25+
};
26+
const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, {
27+
contentTopic: TEST_CONTENT_TOPIC
28+
});
29+
30+
describe("Status", () => {
31+
let encoder: IEncoder;
32+
let decoder: IDecoder<IDecodedMessage>;
33+
34+
beforeEach(async () => {
35+
encoder = createEncoder({
36+
contentTopic: TEST_CONTENT_TOPIC,
37+
routingInfo: TEST_ROUTING_INFO
38+
});
39+
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
40+
});
41+
42+
it("Synced status is emitted when a message is received", async () => {
43+
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
44+
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
45+
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
46+
47+
const reliableChannelAlice = await ReliableChannel.create(
48+
mockWakuNodeAlice,
49+
"MyChannel",
50+
"alice",
51+
encoder,
52+
decoder
53+
);
54+
const reliableChannelBob = await ReliableChannel.create(
55+
mockWakuNodeBob,
56+
"MyChannel",
57+
"bob",
58+
encoder,
59+
decoder
60+
);
61+
62+
let statusDetail: StatusDetail;
63+
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
64+
statusDetail = event.detail;
65+
});
66+
67+
const message = utf8ToBytes("message in channel");
68+
69+
reliableChannelAlice.send(message);
70+
while (!statusDetail!) {
71+
await delay(50);
72+
}
73+
74+
expect(statusDetail.received).to.eq(1);
75+
});
76+
77+
it("Synced status is emitted when a missing message is received", async () => {
78+
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
79+
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
80+
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
81+
82+
const reliableChannelAlice = await ReliableChannel.create(
83+
mockWakuNodeAlice,
84+
"MyChannel",
85+
"alice",
86+
encoder,
87+
decoder,
88+
{
89+
retryIntervalMs: 300 // shorter retry so that it resends message in test
90+
}
91+
);
92+
93+
// Send a message before Bob goes online so it's marked as missing
94+
let messageSent = false;
95+
reliableChannelAlice.addEventListener("message-sent", (_event) => {
96+
messageSent = true;
97+
});
98+
reliableChannelAlice.send(utf8ToBytes("missing message"));
99+
while (!messageSent) {
100+
await delay(50);
101+
}
102+
103+
const reliableChannelBob = await ReliableChannel.create(
104+
mockWakuNodeBob,
105+
"MyChannel",
106+
"bob",
107+
encoder,
108+
decoder
109+
);
110+
111+
let syncingStatusDetail: StatusDetail;
112+
reliableChannelBob.syncStatus.addEventListener("syncing", (event) => {
113+
syncingStatusDetail = event.detail;
114+
});
115+
116+
let syncedStatusDetail: StatusDetail;
117+
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
118+
syncedStatusDetail = event.detail;
119+
});
120+
121+
messageSent = false;
122+
reliableChannelAlice.addEventListener("message-sent", (_event) => {
123+
messageSent = true;
124+
});
125+
reliableChannelAlice.send(
126+
utf8ToBytes("second message with missing message as dep")
127+
);
128+
while (!messageSent) {
129+
await delay(50);
130+
}
131+
132+
while (!syncingStatusDetail!) {
133+
await delay(50);
134+
}
135+
136+
expect(syncingStatusDetail.missing).to.eq(1);
137+
expect(syncingStatusDetail.received).to.eq(1);
138+
139+
while (!syncedStatusDetail!) {
140+
await delay(50);
141+
}
142+
143+
expect(syncedStatusDetail.missing).to.eq(0);
144+
expect(syncedStatusDetail.received).to.eq(2);
145+
});
146+
147+
it("Synced status is emitted when a missing message is mark as lost", async () => {
148+
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
149+
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
150+
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
151+
152+
const reliableChannelAlice = await ReliableChannel.create(
153+
mockWakuNodeAlice,
154+
"MyChannel",
155+
"alice",
156+
encoder,
157+
decoder,
158+
{
159+
syncMinIntervalMs: 0,
160+
retryIntervalMs: 0 // Do not retry so we can lose the message
161+
}
162+
);
163+
164+
// Send a message before Bob goes online so it's marked as missing
165+
let messageSent = false;
166+
reliableChannelAlice.addEventListener("message-sent", (_event) => {
167+
messageSent = true;
168+
});
169+
reliableChannelAlice.send(utf8ToBytes("missing message"));
170+
while (!messageSent) {
171+
await delay(50);
172+
}
173+
174+
const reliableChannelBob = await ReliableChannel.create(
175+
mockWakuNodeBob,
176+
"MyChannel",
177+
"bob",
178+
encoder,
179+
decoder,
180+
{
181+
retrieveFrequencyMs: 0,
182+
syncMinIntervalMs: 0,
183+
sweepInBufIntervalMs: 50, // frequently sweep incoming buffer to mark msg as lost
184+
timeoutForLostMessagesMs: 200 // timeout within the test
185+
}
186+
);
187+
188+
let syncingStatusDetail: StatusDetail;
189+
reliableChannelBob.syncStatus.addEventListener("syncing", (event) => {
190+
syncingStatusDetail = event.detail;
191+
});
192+
193+
messageSent = false;
194+
reliableChannelAlice.addEventListener("message-sent", (_event) => {
195+
messageSent = true;
196+
});
197+
reliableChannelAlice.send(
198+
utf8ToBytes("second message with missing message as dep")
199+
);
200+
while (!messageSent) {
201+
await delay(50);
202+
}
203+
204+
while (!syncingStatusDetail!) {
205+
await delay(50);
206+
}
207+
208+
expect(syncingStatusDetail.missing).to.eq(1, "at first, one missing");
209+
expect(syncingStatusDetail.received).to.eq(1, "at first, one received");
210+
expect(syncingStatusDetail.lost).to.eq(0, "at first, no loss");
211+
212+
let syncedStatusDetail: StatusDetail;
213+
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
214+
syncedStatusDetail = event.detail;
215+
});
216+
while (!syncedStatusDetail!) {
217+
await delay(50);
218+
}
219+
220+
expect(syncedStatusDetail.missing).to.eq(0, "no more missing message");
221+
expect(syncedStatusDetail.received).to.eq(1, "still one received message");
222+
expect(syncedStatusDetail.lost).to.eq(1, "missing message is marked lost");
223+
});
224+
});

0 commit comments

Comments
 (0)