diff --git a/package-lock.json b/package-lock.json index 370aa3b75a..0c8bd49c99 100644 --- a/package-lock.json +++ b/package-lock.json @@ -35998,7 +35998,8 @@ "@waku/sds": "^0.0.7", "@waku/utils": "0.0.27", "libp2p": "2.8.11", - "lodash.debounce": "^4.0.8" + "lodash.debounce": "^4.0.8", + "uuid": "^10.0.0" }, "devDependencies": { "@libp2p/interface": "2.10.4", diff --git a/packages/core/src/lib/light_push/light_push.ts b/packages/core/src/lib/light_push/light_push.ts index eb3b517eeb..6206a9c89f 100644 --- a/packages/core/src/lib/light_push/light_push.ts +++ b/packages/core/src/lib/light_push/light_push.ts @@ -55,11 +55,11 @@ export class LightPushCore { }; } - const { rpc, error: prepError } = await ProtocolHandler.preparePushMessage( - encoder, - message, - protocol - ); + const { + rpc, + error: prepError, + message: protoMessage + } = await ProtocolHandler.preparePushMessage(encoder, message, protocol); if (prepError) { return { @@ -117,7 +117,21 @@ export class LightPushCore { }; } - return ProtocolHandler.handleResponse(bytes, protocol, peerId); + const processedResponse = ProtocolHandler.handleResponse( + bytes, + protocol, + peerId + ); + + if (processedResponse.success) { + return { + success: processedResponse.success, + failure: null, + message: protoMessage + }; + } + + return processedResponse; } private async getProtocol( diff --git a/packages/core/src/lib/light_push/protocol_handler.ts b/packages/core/src/lib/light_push/protocol_handler.ts index 429664f32d..0cf2835a96 100644 --- a/packages/core/src/lib/light_push/protocol_handler.ts +++ b/packages/core/src/lib/light_push/protocol_handler.ts @@ -1,5 +1,10 @@ import type { PeerId } from "@libp2p/interface"; -import type { IEncoder, IMessage, LightPushCoreResult } from "@waku/interfaces"; +import type { + IEncoder, + IMessage, + IProtoMessage, + LightPushCoreResult +} from "@waku/interfaces"; import { LightPushError, LightPushStatusCode } from "@waku/interfaces"; import { PushResponse, WakuMessage } from "@waku/proto"; import { isMessageSizeUnderCap, Logger } from "@waku/utils"; @@ -15,8 +20,8 @@ type VersionedPushRpc = | ({ version: "v3" } & PushRpc); type PreparePushMessageResult = - | { rpc: VersionedPushRpc; error: null } - | { rpc: null; error: LightPushError }; + | { rpc: VersionedPushRpc; error: null; message?: IProtoMessage } + | { rpc: null; error: LightPushError; message?: IProtoMessage }; const log = new Logger("light-push:protocol-handler"); @@ -47,13 +52,15 @@ export class ProtocolHandler { log.info("Creating v3 RPC message"); return { rpc: ProtocolHandler.createV3Rpc(protoMessage, encoder.pubsubTopic), - error: null + error: null, + message: protoMessage }; } log.info("Creating v2 RPC message"); return { rpc: ProtocolHandler.createV2Rpc(protoMessage, encoder.pubsubTopic), + message: protoMessage, error: null }; } catch (err) { diff --git a/packages/core/src/lib/message/constants.ts b/packages/core/src/lib/message/constants.ts new file mode 100644 index 0000000000..0d54071228 --- /dev/null +++ b/packages/core/src/lib/message/constants.ts @@ -0,0 +1,2 @@ +export const OneMillion = BigInt(1_000_000); +export const Version = 0; diff --git a/packages/core/src/lib/message/index.ts b/packages/core/src/lib/message/index.ts index e4736e54e1..e5fbf51df3 100644 --- a/packages/core/src/lib/message/index.ts +++ b/packages/core/src/lib/message/index.ts @@ -1 +1,2 @@ export * as version_0 from "./version_0.js"; +export { OneMillion, Version } from "./constants.js"; diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index c127120e74..d3d8a7d333 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -16,10 +16,10 @@ import { bytesToHex } from "@waku/utils/bytes"; import { messageHash } from "../message_hash/index.js"; +import { OneMillion, Version } from "./constants.js"; + const log = new Logger("message:version-0"); -const OneMillion = BigInt(1_000_000); -export const Version = 0; export { proto }; export class DecodedMessage implements IDecodedMessage { diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index caecb73aec..bdf08ebcf3 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -69,6 +69,21 @@ export interface IMessage { rateLimitProof?: IRateLimitProof; } +/** + * Send message data structure used in {@link IWaku.send}. + */ +export interface ISendMessage { + contentTopic: string; + payload: Uint8Array; + ephemeral?: boolean; + rateLimitProof?: boolean; +} + +/** + * Request ID of attempt to send a message. + */ +export type RequestId = string; + export interface IMetaSetter { (message: IProtoMessage & { meta: undefined }): Uint8Array; } diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 0fb60c182f..66e2ca09db 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -5,7 +5,7 @@ import type { DiscoveryOptions, PeerCache } from "./discovery.js"; import type { FilterProtocolOptions } from "./filter.js"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { LightPushProtocolOptions } from "./light_push.js"; -import type { IDecodedMessage } from "./message.js"; +import type { IDecodedMessage, IProtoMessage } from "./message.js"; import type { ThisAndThat, ThisOrThat } from "./misc.js"; import { NetworkConfig } from "./sharding.js"; import type { StoreProtocolOptions } from "./store.js"; @@ -195,7 +195,13 @@ export type LightPushCoreResult = ThisOrThat< PeerId, "failure", LightPushFailure ->; +> & { + /** + * The proto object of the message. + * Present only if the message was successfully pushed to the network. + */ + message?: IProtoMessage; +}; export type FilterCoreResult = ThisOrThat< "success", @@ -209,7 +215,13 @@ export type LightPushSDKResult = ThisAndThat< PeerId[], "failures", LightPushFailure[] ->; +> & { + /** + * The proto objects of the messages. + * Present only if the messages were successfully pushed to the network. + */ + messages?: IProtoMessage[]; +}; export type FilterSDKResult = ThisAndThat< "successes", diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index da4fc5f003..8cfee2ebfc 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -20,6 +20,12 @@ export type ISendOptions = { * @default false */ useLegacy?: boolean; + + /** + * Amount of peers to send message to. + * Overrides `numPeersToUse` in {@link @waku/interfaces!CreateNodeOptions}. + */ + numPeersToUse?: number; }; export interface ISender { diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 71914755f7..960d9352d2 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -10,7 +10,13 @@ import type { IFilter } from "./filter.js"; import type { HealthStatus } from "./health_status.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; -import { IDecodedMessage, IDecoder, IEncoder } from "./message.js"; +import { + IDecodedMessage, + IDecoder, + IEncoder, + ISendMessage, + RequestId +} from "./message.js"; import type { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { ShardId } from "./sharding.js"; @@ -58,9 +64,22 @@ export type IWakuEventEmitter = TypedEventEmitter; export interface IWaku { libp2p: Libp2p; + + /** + * @deprecated should not be accessed directly, use {@link IWaku.send} and {@link IWaku.subscribe} instead + */ relay?: IRelay; + store?: IStore; + + /** + * @deprecated should not be accessed directly, use {@link IWaku.subscribe} instead + */ filter?: IFilter; + + /** + * @deprecated should not be accessed directly, use {@link IWaku.send} instead + */ lightPush?: ILightPush; /** @@ -251,6 +270,14 @@ export interface IWaku { */ createEncoder(params: CreateEncoderParams): IEncoder; + /** + * Sends a message to the Waku network. + * + * @param {ISendMessage} message - The message to send. + * @returns {Promise} A promise that resolves to the request ID + */ + send(message: ISendMessage): Promise; + /** * @returns {boolean} `true` if the node was started and `false` otherwise */ diff --git a/packages/sdk/package.json b/packages/sdk/package.json index a72b4025d7..691d426aee 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -75,7 +75,8 @@ "@waku/sds": "^0.0.7", "@waku/utils": "0.0.27", "libp2p": "2.8.11", - "lodash.debounce": "^4.0.8" + "lodash.debounce": "^4.0.8", + "uuid": "^10.0.0" }, "devDependencies": { "@libp2p/interface": "2.10.4", diff --git a/packages/sdk/src/light_push/light_push.ts b/packages/sdk/src/light_push/light_push.ts index 669c77e38c..972fdcdc16 100644 --- a/packages/sdk/src/light_push/light_push.ts +++ b/packages/sdk/src/light_push/light_push.ts @@ -4,6 +4,7 @@ import { type IEncoder, ILightPush, type IMessage, + IProtoMessage, type ISendOptions, type Libp2p, LightPushCoreResult, @@ -82,10 +83,11 @@ export class LightPush implements ILightPush { log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic); - const peerIds = await this.peerManager.getPeers({ + let peerIds = await this.peerManager.getPeers({ protocol: options.useLegacy ? "light-push-v2" : Protocols.LightPush, pubsubTopic: encoder.pubsubTopic }); + peerIds = peerIds?.slice(0, options.numPeersToUse); const coreResults = peerIds?.length > 0 @@ -93,12 +95,15 @@ export class LightPush implements ILightPush { peerIds.map((peerId) => this.protocol .send(encoder, message, peerId, options.useLegacy) - .catch((_e) => ({ - success: null, - failure: { - error: LightPushError.GENERIC_FAIL - } - })) + .catch( + (_e) => + ({ + success: null, + failure: { + error: LightPushError.GENERIC_FAIL + } + }) as LightPushCoreResult + ) ) ) : []; @@ -110,7 +115,10 @@ export class LightPush implements ILightPush { .map((v) => v.success) as PeerId[], failures: coreResults .filter((v) => v.failure) - .map((v) => v.failure) as LightPushFailure[] + .map((v) => v.failure) as LightPushFailure[], + messages: coreResults + .filter((v) => v.message) + .map((v) => v.message) as IProtoMessage[] } : { successes: [], diff --git a/packages/sdk/src/messaging/ack_manager.spec.ts b/packages/sdk/src/messaging/ack_manager.spec.ts new file mode 100644 index 0000000000..37cfcaefa2 --- /dev/null +++ b/packages/sdk/src/messaging/ack_manager.spec.ts @@ -0,0 +1,309 @@ +import type { + IDecodedMessage, + IFilter, + IStore, + NetworkConfig +} from "@waku/interfaces"; +import { expect } from "chai"; +import { afterEach, beforeEach, describe, it } from "mocha"; +import sinon from "sinon"; + +import { AckManager } from "./ack_manager.js"; +import { MessageStore } from "./message_store.js"; + +const mockMessage: IDecodedMessage = { + version: 1, + payload: new Uint8Array([1, 2, 3]), + contentTopic: "/test/1/topic/proto", + pubsubTopic: "test-pubsub", + timestamp: new Date(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined, + hash: new Uint8Array([4, 5, 6]), + hashStr: "test-hash-123" +}; + +const mockNetworkConfig: NetworkConfig = { + clusterId: 1, + numShardsInCluster: 8 +}; + +describe("AckManager", () => { + let messageStore: MessageStore; + let mockFilter: IFilter; + let mockStore: IStore; + let ackManager: AckManager; + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + messageStore = new MessageStore(); + + mockFilter = { + subscribe: sinon.stub().resolves(true), + unsubscribe: sinon.stub().resolves(true) + } as unknown as IFilter; + + mockStore = { + queryWithOrderedCallback: sinon.stub().resolves(undefined) + } as unknown as IStore; + + ackManager = new AckManager({ + messageStore, + filter: mockFilter, + store: mockStore, + networkConfig: mockNetworkConfig + }); + + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + clock.restore(); + sinon.restore(); + }); + + describe("constructor", () => { + it("should initialize with provided parameters", () => { + expect(ackManager).to.be.instanceOf(AckManager); + }); + }); + + describe("start", () => { + it("should start filter and store ack managers", () => { + ackManager.start(); + + expect(clock.countTimers()).to.equal(1); + }); + + it("should be idempotent", () => { + ackManager.start(); + ackManager.start(); + + expect(clock.countTimers()).to.equal(1); + }); + }); + + describe("stop", () => { + it("should stop filter and store ack managers", async () => { + ackManager.start(); + await ackManager.stop(); + + expect(clock.countTimers()).to.equal(0); + }); + + it("should clear subscribed content topics", async () => { + await ackManager.subscribe("/test/1/clear/proto"); + await ackManager.stop(); + + const result = await ackManager.subscribe("/test/1/clear/proto"); + expect(result).to.be.true; + }); + + it("should handle stop without start", async () => { + await ackManager.stop(); + }); + }); + + describe("subscribe", () => { + it("should subscribe to new content topic", async () => { + const result = await ackManager.subscribe("/test/1/new/proto"); + + expect(result).to.be.true; + expect( + (mockFilter.subscribe as sinon.SinonStub).calledWith( + sinon.match.object, + sinon.match.func + ) + ).to.be.true; + }); + + it("should return true for already subscribed topic", async () => { + await ackManager.subscribe("/test/1/existing/proto"); + const result = await ackManager.subscribe("/test/1/existing/proto"); + + expect(result).to.be.true; + expect((mockFilter.subscribe as sinon.SinonStub).calledOnce).to.be.true; + }); + + it("should return true if at least one subscription succeeds", async () => { + (mockFilter.subscribe as sinon.SinonStub).resolves(false); + + const result = await ackManager.subscribe("/test/1/topic/proto"); + + expect(result).to.be.true; + }); + + it("should return true when filter fails but store succeeds", async () => { + (mockFilter.subscribe as sinon.SinonStub).resolves(false); + + const result = await ackManager.subscribe("/test/1/topic/proto"); + + expect(result).to.be.true; + }); + }); + + describe("FilterAckManager", () => { + beforeEach(() => { + ackManager.start(); + }); + + it("should handle message reception and acknowledgment", async () => { + await ackManager.subscribe("/test/1/topic/proto"); + const onMessageCallback = ( + mockFilter.subscribe as sinon.SinonStub + ).getCall(0).args[1]; + + await onMessageCallback(mockMessage); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should not add duplicate messages", async () => { + messageStore.add(mockMessage, { filterAck: false }); + await ackManager.subscribe("/test/1/topic/proto"); + + const onMessageCallback = ( + mockFilter.subscribe as sinon.SinonStub + ).getCall(0).args[1]; + await onMessageCallback(mockMessage); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should unsubscribe all decoders on stop", async () => { + await ackManager.subscribe("/test/1/topic1/proto"); + await ackManager.subscribe("/test/1/topic2/proto"); + + await ackManager.stop(); + + expect((mockFilter.unsubscribe as sinon.SinonStub).calledTwice).to.be + .true; + }); + }); + + describe("StoreAckManager", () => { + beforeEach(() => { + ackManager.start(); + }); + + it("should query store periodically", async () => { + await ackManager.subscribe("/test/1/topic/proto"); + + await clock.tickAsync(5000); + + expect( + (mockStore.queryWithOrderedCallback as sinon.SinonStub).calledWith( + sinon.match.array, + sinon.match.func, + sinon.match.object + ) + ).to.be.true; + }); + + it("should handle store query callback", async () => { + await ackManager.subscribe("/test/1/topic/proto"); + + await clock.tickAsync(5000); + + const callback = ( + mockStore.queryWithOrderedCallback as sinon.SinonStub + ).getCall(0).args[1]; + callback(mockMessage); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should not add duplicate messages from store", async () => { + messageStore.add(mockMessage, { storeAck: false }); + + await ackManager.subscribe("/test/1/topic/proto"); + await clock.tickAsync(5000); + + const callback = ( + mockStore.queryWithOrderedCallback as sinon.SinonStub + ).getCall(0).args[1]; + callback(mockMessage); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should stop interval on stop", async () => { + ackManager.start(); + await ackManager.stop(); + + expect(clock.countTimers()).to.equal(0); + }); + }); + + describe("integration scenarios", () => { + it("should handle complete lifecycle", async () => { + ackManager.start(); + + const result1 = await ackManager.subscribe("/test/1/topic1/proto"); + const result2 = await ackManager.subscribe("/test/1/topic2/proto"); + + expect(result1).to.be.true; + expect(result2).to.be.true; + + await ackManager.stop(); + + expect(clock.countTimers()).to.equal(0); + }); + + it("should handle multiple subscriptions to same topic", async () => { + ackManager.start(); + + const result1 = await ackManager.subscribe("/test/1/same/proto"); + const result2 = await ackManager.subscribe("/test/1/same/proto"); + + expect(result1).to.be.true; + expect(result2).to.be.true; + expect((mockFilter.subscribe as sinon.SinonStub).calledOnce).to.be.true; + }); + + it("should handle subscription after stop", async () => { + ackManager.start(); + await ackManager.stop(); + + const result = await ackManager.subscribe("/test/1/after-stop/proto"); + expect(result).to.be.true; + }); + }); + + describe("error handling", () => { + it("should handle filter subscription errors gracefully", async () => { + (mockFilter.subscribe as sinon.SinonStub).resolves(false); + + const result = await ackManager.subscribe("/test/1/error/proto"); + + expect(result).to.be.true; + }); + + it("should handle store query errors gracefully", async () => { + (mockStore.queryWithOrderedCallback as sinon.SinonStub).rejects( + new Error("Store query error") + ); + + ackManager.start(); + await ackManager.subscribe("/test/1/error/proto"); + + await clock.tickAsync(5000); + }); + + it("should handle unsubscribe errors gracefully", async () => { + ackManager.start(); + await ackManager.subscribe("/test/1/error/proto"); + + (mockFilter.unsubscribe as sinon.SinonStub).rejects( + new Error("Unsubscribe error") + ); + + try { + await ackManager.stop(); + } catch { + // Expected to throw + } + }); + }); +}); diff --git a/packages/sdk/src/messaging/ack_manager.ts b/packages/sdk/src/messaging/ack_manager.ts new file mode 100644 index 0000000000..004f798cb1 --- /dev/null +++ b/packages/sdk/src/messaging/ack_manager.ts @@ -0,0 +1,176 @@ +import { createDecoder } from "@waku/core"; +import { + IDecodedMessage, + IDecoder, + IFilter, + IStore, + NetworkConfig +} from "@waku/interfaces"; +import { createRoutingInfo } from "@waku/utils"; + +import { MessageStore } from "./message_store.js"; +import { IAckManager } from "./utils.js"; + +type AckManagerConstructorParams = { + messageStore: MessageStore; + filter: IFilter; + store: IStore; + networkConfig: NetworkConfig; +}; + +const DEFAULT_QUERY_INTERVAL = 5000; +const QUERY_TIME_WINDOW_MS = 60 * 60 * 1000; + +export class AckManager implements IAckManager { + private readonly messageStore: MessageStore; + private readonly filterAckManager: FilterAckManager; + private readonly storeAckManager: StoreAckManager; + private readonly networkConfig: NetworkConfig; + + private readonly subscribedContentTopics: Set = new Set(); + private readonly subscribingAttempts: Set = new Set(); + + public constructor(params: AckManagerConstructorParams) { + this.messageStore = params.messageStore; + this.networkConfig = params.networkConfig; + + this.filterAckManager = new FilterAckManager( + this.messageStore, + params.filter + ); + + this.storeAckManager = new StoreAckManager(this.messageStore, params.store); + } + + public start(): void { + this.filterAckManager.start(); + this.storeAckManager.start(); + } + + public async stop(): Promise { + await this.filterAckManager.stop(); + this.storeAckManager.stop(); + this.subscribedContentTopics.clear(); + } + + public async subscribe(contentTopic: string): Promise { + if ( + this.subscribedContentTopics.has(contentTopic) || + this.subscribingAttempts.has(contentTopic) + ) { + return true; + } + + this.subscribingAttempts.add(contentTopic); + + const decoder = createDecoder( + contentTopic, + createRoutingInfo(this.networkConfig, { + contentTopic + }) + ); + + const promises = await Promise.all([ + this.filterAckManager.subscribe(decoder), + this.storeAckManager.subscribe(decoder) + ]); + + this.subscribedContentTopics.add(contentTopic); + this.subscribingAttempts.delete(contentTopic); + return promises.some((success) => success); + } +} + +class FilterAckManager { + private decoders: Set> = new Set(); + + public constructor( + private messageStore: MessageStore, + private filter: IFilter + ) {} + + public start(): void { + return; + } + + public async stop(): Promise { + const promises = Array.from(this.decoders.entries()).map((decoder) => + this.filter.unsubscribe(decoder) + ); + await Promise.all(promises); + this.decoders.clear(); + } + + public async subscribe(decoder: IDecoder): Promise { + const success = await this.filter.subscribe( + decoder, + this.onMessage.bind(this) + ); + if (success) { + this.decoders.add(decoder); + } + return success; + } + + private async onMessage(message: IDecodedMessage): Promise { + if (!this.messageStore.has(message.hashStr)) { + this.messageStore.add(message, { filterAck: true }); + } + + this.messageStore.markFilterAck(message.hashStr); + } +} + +class StoreAckManager { + private interval: ReturnType | null = null; + + private decoders: Set> = new Set(); + + public constructor( + private messageStore: MessageStore, + private store: IStore + ) {} + + public start(): void { + if (this.interval) { + return; + } + + this.interval = setInterval(() => { + void this.query(); + }, DEFAULT_QUERY_INTERVAL); + } + + public stop(): void { + if (!this.interval) { + return; + } + + clearInterval(this.interval); + this.interval = null; + } + + public async subscribe(decoder: IDecoder): Promise { + this.decoders.add(decoder); + return true; + } + + private async query(): Promise { + for (const decoder of this.decoders) { + await this.store.queryWithOrderedCallback( + [decoder], + (message) => { + if (!this.messageStore.has(message.hashStr)) { + this.messageStore.add(message, { storeAck: true }); + } + + this.messageStore.markStoreAck(message.hashStr); + }, + { + timeStart: new Date(Date.now() - QUERY_TIME_WINDOW_MS), + timeEnd: new Date() + } + ); + } + } +} diff --git a/packages/sdk/src/messaging/index.ts b/packages/sdk/src/messaging/index.ts new file mode 100644 index 0000000000..0035e4eb2f --- /dev/null +++ b/packages/sdk/src/messaging/index.ts @@ -0,0 +1 @@ +export { Messaging } from "./messaging.js"; diff --git a/packages/sdk/src/messaging/message_store.spec.ts b/packages/sdk/src/messaging/message_store.spec.ts new file mode 100644 index 0000000000..ae5cf5f5c4 --- /dev/null +++ b/packages/sdk/src/messaging/message_store.spec.ts @@ -0,0 +1,349 @@ +import type { IDecodedMessage, ISendMessage } from "@waku/interfaces"; +import { expect } from "chai"; +import { beforeEach, describe, it } from "mocha"; + +import { MessageStore } from "./message_store.js"; + +describe("MessageStore", () => { + let messageStore: MessageStore; + let mockMessage: IDecodedMessage; + let mockSendMessage: ISendMessage; + + beforeEach(() => { + messageStore = new MessageStore(); + mockMessage = { + version: 1, + payload: new Uint8Array([1, 2, 3]), + contentTopic: "test-topic", + pubsubTopic: "test-pubsub", + timestamp: new Date(1000), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined, + hash: new Uint8Array([4, 5, 6]), + hashStr: "test-hash-123" + }; + mockSendMessage = { + contentTopic: "test-topic", + payload: new Uint8Array([7, 8, 9]), + ephemeral: false + }; + }); + + describe("constructor", () => { + it("should create instance with default options", () => { + const store = new MessageStore(); + expect(store).to.be.instanceOf(MessageStore); + }); + + it("should create instance with custom resend interval", () => { + const customInterval = 10000; + const store = new MessageStore({ resendIntervalMs: customInterval }); + expect(store).to.be.instanceOf(MessageStore); + }); + }); + + describe("has", () => { + it("should return false for non-existent message", () => { + expect(messageStore.has("non-existent")).to.be.false; + }); + + it("should return true for added message", () => { + messageStore.add(mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should return true for pending message", async () => { + await messageStore.queue(mockSendMessage); + expect(messageStore.has("pending-hash")).to.be.false; + }); + }); + + describe("add", () => { + it("should add new message with default options", () => { + messageStore.add(mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should add message with custom options", () => { + messageStore.add(mockMessage, { filterAck: true, storeAck: false }); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should not add duplicate message", () => { + messageStore.add(mockMessage); + messageStore.add(mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should not add message if already exists", () => { + messageStore.add(mockMessage); + messageStore.add(mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + }); + + describe("queue", () => { + it("should queue message and return request ID", async () => { + const requestId = await messageStore.queue(mockSendMessage); + expect(typeof requestId).to.equal("string"); + expect(requestId.length).to.be.greaterThan(0); + }); + + it("should queue multiple messages with different request IDs", async () => { + const requestId1 = await messageStore.queue(mockSendMessage); + const requestId2 = await messageStore.queue(mockSendMessage); + expect(requestId1).to.not.equal(requestId2); + }); + }); + + describe("markFilterAck", () => { + it("should mark filter acknowledgment for existing message", () => { + messageStore.add(mockMessage); + messageStore.markFilterAck(mockMessage.hashStr); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should handle filter ack for non-existent message", () => { + expect(() => { + messageStore.markFilterAck("non-existent"); + }).to.not.throw(); + }); + + it("should handle filter ack for pending message", async () => { + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + messageStore.markFilterAck(mockMessage.hashStr); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + }); + + describe("markStoreAck", () => { + it("should mark store acknowledgment for existing message", () => { + messageStore.add(mockMessage); + messageStore.markStoreAck(mockMessage.hashStr); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should handle store ack for non-existent message", () => { + expect(() => { + messageStore.markStoreAck("non-existent"); + }).to.not.throw(); + }); + + it("should handle store ack for pending message", async () => { + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + messageStore.markStoreAck(mockMessage.hashStr); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + }); + + describe("markSent", () => { + it("should mark message as sent with valid request ID", async () => { + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should handle markSent with invalid request ID", () => { + expect(() => { + messageStore.markSent("invalid-request-id", mockMessage); + }).to.not.throw(); + }); + + it("should handle markSent with request ID without message", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const entry = (messageStore as any).pendingRequests.get(requestId); + if (entry) { + entry.messageRequest = undefined; + } + expect(() => { + messageStore.markSent(requestId, mockMessage); + }).to.not.throw(); + }); + + it("should set lastSentAt timestamp", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const sentMessage = { ...mockMessage, timestamp: new Date(2000) }; + messageStore.markSent(requestId, sentMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + }); + + describe("getMessagesToSend", () => { + it("should return empty array when no messages queued", () => { + const messages = messageStore.getMessagesToSend(); + expect(messages).to.deep.equal([]); + }); + + it("should return queued messages that need sending", async () => { + const customStore = new MessageStore({ resendIntervalMs: 0 }); + const requestId = await customStore.queue(mockSendMessage); + const messages = customStore.getMessagesToSend(); + expect(messages).to.have.length(1); + expect(messages[0].requestId).to.equal(requestId); + expect(messages[0].message).to.equal(mockSendMessage); + }); + + it("should not return acknowledged messages", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const entry = (messageStore as any).pendingRequests.get(requestId); + if (entry) { + entry.filterAck = true; + } + const messages = messageStore.getMessagesToSend(); + expect(messages).to.have.length(0); + }); + + it("should not return store acknowledged messages", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const entry = (messageStore as any).pendingRequests.get(requestId); + if (entry) { + entry.storeAck = true; + } + const messages = messageStore.getMessagesToSend(); + expect(messages).to.have.length(0); + }); + + it("should respect resend interval", async () => { + const customStore = new MessageStore({ resendIntervalMs: 10000 }); + const requestId = await customStore.queue(mockSendMessage); + + const entry = (customStore as any).pendingRequests.get(requestId); + if (entry) { + entry.lastSentAt = Date.now() - 5000; + } + + const messagesAfterShortTime = customStore.getMessagesToSend(); + expect(messagesAfterShortTime).to.have.length(0); + + if (entry) { + entry.lastSentAt = Date.now() - 15000; + } + + const messagesAfterLongTime = customStore.getMessagesToSend(); + expect(messagesAfterLongTime).to.have.length(1); + }); + + it("should return messages after resend interval", async () => { + const customStore = new MessageStore({ resendIntervalMs: 1000 }); + const requestId = await customStore.queue(mockSendMessage); + + const entry = (customStore as any).pendingRequests.get(requestId); + if (entry) { + entry.lastSentAt = Date.now() - 2000; + } + + const messages = customStore.getMessagesToSend(); + expect(messages).to.have.length(1); + }); + + it("should not return messages without messageRequest", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const entry = (messageStore as any).pendingRequests.get(requestId); + if (entry) { + entry.messageRequest = undefined; + } + const messages = messageStore.getMessagesToSend(); + expect(messages).to.have.length(0); + }); + }); + + describe("edge cases", () => { + it("should handle multiple acknowledgments for same message", () => { + messageStore.add(mockMessage); + messageStore.markFilterAck(mockMessage.hashStr); + messageStore.markStoreAck(mockMessage.hashStr); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should handle message received before sent", async () => { + messageStore.add(mockMessage); + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should handle empty message hash", () => { + const emptyHashMessage = { ...mockMessage, hashStr: "" }; + messageStore.add(emptyHashMessage); + expect(messageStore.has("")).to.be.true; + }); + + it("should handle very long message hash", () => { + const longHash = "a".repeat(1000); + const longHashMessage = { ...mockMessage, hashStr: longHash }; + messageStore.add(longHashMessage); + expect(messageStore.has(longHash)).to.be.true; + }); + + it("should handle special characters in hash", () => { + const specialHash = "test-hash-!@#$%^&*()_+-=[]{}|;':\",./<>?"; + const specialHashMessage = { ...mockMessage, hashStr: specialHash }; + messageStore.add(specialHashMessage); + expect(messageStore.has(specialHash)).to.be.true; + }); + }); + + describe("state transitions", () => { + it("should move message from pending to stored on ack", async () => { + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + messageStore.markFilterAck(mockMessage.hashStr); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + const pendingMessages = (messageStore as any).pendingMessages; + expect(pendingMessages.has(mockMessage.hashStr)).to.be.false; + }); + + it("should merge pending and stored message data", async () => { + messageStore.add(mockMessage, { filterAck: true }); + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + messageStore.markStoreAck(mockMessage.hashStr); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should preserve acknowledgment state during transition", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const entry = (messageStore as any).pendingRequests.get(requestId); + if (entry) { + entry.filterAck = true; + } + messageStore.markSent(requestId, mockMessage); + messageStore.markStoreAck(mockMessage.hashStr); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + }); + + describe("timing edge cases", () => { + it("should handle zero timestamp", async () => { + const zeroTimeMessage = { ...mockMessage, timestamp: new Date(0) }; + const requestId = await messageStore.queue(mockSendMessage); + expect(() => { + messageStore.markSent(requestId, zeroTimeMessage); + }).to.not.throw(); + }); + + it("should handle future timestamp", async () => { + const futureTime = new Date(Date.now() + 86400000); + const futureMessage = { ...mockMessage, timestamp: futureTime }; + const requestId = await messageStore.queue(mockSendMessage); + expect(() => { + messageStore.markSent(requestId, futureMessage); + }).to.not.throw(); + }); + + it("should handle very old timestamp", async () => { + const oldTime = new Date(0); + const oldMessage = { ...mockMessage, timestamp: oldTime }; + const requestId = await messageStore.queue(mockSendMessage); + expect(() => { + messageStore.markSent(requestId, oldMessage); + }).to.not.throw(); + }); + }); +}); diff --git a/packages/sdk/src/messaging/message_store.ts b/packages/sdk/src/messaging/message_store.ts new file mode 100644 index 0000000000..3d57482252 --- /dev/null +++ b/packages/sdk/src/messaging/message_store.ts @@ -0,0 +1,172 @@ +import { IDecodedMessage, ISendMessage, RequestId } from "@waku/interfaces"; +import { v4 as uuidv4 } from "uuid"; + +type QueuedMessage = { + messageRequest?: ISendMessage; + filterAck: boolean; + storeAck: boolean; + lastSentAt?: number; + createdAt: number; +}; + +type AddMessageOptions = { + filterAck?: boolean; + storeAck?: boolean; +}; + +type MessageStoreOptions = { + resendIntervalMs?: number; +}; + +type MessageHashStr = string; + +export class MessageStore { + private readonly messages: Map = new Map(); + + private readonly pendingRequests: Map = new Map(); + private readonly pendingMessages: Map = new Map(); + + private readonly resendIntervalMs: number; + + public constructor(options: MessageStoreOptions = {}) { + this.resendIntervalMs = options.resendIntervalMs ?? 5000; + } + + public has(hashStr: string): boolean { + return this.messages.has(hashStr) || this.pendingMessages.has(hashStr); + } + + public add(message: IDecodedMessage, options: AddMessageOptions = {}): void { + if (!this.has(message.hashStr)) { + this.messages.set(message.hashStr, { + filterAck: options.filterAck ?? false, + storeAck: options.storeAck ?? false, + createdAt: Date.now() + }); + } + } + + public markFilterAck(hashStr: string): void { + this.ackMessage(hashStr, { filterAck: true }); + this.replacePendingWithMessage(hashStr); + } + + public markStoreAck(hashStr: string): void { + this.ackMessage(hashStr, { storeAck: true }); + this.replacePendingWithMessage(hashStr); + } + + public markSent(requestId: RequestId, sentMessage: IDecodedMessage): void { + const entry = this.pendingRequests.get(requestId); + + if (!entry || !entry.messageRequest) { + return; + } + + entry.lastSentAt = Number(sentMessage.timestamp); + this.pendingMessages.set(sentMessage.hashStr, requestId); + + this.replacePendingWithMessage(sentMessage.hashStr); + } + + public async queue(message: ISendMessage): Promise { + const requestId = uuidv4(); // cspell:ignore uuidv4 + + this.pendingRequests.set(requestId.toString(), { + messageRequest: message, + filterAck: false, + storeAck: false, + createdAt: Date.now() + }); + + return requestId; + } + + public getMessagesToSend(): Array<{ + requestId: string; + message: ISendMessage; + }> { + const res: Array<{ + requestId: string; + message: ISendMessage; + }> = []; + + for (const [requestId, entry] of this.pendingRequests.entries()) { + const isAcknowledged = entry.filterAck || entry.storeAck; + + if (!entry.messageRequest || isAcknowledged) { + continue; + } + + const sentAt = entry.lastSentAt || entry.createdAt; + const notTooRecent = Date.now() - sentAt >= this.resendIntervalMs; + const notAcknowledged = !isAcknowledged; + + if (notTooRecent && notAcknowledged) { + res.push({ + requestId, + message: entry.messageRequest + }); + } + } + + return res; + } + + private ackMessage( + hashStr: MessageHashStr, + ackParams: AddMessageOptions = {} + ): void { + let entry = this.messages.get(hashStr); + + if (entry) { + entry.filterAck = ackParams.filterAck ?? entry.filterAck; + entry.storeAck = ackParams.storeAck ?? entry.storeAck; + return; + } + + const requestId = this.pendingMessages.get(hashStr); + + if (!requestId) { + return; + } + + entry = this.pendingRequests.get(requestId); + + if (!entry) { + return; + } + + entry.filterAck = ackParams.filterAck ?? entry.filterAck; + entry.storeAck = ackParams.storeAck ?? entry.storeAck; + } + + private replacePendingWithMessage(hashStr: MessageHashStr): void { + const requestId = this.pendingMessages.get(hashStr); + + if (!requestId) { + return; + } + + let entry = this.pendingRequests.get(requestId); + + if (!entry) { + return; + } + + // merge with message entry if possible + // this can happen if message we sent got received before we could add it to the message store + const messageEntry = this.messages.get(hashStr); + entry = { + ...entry, + ...messageEntry, + filterAck: messageEntry?.filterAck ?? entry.filterAck, + storeAck: messageEntry?.storeAck ?? entry.storeAck + }; + + this.pendingRequests.delete(requestId); + this.pendingMessages.delete(hashStr); + + this.messages.set(hashStr, entry); + } +} diff --git a/packages/sdk/src/messaging/messaging.spec.ts b/packages/sdk/src/messaging/messaging.spec.ts new file mode 100644 index 0000000000..667110a300 --- /dev/null +++ b/packages/sdk/src/messaging/messaging.spec.ts @@ -0,0 +1,87 @@ +import type { + IFilter, + ILightPush, + ISendMessage, + IStore +} from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { MessageStore } from "./message_store.js"; +import { Messaging } from "./messaging.js"; + +const testContentTopic = "/test/1/waku-messaging/utf8"; +const testNetworkconfig = { + clusterId: 0, + numShardsInCluster: 9 +}; + +describe("MessageStore", () => { + it("queues, marks sent and acks", async () => { + const store = new MessageStore({ resendIntervalMs: 1 }); + const msg: ISendMessage = { + contentTopic: testContentTopic, + payload: utf8ToBytes("hello") + }; + + const hash = await store.queue(msg); + expect(hash).to.be.a("string"); + if (!hash) return; + + const mockDecodedMessage = { + hashStr: hash, + timestamp: new Date() + } as any; + + store.markSent(hash, mockDecodedMessage); + store.markFilterAck(hash); + store.markStoreAck(hash); + + const toSend = store.getMessagesToSend(); + expect(toSend.length).to.eq(0); + }); +}); + +describe("Messaging", () => { + it("queues and sends via light push, marks sent", async () => { + const lightPush: ILightPush = { + multicodec: "lightpush", + start: () => {}, + stop: () => {}, + send: sinon.stub().resolves({ successes: [], failures: [] }) as any + } as unknown as ILightPush; + + const filter: IFilter = { + multicodec: "filter", + start: sinon.stub().resolves(), + stop: sinon.stub().resolves(), + subscribe: sinon.stub().resolves(true), + unsubscribe: sinon.stub().resolves(true), + unsubscribeAll: sinon.stub() + } as unknown as IFilter; + + const store: IStore = { + multicodec: "store", + createCursor: sinon.stub() as any, + queryGenerator: sinon.stub() as any, + queryWithOrderedCallback: sinon.stub().resolves(), + queryWithPromiseCallback: sinon.stub().resolves() + } as unknown as IStore; + + const messaging = new Messaging({ + lightPush, + filter, + store, + networkConfig: testNetworkconfig + }); + + const message: ISendMessage = { + contentTopic: testContentTopic, + payload: utf8ToBytes("hello") + }; + + await messaging.send(message); + expect((lightPush.send as any).calledOnce).to.be.true; + }); +}); diff --git a/packages/sdk/src/messaging/messaging.ts b/packages/sdk/src/messaging/messaging.ts new file mode 100644 index 0000000000..28a17dbade --- /dev/null +++ b/packages/sdk/src/messaging/messaging.ts @@ -0,0 +1,61 @@ +import { + IFilter, + ILightPush, + ISendMessage, + IStore, + NetworkConfig, + RequestId +} from "@waku/interfaces"; + +import { AckManager } from "./ack_manager.js"; +import { MessageStore } from "./message_store.js"; +import { Sender } from "./sender.js"; + +interface IMessaging { + send(wakuLikeMessage: ISendMessage): Promise; +} + +type MessagingConstructorParams = { + lightPush: ILightPush; + filter: IFilter; + store: IStore; + networkConfig: NetworkConfig; +}; + +export class Messaging implements IMessaging { + private readonly messageStore: MessageStore; + private readonly ackManager: AckManager; + private readonly sender: Sender; + + public constructor(params: MessagingConstructorParams) { + this.messageStore = new MessageStore(); + + this.ackManager = new AckManager({ + messageStore: this.messageStore, + filter: params.filter, + store: params.store, + networkConfig: params.networkConfig + }); + + this.sender = new Sender({ + messageStore: this.messageStore, + lightPush: params.lightPush, + ackManager: this.ackManager, + networkConfig: params.networkConfig + }); + } + + public start(): void { + this.ackManager.start(); + this.sender.start(); + } + + public async stop(): Promise { + await this.ackManager.stop(); + this.sender.stop(); + } + + public send(wakuLikeMessage: ISendMessage): Promise { + return this.sender.send(wakuLikeMessage); + } +} diff --git a/packages/sdk/src/messaging/sender.spec.ts b/packages/sdk/src/messaging/sender.spec.ts new file mode 100644 index 0000000000..83306e95b4 --- /dev/null +++ b/packages/sdk/src/messaging/sender.spec.ts @@ -0,0 +1,144 @@ +import type { ILightPush, ISendMessage, NetworkConfig } from "@waku/interfaces"; +import { expect } from "chai"; +import { afterEach, beforeEach, describe, it } from "mocha"; +import sinon from "sinon"; + +import type { AckManager } from "./ack_manager.js"; +import type { MessageStore } from "./message_store.js"; +import { Sender } from "./sender.js"; + +describe("Sender", () => { + let sender: Sender; + let mockMessageStore: MessageStore; + let mockLightPush: ILightPush; + let mockAckManager: AckManager; + let mockNetworkConfig: NetworkConfig; + + beforeEach(() => { + mockMessageStore = { + queue: sinon.stub(), + getMessagesToSend: sinon.stub(), + markSent: sinon.stub() + } as any; + + mockLightPush = { + send: sinon.stub() + } as any; + + mockAckManager = { + subscribe: sinon.stub() + } as any; + + mockNetworkConfig = { + clusterId: 1, + shardId: 0 + } as any; + + sender = new Sender({ + messageStore: mockMessageStore, + lightPush: mockLightPush, + ackManager: mockAckManager, + networkConfig: mockNetworkConfig + }); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe("constructor", () => { + it("should initialize with provided parameters", () => { + expect(sender).to.be.instanceOf(Sender); + }); + }); + + describe("start", () => { + it("should set up background sending interval", () => { + const setIntervalSpy = sinon.spy(global, "setInterval"); + + sender.start(); + + expect(setIntervalSpy.calledWith(sinon.match.func, 1000)).to.be.true; + }); + + it("should not create multiple intervals when called multiple times", () => { + const setIntervalSpy = sinon.spy(global, "setInterval"); + + sender.start(); + sender.start(); + + expect(setIntervalSpy.calledOnce).to.be.true; + }); + }); + + describe("stop", () => { + it("should clear interval when called", () => { + const clearIntervalSpy = sinon.spy(global, "clearInterval"); + + sender.start(); + sender.stop(); + + expect(clearIntervalSpy.called).to.be.true; + }); + + it("should handle multiple stop calls gracefully", () => { + const clearIntervalSpy = sinon.spy(global, "clearInterval"); + + sender.start(); + sender.stop(); + sender.stop(); + + expect(clearIntervalSpy.calledOnce).to.be.true; + }); + + it("should handle stop without start", () => { + expect(() => sender.stop()).to.not.throw(); + }); + }); + + describe("send", () => { + const mockMessage: ISendMessage = { + contentTopic: "test-topic", + payload: new Uint8Array([1, 2, 3]), + ephemeral: false + }; + + const mockRequestId = "test-request-id"; + + it("should handle messageStore.queue failure", async () => { + const error = new Error("Queue failed"); + (mockMessageStore.queue as sinon.SinonStub).rejects(error); + + try { + await sender.send(mockMessage); + expect.fail("Expected error to be thrown"); + } catch (e: any) { + expect(e).to.equal(error); + } + }); + + it("should handle ackManager.subscribe failure", async () => { + const error = new Error("Subscribe failed"); + (mockAckManager.subscribe as sinon.SinonStub).rejects(error); + (mockMessageStore.queue as sinon.SinonStub).resolves(mockRequestId); + + try { + await sender.send(mockMessage); + expect.fail("Expected error to be thrown"); + } catch (e: any) { + expect(e).to.equal(error); + } + }); + }); + + describe("backgroundSend", () => { + it("should handle empty pending messages", async () => { + (mockMessageStore.getMessagesToSend as sinon.SinonStub).returns([]); + + await sender["backgroundSend"](); + + expect((mockMessageStore.getMessagesToSend as sinon.SinonStub).called).to + .be.true; + }); + }); +}); diff --git a/packages/sdk/src/messaging/sender.ts b/packages/sdk/src/messaging/sender.ts new file mode 100644 index 0000000000..6d093d8bc2 --- /dev/null +++ b/packages/sdk/src/messaging/sender.ts @@ -0,0 +1,127 @@ +import { createDecoder, createEncoder } from "@waku/core"; +import { + ILightPush, + ISendMessage, + NetworkConfig, + RequestId +} from "@waku/interfaces"; +import { createRoutingInfo } from "@waku/utils"; + +import { AckManager } from "./ack_manager.js"; +import type { MessageStore } from "./message_store.js"; + +type SenderConstructorParams = { + messageStore: MessageStore; + lightPush: ILightPush; + ackManager: AckManager; + networkConfig: NetworkConfig; +}; + +const DEFAULT_SEND_INTERVAL = 1000; + +export class Sender { + private readonly messageStore: MessageStore; + private readonly lightPush: ILightPush; + private readonly ackManager: AckManager; + private readonly networkConfig: NetworkConfig; + + private readonly processingRequests: Set = new Set(); + + private sendInterval: ReturnType | null = null; + + public constructor(params: SenderConstructorParams) { + this.messageStore = params.messageStore; + this.lightPush = params.lightPush; + this.ackManager = params.ackManager; + this.networkConfig = params.networkConfig; + } + + public start(): void { + if (this.sendInterval) { + return; + } + + this.sendInterval = setInterval( + () => void this.backgroundSend(), + DEFAULT_SEND_INTERVAL + ); + } + + public stop(): void { + if (this.sendInterval) { + clearInterval(this.sendInterval); + this.sendInterval = null; + } + } + + public async send(message: ISendMessage): Promise { + const requestId = await this.messageStore.queue(message); + + await this.ackManager.subscribe(message.contentTopic); + await this.sendMessage(requestId, message); + + return requestId; + } + + private async backgroundSend(): Promise { + const pendingRequests = this.messageStore.getMessagesToSend(); + + for (const { requestId, message } of pendingRequests) { + await this.sendMessage(requestId, message); + } + } + + private async sendMessage( + requestId: RequestId, + message: ISendMessage + ): Promise { + try { + if (this.processingRequests.has(requestId)) { + return; + } + + this.processingRequests.add(requestId); + + const encoder = createEncoder({ + contentTopic: message.contentTopic, + routingInfo: createRoutingInfo(this.networkConfig, { + contentTopic: message.contentTopic + }), + ephemeral: message.ephemeral + }); + + const decoder = createDecoder( + message.contentTopic, + createRoutingInfo(this.networkConfig, { + contentTopic: message.contentTopic + }) + ); + + const response = await this.lightPush.send( + encoder, + { + payload: message.payload + }, + { + // force no retry as we have retry implemented in the sender + autoRetry: false, + // send to only one peer as we will retry on failure and need to ensure only one message is in the network + numPeersToUse: 1 + } + ); + + if (response?.messages && response.messages.length > 0) { + const decodedMessage = await decoder.fromProtoObj( + decoder.pubsubTopic, + response.messages[0] + ); + + this.messageStore.markSent(requestId, decodedMessage!); + } else { + // do nothing on failure, will retry + } + } finally { + this.processingRequests.delete(requestId); + } + } +} diff --git a/packages/sdk/src/messaging/utils.ts b/packages/sdk/src/messaging/utils.ts new file mode 100644 index 0000000000..3ce045e68c --- /dev/null +++ b/packages/sdk/src/messaging/utils.ts @@ -0,0 +1,5 @@ +export interface IAckManager { + start(): void; + stop(): void; + subscribe(contentTopic: string): Promise; +} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 7336b06df7..c4a4c27b42 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -26,13 +26,16 @@ import type { import { DefaultNetworkConfig, HealthStatus, - Protocols + ISendMessage, + Protocols, + RequestId } from "@waku/interfaces"; import { createRoutingInfo, Logger } from "@waku/utils"; import { Filter } from "../filter/index.js"; import { HealthIndicator } from "../health_indicator/index.js"; import { LightPush } from "../light_push/index.js"; +import { Messaging } from "../messaging/index.js"; import { PeerManager } from "../peer_manager/index.js"; import { Store } from "../store/index.js"; @@ -64,6 +67,7 @@ export class WakuNode implements IWaku { private readonly connectionManager: ConnectionManager; private readonly peerManager: PeerManager; private readonly healthIndicator: HealthIndicator; + private messaging: Messaging | null = null; public constructor( options: CreateNodeOptions, @@ -126,6 +130,15 @@ export class WakuNode implements IWaku { }); } + if (this.lightPush && this.filter && this.store) { + this.messaging = new Messaging({ + lightPush: this.lightPush, + filter: this.filter, + store: this.store, + networkConfig: this.networkConfig + }); + } + log.info( "Waku node created", peerId, @@ -221,6 +234,7 @@ export class WakuNode implements IWaku { this.peerManager.start(); this.healthIndicator.start(); this.lightPush?.start(); + this.messaging?.start(); this._nodeStateLock = false; this._nodeStarted = true; @@ -231,6 +245,7 @@ export class WakuNode implements IWaku { this._nodeStateLock = true; + await this.messaging?.stop(); this.lightPush?.stop(); await this.filter?.stop(); this.healthIndicator.stop(); @@ -282,6 +297,14 @@ export class WakuNode implements IWaku { }); } + public send(message: ISendMessage): Promise { + if (!this.messaging) { + throw new Error("Messaging not initialized"); + } + + return this.messaging.send(message); + } + private createRoutingInfo( contentTopic?: string, shardId?: number diff --git a/packages/utils/src/common/mock_node.ts b/packages/utils/src/common/mock_node.ts index 40472fea17..5d0dfd08c2 100644 --- a/packages/utils/src/common/mock_node.ts +++ b/packages/utils/src/common/mock_node.ts @@ -12,13 +12,15 @@ import { ILightPush, type IMessage, IRelay, + ISendMessage, ISendOptions, IStore, IWaku, IWakuEventEmitter, Libp2p, LightPushSDKResult, - Protocols + Protocols, + RequestId } from "@waku/interfaces"; export type MockWakuEvents = { @@ -154,6 +156,9 @@ export class MockWakuNode implements IWaku { public createEncoder(_params: CreateEncoderParams): IEncoder { throw new Error("Method not implemented."); } + public send(_message: ISendMessage): Promise { + throw new Error("Method not implemented."); + } public isStarted(): boolean { throw new Error("Method not implemented."); }