Skip to content

Commit 47c5565

Browse files
authored
Wait for heartbeat before considering relay peer ready (#472)
1 parent 03491a8 commit 47c5565

File tree

8 files changed

+75
-53
lines changed

8 files changed

+75
-53
lines changed

src/lib/waku.node.spec.ts

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { expect } from 'chai';
2+
import debug from 'debug';
23
import PeerId from 'peer-id';
34

45
import {
@@ -12,6 +13,8 @@ import { Protocols, Waku } from './waku';
1213
import { WakuMessage } from './waku_message';
1314
import { generateSymmetricKey } from './waku_message/version_1';
1415

16+
const dbg = debug('waku:test');
17+
1518
const TestContentTopic = '/test/1/waku/utf8';
1619

1720
describe('Waku Dial [node only]', function () {
@@ -20,8 +23,8 @@ describe('Waku Dial [node only]', function () {
2023
let nimWaku: NimWaku;
2124

2225
afterEach(async function () {
23-
nimWaku ? nimWaku.stop() : null;
24-
waku ? await waku.stop() : null;
26+
!!nimWaku && nimWaku.stop();
27+
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
2528
});
2629

2730
it('js connects to nim', async function () {
@@ -48,8 +51,8 @@ describe('Waku Dial [node only]', function () {
4851
let nimWaku: NimWaku;
4952

5053
afterEach(async function () {
51-
nimWaku ? nimWaku.stop() : null;
52-
waku ? await waku.stop() : null;
54+
!!nimWaku && nimWaku.stop();
55+
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
5356
});
5457

5558
before(function () {
@@ -83,8 +86,8 @@ describe('Waku Dial [node only]', function () {
8386
let nimWaku: NimWaku;
8487

8588
afterEach(async function () {
86-
nimWaku ? nimWaku.stop() : null;
87-
waku ? await waku.stop() : null;
89+
!!nimWaku && nimWaku.stop();
90+
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
8891
});
8992

9093
it('Passing an array', async function () {
@@ -145,6 +148,7 @@ describe('Decryption Keys', () => {
145148
let waku1: Waku;
146149
let waku2: Waku;
147150
beforeEach(async function () {
151+
this.timeout(5000);
148152
[waku1, waku2] = await Promise.all([
149153
Waku.create({ staticNoiseKey: NOISE_KEY_1 }),
150154
Waku.create({
@@ -162,9 +166,8 @@ describe('Decryption Keys', () => {
162166
});
163167

164168
afterEach(async function () {
165-
this.timeout(5000);
166-
await waku1.stop();
167-
await waku2.stop();
169+
!!waku1 && waku1.stop().catch((e) => console.log('Waku failed to stop', e));
170+
!!waku2 && waku2.stop().catch((e) => console.log('Waku failed to stop', e));
168171
});
169172

170173
it('Used by Waku Relay', async function () {
@@ -205,8 +208,8 @@ describe('Wait for remote peer / get peers', function () {
205208
let nimWaku: NimWaku;
206209

207210
afterEach(async function () {
208-
nimWaku ? nimWaku.stop() : null;
209-
waku ? await waku.stop() : null;
211+
!!nimWaku && nimWaku.stop();
212+
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
210213
});
211214

212215
it('Relay', async function () {
@@ -215,11 +218,15 @@ describe('Wait for remote peer / get peers', function () {
215218
await nimWaku.start();
216219
const multiAddrWithId = await nimWaku.getMultiaddrWithId();
217220

221+
dbg('Create');
218222
waku = await Waku.create({
219223
staticNoiseKey: NOISE_KEY_1,
220224
});
225+
dbg('Dial');
221226
await waku.dial(multiAddrWithId);
227+
dbg('waitForRemotePeer');
222228
await waku.waitForRemotePeer([Protocols.Relay]);
229+
dbg('Done, get peers');
223230
const peers = waku.relay.getPeers();
224231
const nimPeerId = multiAddrWithId.getPeerId();
225232

src/lib/waku.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,9 @@ export class Waku {
326326
// No peer yet available, wait for a subscription
327327
const promise = new Promise<void>((resolve) => {
328328
this.libp2p.pubsub.once('pubsub:subscription-change', () => {
329-
resolve();
329+
// Remote peer subscribed to topic, now wait for a heartbeat
330+
// so that the mesh is updated and the remote peer added to it
331+
this.libp2p.pubsub.once('gossipsub:heartbeat', resolve);
330332
});
331333
});
332334
promises.push(promise);

src/lib/waku_light_push/index.node.spec.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
11
import { expect } from 'chai';
2+
import debug from 'debug';
23

34
import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils';
45
import { delay } from '../delay';
56
import { Protocols, Waku } from '../waku';
67
import { WakuMessage } from '../waku_message';
78

9+
const dbg = debug('waku:test:lightpush');
10+
811
const TestContentTopic = '/test/1/waku-light-push/utf8';
912

1013
describe('Waku Light Push [node only]', () => {
1114
let waku: Waku;
1215
let nimWaku: NimWaku;
1316

1417
afterEach(async function () {
15-
nimWaku ? nimWaku.stop() : null;
16-
waku ? await waku.stop() : null;
18+
!!nimWaku && nimWaku.stop();
19+
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
1720
});
1821

1922
it('Push successfully', async function () {
@@ -72,13 +75,16 @@ describe('Waku Light Push [node only]', () => {
7275
TestContentTopic
7376
);
7477

78+
dbg('Send message via lightpush');
7579
const pushResponse = await waku.lightPush.push(message, {
7680
peerId: nimPeerId,
7781
});
82+
dbg('Ack received', pushResponse);
7883
expect(pushResponse?.isSuccess).to.be.true;
7984

8085
let msgs: WakuMessage[] = [];
8186

87+
dbg('Waiting for message to show on nim-waku side');
8288
while (msgs.length === 0) {
8389
await delay(200);
8490
msgs = await nimWaku.messages();

src/lib/waku_message/index.node.spec.ts

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,19 @@ describe('Waku Message [node only]', function () {
3535
});
3636

3737
nimWaku = new NimWaku(makeLogFileName(this));
38-
await nimWaku.start({ rpcPrivate: true });
38+
dbg('Starting nim-waku node');
39+
await nimWaku.start({ rpcPrivate: true, lightpush: true });
3940

41+
dbg('Dialing to nim-waku node');
4042
await waku.dial(await nimWaku.getMultiaddrWithId());
41-
await waku.waitForRemotePeer([Protocols.Relay]);
42-
43-
let peers = await waku.relay.getPeers();
44-
while (peers.size === 0) {
45-
await delay(200);
46-
peers = await waku.relay.getPeers();
47-
}
43+
dbg('Wait for remote peer');
44+
await waku.waitForRemotePeer([Protocols.Relay, Protocols.LightPush]);
45+
dbg('Remote peer ready');
4846
});
4947

5048
afterEach(async function () {
51-
nimWaku ? nimWaku.stop() : null;
52-
waku ? await waku.stop() : null;
49+
!!nimWaku && nimWaku.stop();
50+
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
5351
});
5452

5553
it('JS decrypts nim message [asymmetric, no signature]', async function () {
@@ -86,11 +84,13 @@ describe('Waku Message [node only]', function () {
8684
it('Js encrypts message for nim [asymmetric, no signature]', async function () {
8785
this.timeout(5000);
8886

87+
dbg('Ask nim-waku to generate asymmetric key pair');
8988
const keyPair = await nimWaku.getAsymmetricKeyPair();
9089
const privateKey = hexToBuf(keyPair.privateKey);
9190
const publicKey = hexToBuf(keyPair.publicKey);
9291

9392
const messageText = 'This is a message I am going to encrypt';
93+
dbg('Encrypt message');
9494
const message = await WakuMessage.fromUtf8String(
9595
messageText,
9696
TestContentTopic,
@@ -99,15 +99,18 @@ describe('Waku Message [node only]', function () {
9999
}
100100
);
101101

102+
dbg('Send message over relay');
102103
await waku.relay.send(message);
103104

104105
let msgs: WakuRelayMessage[] = [];
105106

106107
while (msgs.length === 0) {
108+
dbg('Wait for message to be seen by nim-waku');
107109
await delay(200);
108110
msgs = await nimWaku.getAsymmetricMessages(privateKey);
109111
}
110112

113+
dbg('Check message content');
111114
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
112115
expect(hexToBuf(msgs[0].payload).toString('utf-8')).to.equal(messageText);
113116
});
@@ -121,6 +124,7 @@ describe('Waku Message [node only]', function () {
121124
payload: Buffer.from(messageText, 'utf-8').toString('hex'),
122125
};
123126

127+
dbg('Generate symmetric key');
124128
const symKey = generateSymmetricKey();
125129

126130
waku.relay.addDecryptionKey(symKey);
@@ -131,10 +135,11 @@ describe('Waku Message [node only]', function () {
131135
}
132136
);
133137

134-
dbg('Post message');
138+
dbg('Post message using nim-waku');
135139
await nimWaku.postSymmetricMessage(message, symKey);
136-
140+
dbg('Wait for message to be received by js-waku');
137141
const receivedMsg = await receivedMsgPromise;
142+
dbg('Message received by js-waku');
138143

139144
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
140145
expect(receivedMsg.version).to.eq(1);
@@ -144,8 +149,9 @@ describe('Waku Message [node only]', function () {
144149
it('Js encrypts message for nim [symmetric, no signature]', async function () {
145150
this.timeout(5000);
146151

152+
dbg('Getting symmetric key from nim-waku');
147153
const symKey = await nimWaku.getSymmetricKey();
148-
154+
dbg('Encrypting message with js-waku');
149155
const messageText =
150156
'This is a message I am going to encrypt with a symmetric key';
151157
const message = await WakuMessage.fromUtf8String(
@@ -155,13 +161,14 @@ describe('Waku Message [node only]', function () {
155161
symKey: symKey,
156162
}
157163
);
158-
164+
dbg('Sending message over relay');
159165
await waku.relay.send(message);
160166

161167
let msgs: WakuRelayMessage[] = [];
162168

163169
while (msgs.length === 0) {
164170
await delay(200);
171+
dbg('Getting messages from nim-waku');
165172
msgs = await nimWaku.getSymmetricMessages(symKey);
166173
}
167174

src/lib/waku_relay/index.node.spec.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ describe('Waku Relay [node only]', () => {
5555
});
5656

5757
afterEach(async function () {
58-
this.timeout(5000);
59-
await waku1.stop();
60-
await waku2.stop();
58+
!!waku1 &&
59+
waku1.stop().catch((e) => console.log('Waku failed to stop', e));
60+
!!waku2 &&
61+
waku2.stop().catch((e) => console.log('Waku failed to stop', e));
6162
});
6263

6364
it('Subscribe', async function () {
@@ -322,8 +323,8 @@ describe('Waku Relay [node only]', () => {
322323
});
323324

324325
afterEach(async function () {
325-
nimWaku ? nimWaku.stop() : null;
326-
waku ? await waku.stop() : null;
326+
!!nimWaku && nimWaku.stop();
327+
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
327328
});
328329

329330
it('nim subscribes to js', async function () {
@@ -392,11 +393,11 @@ describe('Waku Relay [node only]', () => {
392393
let nimWaku: NimWaku;
393394

394395
afterEach(async function () {
395-
nimWaku ? nimWaku.stop() : null;
396-
await Promise.all([
397-
waku1 ? await waku1.stop() : null,
398-
waku2 ? await waku2.stop() : null,
399-
]);
396+
!!nimWaku && nimWaku.stop();
397+
!!waku1 &&
398+
waku1.stop().catch((e) => console.log('Waku failed to stop', e));
399+
!!waku2 &&
400+
waku2.stop().catch((e) => console.log('Waku failed to stop', e));
400401
});
401402

402403
it('Js publishes, other Js receives', async function () {

src/lib/waku_relay/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,10 +385,12 @@ export class WakuRelay extends Gossipsub {
385385
});
386386
// Publish messages to peers
387387
const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]);
388+
dbg(`Relay message to ${toSend.size} peers`);
388389
toSend.forEach((id) => {
389390
if (id === msg.from) {
390391
return;
391392
}
393+
dbg('Relay message to', id);
392394
this._sendRpc(id, rpc);
393395
});
394396
}

src/lib/waku_store/index.node.spec.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ describe('Waku Store', () => {
2727
let nimWaku: NimWaku;
2828

2929
afterEach(async function () {
30-
nimWaku ? nimWaku.stop() : null;
31-
waku ? await waku.stop() : null;
30+
!!nimWaku && nimWaku.stop();
31+
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
3232
});
3333

3434
it('Retrieves history', async function () {
@@ -301,7 +301,8 @@ describe('Waku Store', () => {
301301
expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
302302
expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
303303

304-
await Promise.all([waku1.stop(), waku2.stop()]);
304+
!!waku1 && waku1.stop().catch((e) => console.log('Waku failed to stop', e));
305+
!!waku2 && waku2.stop().catch((e) => console.log('Waku failed to stop', e));
305306
});
306307

307308
it('Retrieves history with asymmetric & symmetric encrypted messages on different content topics', async function () {
@@ -415,7 +416,8 @@ describe('Waku Store', () => {
415416
expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
416417
expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
417418

418-
await Promise.all([waku1.stop(), waku2.stop()]);
419+
!!waku1 && waku1.stop().catch((e) => console.log('Waku failed to stop', e));
420+
!!waku2 && waku2.stop().catch((e) => console.log('Waku failed to stop', e));
419421
});
420422

421423
it('Retrieves history using start and end time', async function () {

src/test_utils/nim_waku.ts

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import debug from 'debug';
1212
import { Multiaddr, multiaddr } from 'multiaddr';
1313
import PeerId from 'peer-id';
1414

15-
import { delay } from '../lib/delay';
1615
import { hexToBuf } from '../lib/utils';
1716
import { DefaultPubSubTopic } from '../lib/waku';
1817
import { WakuMessage } from '../lib/waku_message';
@@ -141,16 +140,12 @@ export class NimWaku {
141140
}
142141

143142
public stop(): void {
144-
// If killed too fast the SIGINT may not be registered
145-
delay(100).then(() => {
146-
dbg(
147-
`nim-waku ${
148-
this.process ? this.process.pid : this.pid
149-
} getting SIGINT at ${new Date().toLocaleTimeString()}`
150-
);
151-
this.process ? this.process.kill('SIGINT') : null;
152-
this.process = undefined;
153-
});
143+
const pid = this.process ? this.process.pid : this.pid;
144+
dbg(`nim-waku ${pid} getting SIGINT at ${new Date().toLocaleTimeString()}`);
145+
if (!this.process) throw 'nim-waku process not set';
146+
const res = this.process.kill('SIGINT');
147+
dbg(`nim-waku ${pid} interrupted:`, res);
148+
this.process = undefined;
154149
}
155150

156151
async waitForLog(msg: string, timeout: number): Promise<void> {

0 commit comments

Comments
 (0)