Skip to content

Commit 84a6ea6

Browse files
fix: cleanup routines on reliable channel and core protocols (#2733)
* fix: add stop methods to protocols to prevent event listener leaks * fix: add abort signal support for graceful store query cancellation * fix: call protocol stop methods in WakuNode.stop() * fix: improve QueryOnConnect cleanup and abort signal handling * fix: improve MissingMessageRetriever cleanup with abort signal * fix: add stopAllRetries method to RetryManager for proper cleanup * fix: implement comprehensive ReliableChannel stop() with proper cleanup * fix: add active query tracking to QueryOnConnect and await its stop() * fix: add stop() to IRelayAPI and IStore interfaces, implement in SDK wrappers * align with usual naming (isStarted) * remove unnecessary `await` * test: `stop()` is now async * chore: use more concise syntax --------- Co-authored-by: Levente Kiss <[email protected]>
1 parent 049e564 commit 84a6ea6

File tree

15 files changed

+313
-67
lines changed

15 files changed

+313
-67
lines changed

packages/core/src/lib/filter/filter.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ export class FilterCore {
6161
}
6262

6363
public async stop(): Promise<void> {
64+
this.streamManager.stop();
6465
try {
6566
await this.libp2p.unhandle(FilterCodecs.PUSH);
6667
} catch (e) {

packages/core/src/lib/light_push/light_push.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ export class LightPushCore {
3333
this.streamManager = new StreamManager(CODECS.v3, libp2p.components);
3434
}
3535

36+
public stop(): void {
37+
this.streamManager.stop();
38+
this.streamManagerV2.stop();
39+
}
40+
3641
public async send(
3742
encoder: IEncoder,
3843
message: IMessage,

packages/core/src/lib/store/store.ts

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ export class StoreCore {
3535
this.streamManager = new StreamManager(StoreCodec, libp2p.components);
3636
}
3737

38+
public stop(): void {
39+
this.streamManager.stop();
40+
}
41+
3842
public get maxTimeLimit(): number {
3943
return MAX_TIME_RANGE;
4044
}
@@ -68,6 +72,11 @@ export class StoreCore {
6872

6973
let currentCursor = queryOpts.paginationCursor;
7074
while (true) {
75+
if (queryOpts.abortSignal?.aborted) {
76+
log.info("Store query aborted by signal");
77+
break;
78+
}
79+
7180
const storeQueryRequest = StoreQueryRequest.create({
7281
...queryOpts,
7382
paginationCursor: currentCursor
@@ -89,13 +98,22 @@ export class StoreCore {
8998
break;
9099
}
91100

92-
const res = await pipe(
93-
[storeQueryRequest.encode()],
94-
lp.encode,
95-
stream,
96-
lp.decode,
97-
async (source) => await all(source)
98-
);
101+
let res;
102+
try {
103+
res = await pipe(
104+
[storeQueryRequest.encode()],
105+
lp.encode,
106+
stream,
107+
lp.decode,
108+
async (source) => await all(source)
109+
);
110+
} catch (error) {
111+
if (error instanceof Error && error.name === "AbortError") {
112+
log.info(`Store query aborted for peer ${peerId.toString()}`);
113+
break;
114+
}
115+
throw error;
116+
}
99117

100118
const bytes = new Uint8ArrayList();
101119
res.forEach((chunk) => {
@@ -122,6 +140,11 @@ export class StoreCore {
122140
`${storeQueryResponse.messages.length} messages retrieved from store`
123141
);
124142

143+
if (queryOpts.abortSignal?.aborted) {
144+
log.info("Store query aborted by signal before processing messages");
145+
break;
146+
}
147+
125148
const decodedMessages = storeQueryResponse.messages.map((protoMsg) => {
126149
if (!protoMsg.message) {
127150
return Promise.resolve(undefined);

packages/core/src/lib/stream_manager/stream_manager.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,15 @@ export class StreamManager {
2323
);
2424
}
2525

26+
public stop(): void {
27+
this.libp2p.events.removeEventListener(
28+
"peer:update",
29+
this.handlePeerUpdateStreamPool
30+
);
31+
this.streamPool.clear();
32+
this.ongoingCreation.clear();
33+
}
34+
2635
public async getStream(peerId: PeerId): Promise<Stream | undefined> {
2736
try {
2837
const peerIdStr = peerId.toString();

packages/interfaces/src/relay.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ export interface IRelayAPI {
1616
readonly pubsubTopics: Set<PubsubTopic>;
1717
readonly gossipSub: GossipSub;
1818
start: () => Promise<void>;
19+
stop: () => Promise<void>;
1920
waitForPeers: () => Promise<void>;
2021
getMeshPeers: (topic?: TopicStr) => PeerIdStr[];
2122
}

packages/interfaces/src/store.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,18 @@ export type QueryRequestParams = {
8888
* Only use if you know what you are doing.
8989
*/
9090
peerId?: PeerId;
91+
92+
/**
93+
* An optional AbortSignal to cancel the query.
94+
* When the signal is aborted, the query will stop processing and return early.
95+
*/
96+
abortSignal?: AbortSignal;
9197
};
9298

9399
export type IStore = {
94100
readonly multicodec: string;
95101

102+
stop(): void;
96103
createCursor(message: IDecodedMessage): StoreCursor;
97104
queryGenerator: <T extends IDecodedMessage>(
98105
decoders: IDecoder<T>[],

packages/relay/src/relay.ts

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ export class Relay implements IRelay {
6767
* Observers under key `""` are always called.
6868
*/
6969
private observers: Map<PubsubTopic, Map<ContentTopic, Set<unknown>>>;
70+
private messageEventHandlers: Map<
71+
PubsubTopic,
72+
(event: CustomEvent<GossipsubMessage>) => void
73+
> = new Map();
7074

7175
public constructor(params: RelayConstructorParams) {
7276
if (!this.isRelayPubsub(params.libp2p.services.pubsub)) {
@@ -105,6 +109,19 @@ export class Relay implements IRelay {
105109
this.subscribeToAllTopics();
106110
}
107111

112+
public async stop(): Promise<void> {
113+
for (const pubsubTopic of this.pubsubTopics) {
114+
const handler = this.messageEventHandlers.get(pubsubTopic);
115+
if (handler) {
116+
this.gossipSub.removeEventListener("gossipsub:message", handler);
117+
}
118+
this.gossipSub.topicValidators.delete(pubsubTopic);
119+
this.gossipSub.unsubscribe(pubsubTopic);
120+
}
121+
this.messageEventHandlers.clear();
122+
this.observers.clear();
123+
}
124+
108125
/**
109126
* Wait for at least one peer with the given protocol to be connected and in the gossipsub
110127
* mesh for all pubsubTopics.
@@ -299,17 +316,17 @@ export class Relay implements IRelay {
299316
* @override
300317
*/
301318
private gossipSubSubscribe(pubsubTopic: string): void {
302-
this.gossipSub.addEventListener(
303-
"gossipsub:message",
304-
(event: CustomEvent<GossipsubMessage>) => {
305-
if (event.detail.msg.topic !== pubsubTopic) return;
306-
307-
this.processIncomingMessage(
308-
event.detail.msg.topic,
309-
event.detail.msg.data
310-
).catch((e) => log.error("Failed to process incoming message", e));
311-
}
312-
);
319+
const handler = (event: CustomEvent<GossipsubMessage>): void => {
320+
if (event.detail.msg.topic !== pubsubTopic) return;
321+
322+
this.processIncomingMessage(
323+
event.detail.msg.topic,
324+
event.detail.msg.data
325+
).catch((e) => log.error("Failed to process incoming message", e));
326+
};
327+
328+
this.messageEventHandlers.set(pubsubTopic, handler);
329+
this.gossipSub.addEventListener("gossipsub:message", handler);
313330

314331
this.gossipSub.topicValidators.set(pubsubTopic, messageValidator);
315332
this.gossipSub.subscribe(pubsubTopic);

packages/sdk/src/light_push/light_push.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export class LightPush implements ILightPush {
6565

6666
public stop(): void {
6767
this.retryManager.stop();
68+
this.protocol.stop();
6869
}
6970

7071
public async send(

packages/sdk/src/query_on_connect/query_on_connect.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,14 @@ describe("QueryOnConnect", () => {
158158
expect(wakuEventSpy.calledWith(WakuEvent.Health)).to.be.true;
159159
});
160160

161-
it("should remove event listeners when stopped", () => {
161+
it("should remove event listeners when stopped", async () => {
162162
const peerRemoveSpy =
163163
mockPeerManagerEventEmitter.removeEventListener as sinon.SinonSpy;
164164
const wakuRemoveSpy =
165165
mockWakuEventEmitter.removeEventListener as sinon.SinonSpy;
166166

167167
queryOnConnect.start();
168-
queryOnConnect.stop();
168+
await queryOnConnect.stop();
169169

170170
expect(peerRemoveSpy.calledWith(PeerManagerEventNames.StoreConnect)).to.be
171171
.true;

packages/sdk/src/query_on_connect/query_on_connect.ts

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ export class QueryOnConnect<
5252
private lastTimeOffline: number;
5353
private readonly forceQueryThresholdMs: number;
5454

55+
private isStarted: boolean = false;
56+
private abortController?: AbortController;
57+
private activeQueryPromise?: Promise<void>;
58+
59+
private boundStoreConnectHandler?: (event: CustomEvent<PeerId>) => void;
60+
private boundHealthHandler?: (event: CustomEvent<HealthStatus>) => void;
61+
5562
public constructor(
5663
public decoders: IDecoder<T>[],
5764
public stopIfTrue: (msg: T) => boolean,
@@ -71,11 +78,37 @@ export class QueryOnConnect<
7178
}
7279

7380
public start(): void {
81+
if (this.isStarted) {
82+
log.warn("QueryOnConnect already running");
83+
return;
84+
}
7485
log.info("starting query-on-connect service");
86+
this.isStarted = true;
87+
this.abortController = new AbortController();
7588
this.setupEventListeners();
7689
}
7790

78-
public stop(): void {
91+
public async stop(): Promise<void> {
92+
if (!this.isStarted) {
93+
return;
94+
}
95+
log.info("stopping query-on-connect service");
96+
this.isStarted = false;
97+
98+
if (this.abortController) {
99+
this.abortController.abort();
100+
this.abortController = undefined;
101+
}
102+
103+
if (this.activeQueryPromise) {
104+
log.info("Waiting for active query to complete...");
105+
try {
106+
await this.activeQueryPromise;
107+
} catch (error) {
108+
log.warn("Active query failed during stop:", error);
109+
}
110+
}
111+
79112
this.unsetEventListeners();
80113
}
81114

@@ -107,7 +140,10 @@ export class QueryOnConnect<
107140
this.lastTimeOffline > this.lastSuccessfulQuery ||
108141
timeSinceLastQuery > this.forceQueryThresholdMs
109142
) {
110-
await this.query(peerId);
143+
this.activeQueryPromise = this.query(peerId).finally(() => {
144+
this.activeQueryPromise = undefined;
145+
});
146+
await this.activeQueryPromise;
111147
} else {
112148
log.info(`no querying`);
113149
}
@@ -120,7 +156,8 @@ export class QueryOnConnect<
120156
for await (const page of this._queryGenerator(this.decoders, {
121157
timeStart,
122158
timeEnd,
123-
peerId
159+
peerId,
160+
abortSignal: this.abortController?.signal
124161
})) {
125162
// Await for decoding
126163
const messages = (await Promise.all(page)).filter(
@@ -166,33 +203,41 @@ export class QueryOnConnect<
166203
}
167204

168205
private setupEventListeners(): void {
206+
this.boundStoreConnectHandler = (event: CustomEvent<PeerId>) => {
207+
void this.maybeQuery(event.detail).catch((err) =>
208+
log.error("query-on-connect error", err)
209+
);
210+
};
211+
212+
this.boundHealthHandler = this.updateLastOfflineDate.bind(this);
213+
169214
this.peerManagerEventEmitter.addEventListener(
170215
PeerManagerEventNames.StoreConnect,
171-
(event) =>
172-
void this.maybeQuery(event.detail).catch((err) =>
173-
log.error("query-on-connect error", err)
174-
)
216+
this.boundStoreConnectHandler
175217
);
176218

177219
this.wakuEventEmitter.addEventListener(
178220
WakuEvent.Health,
179-
this.updateLastOfflineDate.bind(this)
221+
this.boundHealthHandler
180222
);
181223
}
182224

183225
private unsetEventListeners(): void {
184-
this.peerManagerEventEmitter.removeEventListener(
185-
PeerManagerEventNames.StoreConnect,
186-
(event) =>
187-
void this.maybeQuery(event.detail).catch((err) =>
188-
log.error("query-on-connect error", err)
189-
)
190-
);
226+
if (this.boundStoreConnectHandler) {
227+
this.peerManagerEventEmitter.removeEventListener(
228+
PeerManagerEventNames.StoreConnect,
229+
this.boundStoreConnectHandler
230+
);
231+
this.boundStoreConnectHandler = undefined;
232+
}
191233

192-
this.wakuEventEmitter.removeEventListener(
193-
WakuEvent.Health,
194-
this.updateLastOfflineDate.bind(this)
195-
);
234+
if (this.boundHealthHandler) {
235+
this.wakuEventEmitter.removeEventListener(
236+
WakuEvent.Health,
237+
this.boundHealthHandler
238+
);
239+
this.boundHealthHandler = undefined;
240+
}
196241
}
197242

198243
private updateLastOfflineDate(event: CustomEvent<HealthStatus>): void {

0 commit comments

Comments
 (0)