Skip to content

Commit 68b6bf9

Browse files
committed
feat: add POC for Subscribe API
1 parent 82be279 commit 68b6bf9

File tree

6 files changed

+108
-18
lines changed

6 files changed

+108
-18
lines changed

packages/interfaces/src/message.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ export interface ISendMessage {
8484
*/
8585
export type RequestId = string;
8686

87+
/**
88+
* Listener for subscribe messages.
89+
*/
90+
export type SubscribeListener = (message: IDecodedMessage) => void;
91+
8792
export interface IMetaSetter {
8893
(message: IProtoMessage & { meta: undefined }): Uint8Array;
8994
}

packages/sdk/src/messaging/ack_manager.ts

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ import {
44
IDecoder,
55
IFilter,
66
IStore,
7-
NetworkConfig
7+
NetworkConfig,
8+
SubscribeListener
89
} from "@waku/interfaces";
910
import { createRoutingInfo } from "@waku/utils";
1011

1112
import { MessageStore } from "./message_store.js";
12-
import { IAckManager } from "./utils.js";
1313

1414
type AckManagerConstructorParams = {
1515
messageStore: MessageStore;
@@ -18,6 +18,14 @@ type AckManagerConstructorParams = {
1818
networkConfig: NetworkConfig;
1919
};
2020

21+
export interface IAckManager {
22+
start(): void;
23+
stop(): void;
24+
observe(contentTopic: string): Promise<boolean>;
25+
subscribe(contentTopic: string, cb: SubscribeListener): Promise<boolean>;
26+
unsubscribe(contentTopic: string): Promise<void>;
27+
}
28+
2129
export class AckManager implements IAckManager {
2230
private readonly messageStore: MessageStore;
2331
private readonly filterAckManager: FilterAckManager;
@@ -49,7 +57,7 @@ export class AckManager implements IAckManager {
4957
this.subscribedContentTopics.clear();
5058
}
5159

52-
public async subscribe(contentTopic: string): Promise<boolean> {
60+
public async observe(contentTopic: string): Promise<boolean> {
5361
if (this.subscribedContentTopics.has(contentTopic)) {
5462
return true;
5563
}
@@ -69,6 +77,24 @@ export class AckManager implements IAckManager {
6977
])
7078
).some((success) => success);
7179
}
80+
81+
public async subscribe(
82+
contentTopic: string,
83+
cb: SubscribeListener
84+
): Promise<boolean> {
85+
const decoder = createDecoder(
86+
contentTopic,
87+
createRoutingInfo(this.networkConfig, {
88+
contentTopic
89+
})
90+
);
91+
92+
return this.filterAckManager.subscribe(decoder, cb);
93+
}
94+
95+
public async unsubscribe(contentTopic: string): Promise<void> {
96+
return this.filterAckManager.unsubscribe(contentTopic);
97+
}
7298
}
7399

74100
class FilterAckManager {
@@ -77,7 +103,9 @@ class FilterAckManager {
77103
public constructor(
78104
private messageStore: MessageStore,
79105
private filter: IFilter
80-
) {}
106+
) {
107+
this.onMessage = this.onMessage.bind(this);
108+
}
81109

82110
public start(): void {
83111
return;
@@ -91,18 +119,48 @@ class FilterAckManager {
91119
this.decoders.clear();
92120
}
93121

94-
public async subscribe(decoder: IDecoder<IDecodedMessage>): Promise<boolean> {
95-
const success = await this.filter.subscribe(
96-
decoder,
97-
this.onMessage.bind(this)
98-
);
122+
public async subscribe(
123+
decoder: IDecoder<IDecodedMessage>,
124+
cb?: SubscribeListener
125+
): Promise<boolean> {
126+
const success = await this.filter.subscribe(decoder, (message) => {
127+
try {
128+
cb?.(message);
129+
} catch (error) {
130+
// ignore
131+
}
132+
133+
try {
134+
this.onMessage(message);
135+
} catch (error) {
136+
// ignore
137+
}
138+
});
139+
99140
if (success) {
100141
this.decoders.add(decoder);
101142
}
143+
102144
return success;
103145
}
104146

105-
private async onMessage(message: IDecodedMessage): Promise<void> {
147+
public async unsubscribe(contentTopic: string): Promise<void> {
148+
const decoders = Array.from(this.decoders).filter(
149+
(decoder) => decoder.contentTopic === contentTopic
150+
);
151+
152+
const promises = decoders.map((decoder) =>
153+
this.filter.unsubscribe(decoder)
154+
);
155+
156+
await Promise.all(promises);
157+
158+
for (const decoder of decoders) {
159+
this.decoders.delete(decoder);
160+
}
161+
}
162+
163+
private onMessage(message: IDecodedMessage): void {
106164
if (!this.messageStore.has(message.hashStr)) {
107165
this.messageStore.add(message, { filterAck: true });
108166
}

packages/sdk/src/messaging/messaging.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import {
44
ISendMessage,
55
IStore,
66
NetworkConfig,
7-
RequestId
7+
RequestId,
8+
SubscribeListener
89
} from "@waku/interfaces";
910

1011
import { AckManager } from "./ack_manager.js";
@@ -58,4 +59,15 @@ export class Messaging implements IMessaging {
5859
public send(wakuLikeMessage: ISendMessage): Promise<RequestId> {
5960
return this.sender.send(wakuLikeMessage);
6061
}
62+
63+
public subscribe(
64+
contentTopic: string,
65+
cb: SubscribeListener
66+
): Promise<boolean> {
67+
return this.ackManager.subscribe(contentTopic, cb);
68+
}
69+
70+
public unsubscribe(contentTopic: string): Promise<void> {
71+
return this.ackManager.unsubscribe(contentTopic);
72+
}
6173
}

packages/sdk/src/messaging/sender.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export class Sender {
4848
public async send(message: ISendMessage): Promise<RequestId> {
4949
const requestId = await this.messageStore.queue(message);
5050

51-
await this.ackManager.subscribe(message.contentTopic);
51+
await this.ackManager.observe(message.contentTopic);
5252
await this.sendMessage(requestId, message);
5353

5454
return requestId;

packages/sdk/src/messaging/utils.ts

Lines changed: 0 additions & 5 deletions
This file was deleted.

packages/sdk/src/waku/waku.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import type {
2121
IWaku,
2222
IWakuEventEmitter,
2323
Libp2p,
24-
NetworkConfig
24+
NetworkConfig,
25+
SubscribeListener
2526
} from "@waku/interfaces";
2627
import {
2728
DefaultNetworkConfig,
@@ -305,6 +306,25 @@ export class WakuNode implements IWaku {
305306
return this.messaging.send(message);
306307
}
307308

309+
public subscribe(
310+
contentTopic: string,
311+
cb: SubscribeListener
312+
): Promise<boolean> {
313+
if (!this.messaging) {
314+
throw new Error("Messaging not initialized");
315+
}
316+
317+
return this.messaging.subscribe(contentTopic, cb);
318+
}
319+
320+
public unsubscribe(contentTopic: string): Promise<void> {
321+
if (!this.messaging) {
322+
throw new Error("Messaging not initialized");
323+
}
324+
325+
return this.messaging.unsubscribe(contentTopic);
326+
}
327+
308328
private createRoutingInfo(
309329
contentTopic?: string,
310330
shardId?: number

0 commit comments

Comments
 (0)