@@ -6,18 +6,37 @@ import {
66 IDecoder ,
77 IEncoder
88} from "@waku/interfaces" ;
9- import {
10- createRoutingInfo ,
11- delay ,
12- MockWakuEvents ,
13- MockWakuNode
14- } from "@waku/utils" ;
9+ import { createRoutingInfo , MockWakuEvents , MockWakuNode } from "@waku/utils" ;
1510import { bytesToUtf8 , utf8ToBytes } from "@waku/utils/bytes" ;
1611import { expect } from "chai" ;
1712import { beforeEach , describe } from "mocha" ;
1813
1914import { ReliableChannel } from "./index.js" ;
2015
16+ function waitForEvent < T > (
17+ emitter : TypedEventEmitter < any > ,
18+ eventName : string ,
19+ predicate ?: ( detail : T ) => boolean ,
20+ timeoutMs : number = 5000
21+ ) : Promise < T > {
22+ return new Promise ( ( resolve , reject ) => {
23+ const timeout = setTimeout ( ( ) => {
24+ emitter . removeEventListener ( eventName , handler ) ;
25+ reject ( new Error ( `Timeout waiting for event: ${ eventName } ` ) ) ;
26+ } , timeoutMs ) ;
27+
28+ const handler = ( event : CustomEvent < T > ) : void => {
29+ if ( ! predicate || predicate ( event . detail ) ) {
30+ clearTimeout ( timeout ) ;
31+ emitter . removeEventListener ( eventName , handler ) ;
32+ resolve ( event . detail ) ;
33+ }
34+ } ;
35+
36+ emitter . addEventListener ( eventName , handler ) ;
37+ } ) ;
38+ }
39+
2140const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto" ;
2241const TEST_NETWORK_CONFIG : AutoSharding = {
2342 clusterId : 0 ,
@@ -64,33 +83,29 @@ describe("Reliable Channel: Acks", () => {
6483 // Alice sets up message tracking
6584 const messageId = ReliableChannel . getMessageId ( message ) ;
6685
67- let messageReceived = false ;
68- reliableChannelBob . addEventListener ( "message-received" , ( event ) => {
69- if ( bytesToUtf8 ( event . detail . payload ) === "first message in channel" ) {
70- messageReceived = true ;
71- }
72- } ) ;
86+ const messageReceivedPromise = waitForEvent < IDecodedMessage > (
87+ reliableChannelBob ,
88+ "message-received" ,
89+ ( msg ) => bytesToUtf8 ( msg . payload ) === "first message in channel"
90+ ) ;
7391
74- let messageAcknowledged = false ;
75- reliableChannelAlice . addEventListener ( "message-acknowledged" , ( event ) => {
76- if ( event . detail === messageId ) {
77- messageAcknowledged = true ;
78- }
79- } ) ;
92+ const messageAcknowledgedPromise = waitForEvent < string > (
93+ reliableChannelAlice ,
94+ "message-acknowledged" ,
95+ ( id ) => id === messageId
96+ ) ;
8097
98+ // Alice sends the message
8199 reliableChannelAlice . send ( message ) ;
82100
83101 // Wait for Bob to receive the message to ensure it uses it in causal history
84- while ( ! messageReceived ) {
85- await delay ( 50 ) ;
86- }
87- // Bobs sends a message now, it should include first one in causal history
102+ await messageReceivedPromise ;
103+
104+ // Bob sends a message now, it should include first one in causal history
88105 reliableChannelBob . send ( utf8ToBytes ( "second message in channel" ) ) ;
89- while ( ! messageAcknowledged ) {
90- await delay ( 50 ) ;
91- }
92106
93- expect ( messageAcknowledged ) . to . be . true ;
107+ // Wait for Alice to receive acknowledgment
108+ await messageAcknowledgedPromise ;
94109 } ) ;
95110
96111 it ( "Re-sent message is acknowledged once other parties join." , async ( ) => {
@@ -115,18 +130,17 @@ describe("Reliable Channel: Acks", () => {
115130 // acknowledged in this test.
116131 const message = utf8ToBytes ( "message to be acknowledged" ) ;
117132 const messageId = ReliableChannel . getMessageId ( message ) ;
133+
118134 let messageAcknowledged = false ;
119135 reliableChannelAlice . addEventListener ( "message-acknowledged" , ( event ) => {
120136 if ( event . detail === messageId ) {
121137 messageAcknowledged = true ;
122138 }
123139 } ) ;
124- reliableChannelAlice . send ( message ) ;
125140
126- // Wait a bit to ensure Bob does not receive the message
127- await delay ( 100 ) ;
141+ reliableChannelAlice . send ( message ) ;
128142
129- // Now Bob goes online
143+ // Now Bob goes online (no need to wait since Bob wasn't online to receive it)
130144 const mockWakuNodeBob = new MockWakuNode ( commonEventEmitter ) ;
131145 const reliableChannelBob = await ReliableChannel . create (
132146 mockWakuNodeBob ,
@@ -141,47 +155,51 @@ describe("Reliable Channel: Acks", () => {
141155 }
142156 ) ;
143157
144- // Track when Bob receives the message
145- let bobReceivedMessage = false ;
146- reliableChannelBob . addEventListener ( "message-received" , ( event ) => {
147- if ( bytesToUtf8 ( event . detail . payload ! ) === "message to be acknowledged" ) {
148- bobReceivedMessage = true ;
149- }
150- } ) ;
151-
152158 // Some sync messages are exchanged
153159 await reliableChannelAlice [ "sendSyncMessage" ] ( ) ;
154160 await reliableChannelBob [ "sendSyncMessage" ] ( ) ;
155161
156- // wait a bit to ensure messages are processed
157- await delay ( 100 ) ;
162+ // Wait for Bob to receive "some message" to ensure sync messages were processed
163+ const bobReceivedSomeMessagePromise = waitForEvent < IDecodedMessage > (
164+ reliableChannelBob ,
165+ "message-received" ,
166+ ( msg ) => bytesToUtf8 ( msg . payload ) === "some message"
167+ ) ;
158168
159169 // Some content messages are exchanged too
160170 reliableChannelAlice . send ( utf8ToBytes ( "some message" ) ) ;
161171 reliableChannelBob . send ( utf8ToBytes ( "some other message" ) ) ;
162172
163- // wait a bit to ensure messages are processed
164- await delay ( 100 ) ;
173+ // Wait for the "some message" to be received to ensure messages are processed
174+ await bobReceivedSomeMessagePromise ;
165175
166176 // At this point, the message shouldn't be acknowledged yet as Bob
167177 // does not have a complete log
168178 expect ( messageAcknowledged ) . to . be . false ;
169179
170180 // Now Alice resends the message
181+ const bobReceivedMessagePromise = waitForEvent < IDecodedMessage > (
182+ reliableChannelBob ,
183+ "message-received" ,
184+ ( msg ) => bytesToUtf8 ( msg . payload ) === "message to be acknowledged"
185+ ) ;
186+
171187 reliableChannelAlice . send ( message ) ;
172188
173189 // Wait for Bob to receive the message
174- while ( ! bobReceivedMessage ) {
175- await delay ( 50 ) ;
176- }
190+ await bobReceivedMessagePromise ;
191+
192+ // Set up promise waiter for acknowledgment before Bob sends sync
193+ const messageAcknowledgedPromise = waitForEvent < string > (
194+ reliableChannelAlice ,
195+ "message-acknowledged" ,
196+ ( id ) => id === messageId
197+ ) ;
177198
178199 // Bob receives it, and should include it in its sync
179200 await reliableChannelBob [ "sendSyncMessage" ] ( ) ;
180- while ( ! messageAcknowledged ) {
181- await delay ( 50 ) ;
182- }
183201
184- // The sync should acknowledge the message
185- expect ( messageAcknowledged ) . to . be . true ;
202+ // Wait for acknowledgment
203+ await messageAcknowledgedPromise ;
186204 } ) ;
187205} ) ;
0 commit comments