diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index bdf08ebcf3..049da6b4f8 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -84,6 +84,11 @@ export interface ISendMessage { */ export type RequestId = string; +/** + * Listener for subscribe messages. + */ +export type SubscribeListener = (message: IDecodedMessage) => void; + export interface IMetaSetter { (message: IProtoMessage & { meta: undefined }): Uint8Array; } diff --git a/packages/sdk/src/messaging/ack_manager.ts b/packages/sdk/src/messaging/ack_manager.ts index 8551cbc4b3..ed6020361f 100644 --- a/packages/sdk/src/messaging/ack_manager.ts +++ b/packages/sdk/src/messaging/ack_manager.ts @@ -4,12 +4,12 @@ import { IDecoder, IFilter, IStore, - NetworkConfig + NetworkConfig, + SubscribeListener } from "@waku/interfaces"; import { createRoutingInfo } from "@waku/utils"; import { MessageStore } from "./message_store.js"; -import { IAckManager } from "./utils.js"; type AckManagerConstructorParams = { messageStore: MessageStore; @@ -18,6 +18,14 @@ type AckManagerConstructorParams = { networkConfig: NetworkConfig; }; +export interface IAckManager { + start(): void; + stop(): void; + observe(contentTopic: string): Promise; + subscribe(contentTopic: string, cb: SubscribeListener): Promise; + unsubscribe(contentTopic: string): Promise; +} + export class AckManager implements IAckManager { private readonly messageStore: MessageStore; private readonly filterAckManager: FilterAckManager; @@ -49,7 +57,7 @@ export class AckManager implements IAckManager { this.subscribedContentTopics.clear(); } - public async subscribe(contentTopic: string): Promise { + public async observe(contentTopic: string): Promise { if (this.subscribedContentTopics.has(contentTopic)) { return true; } @@ -69,6 +77,24 @@ export class AckManager implements IAckManager { ]) ).some((success) => success); } + + public async subscribe( + contentTopic: string, + cb: SubscribeListener + ): Promise { + const decoder = createDecoder( + contentTopic, + createRoutingInfo(this.networkConfig, { + contentTopic + }) + ); + + return this.filterAckManager.subscribe(decoder, cb); + } + + public async unsubscribe(contentTopic: string): Promise { + return this.filterAckManager.unsubscribe(contentTopic); + } } class FilterAckManager { @@ -77,7 +103,9 @@ class FilterAckManager { public constructor( private messageStore: MessageStore, private filter: IFilter - ) {} + ) { + this.onMessage = this.onMessage.bind(this); + } public start(): void { return; @@ -91,18 +119,48 @@ class FilterAckManager { this.decoders.clear(); } - public async subscribe(decoder: IDecoder): Promise { - const success = await this.filter.subscribe( - decoder, - this.onMessage.bind(this) - ); + public async subscribe( + decoder: IDecoder, + cb?: SubscribeListener + ): Promise { + const success = await this.filter.subscribe(decoder, (message) => { + try { + cb?.(message); + } catch (error) { + // ignore + } + + try { + this.onMessage(message); + } catch (error) { + // ignore + } + }); + if (success) { this.decoders.add(decoder); } + return success; } - private async onMessage(message: IDecodedMessage): Promise { + public async unsubscribe(contentTopic: string): Promise { + const decoders = Array.from(this.decoders).filter( + (decoder) => decoder.contentTopic === contentTopic + ); + + const promises = decoders.map((decoder) => + this.filter.unsubscribe(decoder) + ); + + await Promise.all(promises); + + for (const decoder of decoders) { + this.decoders.delete(decoder); + } + } + + private onMessage(message: IDecodedMessage): void { if (!this.messageStore.has(message.hashStr)) { this.messageStore.add(message, { filterAck: true }); } diff --git a/packages/sdk/src/messaging/messaging.ts b/packages/sdk/src/messaging/messaging.ts index 28a17dbade..35109e1226 100644 --- a/packages/sdk/src/messaging/messaging.ts +++ b/packages/sdk/src/messaging/messaging.ts @@ -4,7 +4,8 @@ import { ISendMessage, IStore, NetworkConfig, - RequestId + RequestId, + SubscribeListener } from "@waku/interfaces"; import { AckManager } from "./ack_manager.js"; @@ -58,4 +59,15 @@ export class Messaging implements IMessaging { public send(wakuLikeMessage: ISendMessage): Promise { return this.sender.send(wakuLikeMessage); } + + public subscribe( + contentTopic: string, + cb: SubscribeListener + ): Promise { + return this.ackManager.subscribe(contentTopic, cb); + } + + public unsubscribe(contentTopic: string): Promise { + return this.ackManager.unsubscribe(contentTopic); + } } diff --git a/packages/sdk/src/messaging/sender.ts b/packages/sdk/src/messaging/sender.ts index b32978d8b8..06d2319d97 100644 --- a/packages/sdk/src/messaging/sender.ts +++ b/packages/sdk/src/messaging/sender.ts @@ -48,7 +48,7 @@ export class Sender { public async send(message: ISendMessage): Promise { const requestId = await this.messageStore.queue(message); - await this.ackManager.subscribe(message.contentTopic); + await this.ackManager.observe(message.contentTopic); await this.sendMessage(requestId, message); return requestId; diff --git a/packages/sdk/src/messaging/utils.ts b/packages/sdk/src/messaging/utils.ts deleted file mode 100644 index 3ce045e68c..0000000000 --- a/packages/sdk/src/messaging/utils.ts +++ /dev/null @@ -1,5 +0,0 @@ -export interface IAckManager { - start(): void; - stop(): void; - subscribe(contentTopic: string): Promise; -} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 4e8c07d7fb..59e84efd7f 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -21,7 +21,8 @@ import type { IWaku, IWakuEventEmitter, Libp2p, - NetworkConfig + NetworkConfig, + SubscribeListener } from "@waku/interfaces"; import { DefaultNetworkConfig, @@ -305,6 +306,25 @@ export class WakuNode implements IWaku { return this.messaging.send(message); } + public subscribe( + contentTopic: string, + cb: SubscribeListener + ): Promise { + if (!this.messaging) { + throw new Error("Messaging not initialized"); + } + + return this.messaging.subscribe(contentTopic, cb); + } + + public unsubscribe(contentTopic: string): Promise { + if (!this.messaging) { + throw new Error("Messaging not initialized"); + } + + return this.messaging.unsubscribe(contentTopic); + } + private createRoutingInfo( contentTopic?: string, shardId?: number