Skip to content

Commit ac56778

Browse files
authored
Merge pull request #762 from status-im/waku-filter
Implement Waku Filter protocol
2 parents 1dc4897 + 7a2dcd9 commit ac56778

File tree

8 files changed

+842
-2
lines changed

8 files changed

+842
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Added
1111

1212
- `waitForRemotePeer` now accepts a `timeoutMs` parameter that rejects the promise if it is reached. By default, no timeout is applied.
13+
- **Experimental** support for the [Waku Filter](https://rfc.vac.dev/spec/12/) protocol (client side) added, currently only works in NodeJS.
1314

1415
### Changed
1516

proto/waku/v2/filter.proto

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
syntax = "proto3";
2+
3+
package waku.v2;
4+
5+
import "waku/v2/message.proto";
6+
7+
message FilterRequest {
8+
bool subscribe = 1;
9+
string topic = 2;
10+
repeated ContentFilter content_filters = 3;
11+
12+
message ContentFilter {
13+
string content_topic = 1;
14+
}
15+
}
16+
17+
message MessagePush {
18+
repeated WakuMessage messages = 1;
19+
}
20+
21+
message FilterRPC {
22+
string request_id = 1;
23+
FilterRequest request = 2;
24+
MessagePush push = 3;
25+
}

src/lib/waku.node.spec.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,4 +305,27 @@ describe("Wait for remote peer / get peers", function () {
305305
expect(nimPeerId).to.not.be.undefined;
306306
expect(peers.includes(nimPeerId as string)).to.be.true;
307307
});
308+
309+
it("Filter", async function () {
310+
this.timeout(20_000);
311+
nwaku = new Nwaku(makeLogFileName(this));
312+
await nwaku.start({ filter: true });
313+
const multiAddrWithId = await nwaku.getMultiaddrWithId();
314+
315+
waku = await Waku.create({
316+
staticNoiseKey: NOISE_KEY_1,
317+
});
318+
await waku.dial(multiAddrWithId);
319+
await waku.waitForRemotePeer([Protocols.Filter]);
320+
321+
const peers = [];
322+
for await (const peer of waku.filter.peers) {
323+
peers.push(peer.id.toB58String());
324+
}
325+
326+
const nimPeerId = multiAddrWithId.getPeerId();
327+
328+
expect(nimPeerId).to.not.be.undefined;
329+
expect(peers.includes(nimPeerId as string)).to.be.true;
330+
});
308331
});

src/lib/waku.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { Multiaddr, multiaddr } from "multiaddr";
1717
import PeerId from "peer-id";
1818

1919
import { Bootstrap, BootstrapOptions } from "./discovery";
20+
import { FilterCodec, WakuFilter } from "./waku_filter";
2021
import { LightPushCodec, WakuLightPush } from "./waku_light_push";
2122
import { DecryptionMethod, WakuMessage } from "./waku_message";
2223
import { RelayCodecs, WakuRelay } from "./waku_relay";
@@ -39,6 +40,7 @@ export enum Protocols {
3940
Relay = "relay",
4041
Store = "store",
4142
LightPush = "lightpush",
43+
Filter = "filter",
4244
}
4345

4446
export interface CreateOptions {
@@ -102,6 +104,7 @@ export class Waku {
102104
public libp2p: Libp2p;
103105
public relay: WakuRelay;
104106
public store: WakuStore;
107+
public filter: WakuFilter;
105108
public lightPush: WakuLightPush;
106109

107110
private pingKeepAliveTimers: {
@@ -115,11 +118,13 @@ export class Waku {
115118
options: CreateOptions,
116119
libp2p: Libp2p,
117120
store: WakuStore,
118-
lightPush: WakuLightPush
121+
lightPush: WakuLightPush,
122+
filter: WakuFilter
119123
) {
120124
this.libp2p = libp2p;
121125
this.relay = libp2p.pubsub as unknown as WakuRelay;
122126
this.store = store;
127+
this.filter = filter;
123128
this.lightPush = lightPush;
124129
this.pingKeepAliveTimers = {};
125130
this.relayKeepAliveTimers = {};
@@ -220,10 +225,17 @@ export class Waku {
220225
pubSubTopic: options?.pubSubTopic,
221226
});
222227
const wakuLightPush = new WakuLightPush(libp2p);
228+
const wakuFilter = new WakuFilter(libp2p);
223229

224230
await libp2p.start();
225231

226-
return new Waku(options ? options : {}, libp2p, wakuStore, wakuLightPush);
232+
return new Waku(
233+
options ? options : {},
234+
libp2p,
235+
wakuStore,
236+
wakuLightPush,
237+
wakuFilter
238+
);
227239
}
228240

229241
/**
@@ -253,6 +265,9 @@ export class Waku {
253265
if (_protocols.includes(Protocols.LightPush)) {
254266
codecs.push(LightPushCodec);
255267
}
268+
if (_protocols.includes(Protocols.Filter)) {
269+
codecs.push(FilterCodec);
270+
}
256271

257272
return this.libp2p.dialProtocol(peer, codecs);
258273
}
@@ -297,6 +312,7 @@ export class Waku {
297312
): void {
298313
this.relay.addDecryptionKey(key, options);
299314
this.store.addDecryptionKey(key, options);
315+
this.filter.addDecryptionKey(key, options);
300316
}
301317

302318
/**
@@ -308,6 +324,7 @@ export class Waku {
308324
deleteDecryptionKey(key: Uint8Array | string): void {
309325
this.relay.deleteDecryptionKey(key);
310326
this.store.deleteDecryptionKey(key);
327+
this.filter.deleteDecryptionKey(key);
311328
}
312329

313330
/**
@@ -381,6 +398,16 @@ export class Waku {
381398
promises.push(lightPushPromise);
382399
}
383400

401+
if (protocols.includes(Protocols.Filter)) {
402+
const filterPromise = (async (): Promise<void> => {
403+
for await (const peer of this.filter.peers) {
404+
dbg("Filter peer found", peer.id.toB58String());
405+
break;
406+
}
407+
})();
408+
promises.push(filterPromise);
409+
}
410+
384411
if (timeoutMs) {
385412
await rejectOnTimeout(
386413
Promise.all(promises),

src/lib/waku_filter/filter_rpc.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { Reader } from "protobufjs/minimal";
2+
import { v4 as uuid } from "uuid";
3+
4+
import * as proto from "../../proto/waku/v2/filter";
5+
6+
export type ContentFilter = {
7+
contentTopic: string;
8+
};
9+
10+
/**
11+
* FilterRPC represents a message conforming to the Waku Filter protocol
12+
*/
13+
export class FilterRPC {
14+
public constructor(public proto: proto.FilterRPC) {}
15+
16+
static createRequest(
17+
topic: string,
18+
contentFilters: ContentFilter[],
19+
requestId?: string,
20+
subscribe = true
21+
): FilterRPC {
22+
return new FilterRPC({
23+
requestId: requestId || uuid(),
24+
request: {
25+
subscribe,
26+
topic,
27+
contentFilters,
28+
},
29+
push: undefined,
30+
});
31+
}
32+
33+
/**
34+
*
35+
* @param bytes Uint8Array of bytes from a FilterRPC message
36+
* @returns FilterRPC
37+
*/
38+
static decode(bytes: Uint8Array): FilterRPC {
39+
const res = proto.FilterRPC.decode(Reader.create(bytes));
40+
return new FilterRPC(res);
41+
}
42+
43+
/**
44+
* Encode the current FilterRPC request to bytes
45+
* @returns Uint8Array
46+
*/
47+
encode(): Uint8Array {
48+
return proto.FilterRPC.encode(this.proto).finish();
49+
}
50+
51+
get push(): proto.MessagePush | undefined {
52+
return this.proto.push;
53+
}
54+
55+
get requestId(): string {
56+
return this.proto.requestId;
57+
}
58+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import { expect } from "chai";
2+
import debug from "debug";
3+
4+
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils";
5+
import { delay } from "../../test_utils/delay";
6+
import { Protocols, Waku } from "../waku";
7+
import { WakuMessage } from "../waku_message";
8+
9+
const log = debug("waku:test");
10+
11+
const TestContentTopic = "/test/1/waku-filter";
12+
13+
describe("Waku Filter", () => {
14+
let waku: Waku;
15+
let nwaku: Nwaku;
16+
17+
afterEach(async function () {
18+
!!nwaku && nwaku.stop();
19+
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
20+
});
21+
22+
beforeEach(async function () {
23+
this.timeout(10000);
24+
nwaku = new Nwaku(makeLogFileName(this));
25+
await nwaku.start({ filter: true });
26+
waku = await Waku.create({
27+
staticNoiseKey: NOISE_KEY_1,
28+
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
29+
});
30+
await waku.dial(await nwaku.getMultiaddrWithId());
31+
await waku.waitForRemotePeer([Protocols.Filter, Protocols.Relay]);
32+
});
33+
34+
it("creates a subscription", async function () {
35+
this.timeout(10000);
36+
37+
let messageCount = 0;
38+
const messageText = "Filtering works!";
39+
const callback = (msg: WakuMessage): void => {
40+
log("Got a message");
41+
messageCount++;
42+
expect(msg.contentTopic).to.eq(TestContentTopic);
43+
expect(msg.payloadAsUtf8).to.eq(messageText);
44+
};
45+
await waku.filter.subscribe(callback, [TestContentTopic]);
46+
const message = await WakuMessage.fromUtf8String(
47+
messageText,
48+
TestContentTopic
49+
);
50+
await waku.relay.send(message);
51+
while (messageCount === 0) {
52+
await delay(250);
53+
}
54+
expect(messageCount).to.eq(1);
55+
});
56+
57+
it("handles multiple messages", async function () {
58+
this.timeout(10000);
59+
60+
let messageCount = 0;
61+
const callback = (msg: WakuMessage): void => {
62+
messageCount++;
63+
expect(msg.contentTopic).to.eq(TestContentTopic);
64+
};
65+
await waku.filter.subscribe(callback, [TestContentTopic]);
66+
await waku.relay.send(
67+
await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic)
68+
);
69+
await waku.relay.send(
70+
await WakuMessage.fromUtf8String(
71+
"Filtering still works!",
72+
TestContentTopic
73+
)
74+
);
75+
while (messageCount < 2) {
76+
await delay(250);
77+
}
78+
expect(messageCount).to.eq(2);
79+
});
80+
81+
it("unsubscribes", async function () {
82+
let messageCount = 0;
83+
const callback = (): void => {
84+
messageCount++;
85+
};
86+
const unsubscribe = await waku.filter.subscribe(callback, [
87+
TestContentTopic,
88+
]);
89+
await waku.relay.send(
90+
await WakuMessage.fromUtf8String(
91+
"This should be received",
92+
TestContentTopic
93+
)
94+
);
95+
await delay(100);
96+
await unsubscribe();
97+
await waku.relay.send(
98+
await WakuMessage.fromUtf8String(
99+
"This should not be received",
100+
TestContentTopic
101+
)
102+
);
103+
await delay(100);
104+
expect(messageCount).to.eq(1);
105+
});
106+
});

0 commit comments

Comments
 (0)