Skip to content

Commit 4fe8bfd

Browse files
committed
implement main ack manager, improve message store, implement Sender entity
1 parent 3de906a commit 4fe8bfd

File tree

6 files changed

+242
-151
lines changed

6 files changed

+242
-151
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import { IDecodedMessage, IFilter, IStore } from "@waku/interfaces";
2+
3+
import { MessageStore } from "./message_store.js";
4+
import { IAckManager, ICodec } from "./utils.js";
5+
6+
type AckManagerConstructorParams = {
7+
messageStore: MessageStore;
8+
filter: IFilter;
9+
store: IStore;
10+
};
11+
12+
export class AckManager implements IAckManager {
13+
private readonly messageStore: MessageStore;
14+
private readonly filterAckManager: FilterAckManager;
15+
private readonly storeAckManager: StoreAckManager;
16+
17+
public constructor(params: AckManagerConstructorParams) {
18+
this.messageStore = params.messageStore;
19+
20+
this.filterAckManager = new FilterAckManager(
21+
this.messageStore,
22+
params.filter
23+
);
24+
25+
this.storeAckManager = new StoreAckManager(this.messageStore, params.store);
26+
}
27+
28+
public start(): void {
29+
this.filterAckManager.start();
30+
this.storeAckManager.start();
31+
}
32+
33+
public async stop(): Promise<void> {
34+
await this.filterAckManager.stop();
35+
this.storeAckManager.stop();
36+
}
37+
38+
public async subscribe(codec: ICodec): Promise<boolean> {
39+
return (
40+
(await this.filterAckManager.subscribe(codec)) ||
41+
(await this.storeAckManager.subscribe(codec))
42+
);
43+
}
44+
}
45+
46+
class FilterAckManager implements IAckManager {
47+
private codecs: Set<ICodec> = new Set();
48+
49+
public constructor(
50+
private messageStore: MessageStore,
51+
private filter: IFilter
52+
) {}
53+
54+
public start(): void {
55+
return;
56+
}
57+
58+
public async stop(): Promise<void> {
59+
const promises = Array.from(this.codecs.entries()).map((codec) =>
60+
this.filter.unsubscribe(codec)
61+
);
62+
await Promise.all(promises);
63+
this.codecs.clear();
64+
}
65+
66+
public async subscribe(codec: ICodec): Promise<boolean> {
67+
const success = await this.filter.subscribe(
68+
codec,
69+
this.onMessage.bind(this)
70+
);
71+
if (success) {
72+
this.codecs.add(codec);
73+
}
74+
return success;
75+
}
76+
77+
private async onMessage(message: IDecodedMessage): Promise<void> {
78+
if (!this.messageStore.has(message.hashStr)) {
79+
this.messageStore.add(message);
80+
}
81+
82+
this.messageStore.markFilterAck(message.hashStr);
83+
}
84+
}
85+
86+
class StoreAckManager implements IAckManager {
87+
private interval: ReturnType<typeof setInterval> | null = null;
88+
89+
private codecs: Set<ICodec> = new Set();
90+
91+
public constructor(
92+
private messageStore: MessageStore,
93+
private store: IStore
94+
) {}
95+
96+
public start(): void {
97+
if (this.interval) {
98+
return;
99+
}
100+
101+
this.interval = setInterval(() => {
102+
void this.query();
103+
}, 1000);
104+
}
105+
106+
public stop(): void {
107+
if (!this.interval) {
108+
return;
109+
}
110+
111+
clearInterval(this.interval);
112+
this.interval = null;
113+
}
114+
115+
public async subscribe(codec: ICodec): Promise<boolean> {
116+
this.codecs.add(codec);
117+
return true;
118+
}
119+
120+
private async query(): Promise<void> {
121+
for (const codec of this.codecs) {
122+
await this.store.queryWithOrderedCallback(
123+
[codec],
124+
(message) => {
125+
if (!this.messageStore.has(message.hashStr)) {
126+
this.messageStore.add(message);
127+
}
128+
129+
this.messageStore.markStoreAck(message.hashStr);
130+
},
131+
{
132+
timeStart: new Date(Date.now() - 60 * 60 * 1000),
133+
timeEnd: new Date()
134+
}
135+
);
136+
}
137+
}
138+
}

packages/sdk/src/messaging/fitler_ack.ts

Lines changed: 0 additions & 44 deletions
This file was deleted.
Lines changed: 60 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { messageHashStr } from "@waku/core";
1+
import { message, messageHashStr } from "@waku/core";
22
import { IDecodedMessage, IEncoder, IMessage } from "@waku/interfaces";
33

44
type QueuedMessage = {
@@ -14,8 +14,12 @@ type MessageStoreOptions = {
1414
resendIntervalMs?: number;
1515
};
1616

17+
type RequestId = string;
18+
1719
export class MessageStore {
1820
private readonly messages: Map<string, QueuedMessage> = new Map();
21+
private readonly pendingRequests: Map<RequestId, QueuedMessage> = new Map();
22+
1923
private readonly resendIntervalMs: number;
2024

2125
public constructor(options: MessageStoreOptions = {}) {
@@ -40,62 +44,91 @@ export class MessageStore {
4044
const entry = this.messages.get(hashStr);
4145
if (!entry) return;
4246
entry.filterAck = true;
47+
// TODO: implement events
4348
}
4449

4550
public markStoreAck(hashStr: string): void {
4651
const entry = this.messages.get(hashStr);
4752
if (!entry) return;
4853
entry.storeAck = true;
54+
// TODO: implement events
4955
}
5056

51-
public markSent(hashStr: string): void {
52-
const entry = this.messages.get(hashStr);
53-
if (!entry) return;
54-
entry.lastSentAt = Date.now();
57+
public async markSent(requestId: RequestId): Promise<void> {
58+
const entry = this.pendingRequests.get(requestId);
59+
60+
if (!entry || !entry.encoder || !entry.message) {
61+
return;
62+
}
63+
64+
try {
65+
entry.lastSentAt = Date.now();
66+
this.pendingRequests.delete(requestId);
67+
68+
const proto = await entry.encoder.toProtoObj(entry.message);
69+
70+
if (!proto) {
71+
return;
72+
}
73+
74+
const hashStr = messageHashStr(entry.encoder.pubsubTopic, proto);
75+
76+
this.messages.set(hashStr, entry);
77+
} catch (error) {
78+
// TODO: better recovery
79+
this.pendingRequests.set(requestId, entry);
80+
}
5581
}
5682

5783
public async queue(
5884
encoder: IEncoder,
5985
message: IMessage
60-
): Promise<string | undefined> {
61-
const proto = await encoder.toProtoObj(message);
62-
if (!proto) return undefined;
63-
const hashStr = messageHashStr(encoder.pubsubTopic, proto);
64-
const existing = this.messages.get(hashStr);
65-
if (!existing) {
66-
this.messages.set(hashStr, {
67-
encoder,
68-
message,
69-
filterAck: false,
70-
storeAck: false,
71-
createdAt: Date.now()
72-
});
73-
}
74-
return hashStr;
86+
): Promise<RequestId | undefined> {
87+
const requestId = crypto.randomUUID();
88+
89+
this.pendingRequests.set(requestId, {
90+
encoder,
91+
message,
92+
filterAck: false,
93+
storeAck: false,
94+
createdAt: Date.now()
95+
});
96+
97+
return requestId;
7598
}
7699

77100
public getMessagesToSend(): Array<{
78-
hashStr: string;
101+
requestId: string;
79102
encoder: IEncoder;
80103
message: IMessage;
81104
}> {
82105
const now = Date.now();
106+
83107
const res: Array<{
84-
hashStr: string;
108+
requestId: string;
85109
encoder: IEncoder;
86110
message: IMessage;
87111
}> = [];
88-
for (const [hashStr, entry] of this.messages.entries()) {
89-
if (!entry.encoder || !entry.message) continue;
90-
const isAcknowledged = entry.filterAck || entry.storeAck;
91-
if (isAcknowledged) continue;
112+
113+
for (const [requestId, entry] of this.pendingRequests.entries()) {
114+
if (!entry.encoder || !entry.message) {
115+
continue;
116+
}
117+
118+
const isAcknowledged = entry.filterAck || entry.storeAck; // TODO: make sure it works with message and pending requests and returns messages to re-sent that are not ack yet
119+
120+
if (isAcknowledged) {
121+
continue;
122+
}
123+
92124
if (
93125
!entry.lastSentAt ||
94126
now - entry.lastSentAt >= this.resendIntervalMs
95127
) {
96-
res.push({ hashStr, encoder: entry.encoder, message: entry.message });
128+
res.push({ requestId, encoder: entry.encoder, message: entry.message });
97129
}
98130
}
131+
99132
return res;
100133
}
101134
}

packages/sdk/src/messaging/messaging.ts

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import {
66
IStore
77
} from "@waku/interfaces";
88

9-
import { FilterAckManager } from "./fitler_ack.js";
9+
import { AckManager } from "./ack_manager.js";
1010
import { MessageStore } from "./message_store.js";
11-
import { StoreAckManager } from "./store_ack.js";
11+
import { Sender } from "./sender.js";
1212

1313
interface IMessaging {
1414
send(encoder: IEncoder, message: IMessage): Promise<void>;
@@ -21,38 +21,34 @@ type MessagingConstructorParams = {
2121
};
2222

2323
export class Messaging implements IMessaging {
24-
private readonly lightPush: ILightPush;
2524
private readonly messageStore: MessageStore;
26-
private readonly filterAckManager: FilterAckManager;
27-
private readonly storeAckManager: StoreAckManager;
25+
private readonly ackManager: AckManager;
26+
private readonly sender: Sender;
2827

2928
public constructor(params: MessagingConstructorParams) {
30-
this.lightPush = params.lightPush;
3129
this.messageStore = new MessageStore();
32-
this.filterAckManager = new FilterAckManager(
33-
this.messageStore,
34-
params.filter
35-
);
36-
this.storeAckManager = new StoreAckManager(this.messageStore, params.store);
30+
31+
this.ackManager = new AckManager({
32+
messageStore: this.messageStore,
33+
filter: params.filter,
34+
store: params.store
35+
});
36+
37+
this.sender = new Sender({
38+
messageStore: this.messageStore,
39+
lightPush: params.lightPush
40+
});
3741
}
3842

3943
public start(): void {
40-
this.filterAckManager.start();
41-
this.storeAckManager.start();
44+
this.ackManager.start();
4245
}
4346

4447
public async stop(): Promise<void> {
45-
await this.filterAckManager.stop();
46-
this.storeAckManager.stop();
48+
await this.ackManager.stop();
4749
}
4850

4951
public send(encoder: IEncoder, message: IMessage): Promise<void> {
50-
return (async () => {
51-
const hash = await this.messageStore.queue(encoder, message);
52-
await this.lightPush.send(encoder, message);
53-
if (hash) {
54-
this.messageStore.markSent(hash);
55-
}
56-
})();
52+
return this.sender.send(encoder, message);
5753
}
5854
}

0 commit comments

Comments
 (0)