Skip to content

Commit 682aeee

Browse files
committed
feat(reliable-channel): Emit a "Synced" Status with message counts
Return a "synced" or "syncing" status on `ReliableChannel.status` that let the developer know whether messages are missing, and if so, how many.
1 parent 87dc5fa commit 682aeee

File tree

6 files changed

+516
-1
lines changed

6 files changed

+516
-1
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,8 @@
11
export { ReliableChannel, ReliableChannelOptions } from "./reliable_channel.js";
22
export { ReliableChannelEvents, ReliableChannelEvent } from "./events.js";
3+
export {
4+
StatusEvent,
5+
StatusEvents,
6+
StatusDetail,
7+
SyncStatus
8+
} from "./sync_status.js";

packages/sdk/src/reliable_channel/reliable_channel.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
import { ReliableChannelEvent, ReliableChannelEvents } from "./events.js";
3434
import { MissingMessageRetriever } from "./missing_message_retriever.js";
3535
import { RetryManager } from "./retry_manager.js";
36+
import { SyncStatus } from "./sync_status.js";
3637

3738
const log = new Logger("sdk:reliable-channel");
3839

@@ -234,8 +235,20 @@ export class ReliableChannel<
234235
}
235236

236237
this._started = false;
238+
239+
this.syncStatus = new SyncStatus();
237240
}
238241

242+
/**
243+
* Emit events when the channel is aware of missing message.
244+
* Note that "syncd" may mean some messages are irretrievably lost.
245+
* Check the emitted data for details.
246+
*
247+
* @emits [[StatusEvents]]
248+
*
249+
*/
250+
public syncStatus: SyncStatus;
251+
239252
public get isStarted(): boolean {
240253
return this._started;
241254
}
@@ -709,6 +722,7 @@ export class ReliableChannel<
709722
this.addTrackedEventListener(
710723
MessageChannelEvent.InMessageReceived,
711724
(event) => {
725+
this.syncStatus.onMessagesReceived(event.detail.messageId);
712726
// restart the timeout when a content message has been received
713727
if (isContentMessage(event.detail)) {
714728
// send a sync message faster to ack someone's else
@@ -730,6 +744,10 @@ export class ReliableChannel<
730744
this.addTrackedEventListener(
731745
MessageChannelEvent.InMessageMissing,
732746
(event) => {
747+
this.syncStatus.onMessagesMissing(
748+
...event.detail.map((m) => m.messageId)
749+
);
750+
733751
for (const { messageId, retrievalHint } of event.detail) {
734752
if (retrievalHint && this.missingMessageRetriever) {
735753
this.missingMessageRetriever.addMissingMessage(
@@ -741,6 +759,13 @@ export class ReliableChannel<
741759
}
742760
);
743761

762+
this.messageChannel.addEventListener(
763+
MessageChannelEvent.InMessageLost,
764+
(event) => {
765+
this.syncStatus.onMessagesLost(...event.detail.map((m) => m.messageId));
766+
}
767+
);
768+
744769
if (this.queryOnConnect) {
745770
const queryListener = (event: any): void => {
746771
void this.processIncomingMessages(event.detail);
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
import { TypedEventEmitter } from "@libp2p/interface";
2+
import { createDecoder, createEncoder } from "@waku/core";
3+
import {
4+
AutoSharding,
5+
IDecodedMessage,
6+
IDecoder,
7+
IEncoder
8+
} from "@waku/interfaces";
9+
import {
10+
createRoutingInfo,
11+
delay,
12+
MockWakuEvents,
13+
MockWakuNode
14+
} from "@waku/utils";
15+
import { utf8ToBytes } from "@waku/utils/bytes";
16+
import { expect } from "chai";
17+
import { beforeEach, describe } from "mocha";
18+
19+
import { ReliableChannel, StatusDetail } from "./index.js";
20+
21+
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
22+
const TEST_NETWORK_CONFIG: AutoSharding = {
23+
clusterId: 0,
24+
numShardsInCluster: 1
25+
};
26+
const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, {
27+
contentTopic: TEST_CONTENT_TOPIC
28+
});
29+
30+
describe("Status", () => {
31+
let encoder: IEncoder;
32+
let decoder: IDecoder<IDecodedMessage>;
33+
34+
beforeEach(async () => {
35+
encoder = createEncoder({
36+
contentTopic: TEST_CONTENT_TOPIC,
37+
routingInfo: TEST_ROUTING_INFO
38+
});
39+
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
40+
});
41+
42+
it("Synced status is emitted when a message is received", async () => {
43+
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
44+
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
45+
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
46+
47+
const reliableChannelAlice = await ReliableChannel.create(
48+
mockWakuNodeAlice,
49+
"MyChannel",
50+
"alice",
51+
encoder,
52+
decoder
53+
);
54+
const reliableChannelBob = await ReliableChannel.create(
55+
mockWakuNodeBob,
56+
"MyChannel",
57+
"bob",
58+
encoder,
59+
decoder
60+
);
61+
62+
let statusDetail: StatusDetail;
63+
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
64+
statusDetail = event.detail;
65+
});
66+
67+
const message = utf8ToBytes("message in channel");
68+
69+
reliableChannelAlice.send(message);
70+
while (!statusDetail!) {
71+
await delay(50);
72+
}
73+
74+
expect(statusDetail.received).to.eq(1);
75+
});
76+
77+
it("Synced status is emitted when a missing message is received", async () => {
78+
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
79+
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
80+
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
81+
82+
const reliableChannelAlice = await ReliableChannel.create(
83+
mockWakuNodeAlice,
84+
"MyChannel",
85+
"alice",
86+
encoder,
87+
decoder,
88+
{
89+
retryIntervalMs: 300 // shorter retry so that it resends message in test
90+
}
91+
);
92+
93+
// Send a message before Bob goes online so it's marked as missing
94+
let messageSent = false;
95+
reliableChannelAlice.addEventListener("message-sent", (_event) => {
96+
messageSent = true;
97+
});
98+
reliableChannelAlice.send(utf8ToBytes("missing message"));
99+
while (!messageSent) {
100+
await delay(50);
101+
}
102+
103+
const reliableChannelBob = await ReliableChannel.create(
104+
mockWakuNodeBob,
105+
"MyChannel",
106+
"bob",
107+
encoder,
108+
decoder
109+
);
110+
111+
let syncingStatusDetail: StatusDetail;
112+
reliableChannelBob.syncStatus.addEventListener("syncing", (event) => {
113+
syncingStatusDetail = event.detail;
114+
});
115+
116+
let syncedStatusDetail: StatusDetail;
117+
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
118+
syncedStatusDetail = event.detail;
119+
});
120+
121+
messageSent = false;
122+
reliableChannelAlice.addEventListener("message-sent", (_event) => {
123+
messageSent = true;
124+
});
125+
reliableChannelAlice.send(
126+
utf8ToBytes("second message with missing message as dep")
127+
);
128+
while (!messageSent) {
129+
await delay(50);
130+
}
131+
132+
while (!syncingStatusDetail!) {
133+
await delay(50);
134+
}
135+
136+
expect(syncingStatusDetail.missing).to.eq(1);
137+
expect(syncingStatusDetail.received).to.eq(1);
138+
139+
while (!syncedStatusDetail!) {
140+
await delay(50);
141+
}
142+
143+
expect(syncedStatusDetail.missing).to.eq(0);
144+
expect(syncedStatusDetail.received).to.eq(2);
145+
});
146+
147+
it("Synced status is emitted when a missing message is mark as lost", async () => {
148+
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
149+
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
150+
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
151+
152+
const reliableChannelAlice = await ReliableChannel.create(
153+
mockWakuNodeAlice,
154+
"MyChannel",
155+
"alice",
156+
encoder,
157+
decoder,
158+
{
159+
syncMinIntervalMs: 0,
160+
retryIntervalMs: 0 // Do not retry so we can lose the message
161+
}
162+
);
163+
164+
// Send a message before Bob goes online so it's marked as missing
165+
let messageSent = false;
166+
reliableChannelAlice.addEventListener("message-sent", (_event) => {
167+
messageSent = true;
168+
});
169+
reliableChannelAlice.send(utf8ToBytes("missing message"));
170+
while (!messageSent) {
171+
await delay(50);
172+
}
173+
174+
const reliableChannelBob = await ReliableChannel.create(
175+
mockWakuNodeBob,
176+
"MyChannel",
177+
"bob",
178+
encoder,
179+
decoder,
180+
{
181+
retrieveFrequencyMs: 0,
182+
syncMinIntervalMs: 0,
183+
sweepInBufIntervalMs: 50, // frequently sweep incoming buffer to mark msg as lost
184+
timeoutForLostMessagesMs: 200 // timeout within the test
185+
}
186+
);
187+
188+
let syncingStatusDetail: StatusDetail;
189+
reliableChannelBob.syncStatus.addEventListener("syncing", (event) => {
190+
syncingStatusDetail = event.detail;
191+
});
192+
193+
messageSent = false;
194+
reliableChannelAlice.addEventListener("message-sent", (_event) => {
195+
messageSent = true;
196+
});
197+
reliableChannelAlice.send(
198+
utf8ToBytes("second message with missing message as dep")
199+
);
200+
while (!messageSent) {
201+
await delay(50);
202+
}
203+
204+
while (!syncingStatusDetail!) {
205+
await delay(50);
206+
}
207+
208+
expect(syncingStatusDetail.missing).to.eq(1, "at first, one missing");
209+
expect(syncingStatusDetail.received).to.eq(1, "at first, one received");
210+
expect(syncingStatusDetail.lost).to.eq(0, "at first, no loss");
211+
212+
let syncedStatusDetail: StatusDetail;
213+
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
214+
syncedStatusDetail = event.detail;
215+
});
216+
while (!syncedStatusDetail!) {
217+
await delay(50);
218+
}
219+
220+
expect(syncedStatusDetail.missing).to.eq(0, "no more missing message");
221+
expect(syncedStatusDetail.received).to.eq(1, "still one received message");
222+
expect(syncedStatusDetail.lost).to.eq(1, "missing message is marked lost");
223+
});
224+
});

0 commit comments

Comments
 (0)