Skip to content

Commit

Permalink
Implementing unsubscription queue for transactional removes
Browse files Browse the repository at this point in the history
  • Loading branch information
Kurt Preston committed Jun 23, 2021
1 parent 584d4a6 commit 967296a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
34 changes: 31 additions & 3 deletions src/strongbus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ export class Bus<TEventMap extends object = object> implements Scannable<TEventM
private readonly bus = new Map<EventKeys<TEventMap>|Events.WILDCARD, Set<EventHandlers.GenericHandler>>();
private readonly lifecycle = new Map<Lifecycle, Set<EventHandlers.GenericHandler>>();

// Queue of unsubscription requests so that they are processed transactionally in oprder
private readonly _unsubQueue: {
token: string;
event: EventKeys<TEventMap>|Events.WILDCARD;
handler: EventHandlers.GenericHandler;
}[] = [];
private _purgingUnsubQueue: boolean = false;

constructor(options?: Options) {
this.options = {
...Bus.defaultOptions,
Expand Down Expand Up @@ -426,13 +434,33 @@ export class Bus<TEventMap extends object = object> implements Scannable<TEventM
private cacheListener(event: EventKeys<TEventMap>|Events.WILDCARD, handler: EventHandlers.GenericHandler): Events.Subscription {
const token = randomId();
const sub = generateSubscription(() => {
this._unsubQueue.push({
token,
event,
handler
});
this.purgeUnsubQueue();
});
this.subscriptionCache.set(token, sub);
return sub;
}

private purgeUnsubQueue() {
if(this._purgingUnsubQueue) {
return;
} else {
this._purgingUnsubQueue = true;
}

while(this._unsubQueue.length) {
const {token, event, handler} = this._unsubQueue.shift();
if(this.subscriptionCache.has(token)) {
this.subscriptionCache.delete(token);
this.removeListener(event, handler);
}
});
this.subscriptionCache.set(token, sub);
return sub;
}

this._purgingUnsubQueue = false;
}

private removeListener(event: EventKeys<TEventMap>|Events.WILDCARD, handler: EventHandlers.GenericHandler): void {
Expand Down
2 changes: 1 addition & 1 deletion src/strongbus_spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ describe('Strongbus.Bus', () => {
expect(onActive).toHaveBeenCalledTimes(1);
});

fit('handles unsubscribes fired from hooks', async () => {
it('handles unsubscribes fired from hooks', async () => {
const sub1 = bus.on('foo', () => {});
const sub2 = bus.on('foo', () => {});
bus.hook('willRemoveListener', () => sub2());
Expand Down

0 comments on commit 967296a

Please sign in to comment.