Skip to content

Commit 1eaf8a9

Browse files
authored
Merge pull request #19 from epferrari/will-idle-spec
Process unsubscribe requests transactionally in order
2 parents 1a1c2e0 + 673c6a1 commit 1eaf8a9

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

src/strongbus.ts

+33-3
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ export class Bus<TEventMap extends object = object> implements Scannable<TEventM
6666
private readonly bus = new Map<EventKeys<TEventMap>|Events.WILDCARD, Set<EventHandlers.GenericHandler>>();
6767
private readonly lifecycle = new Map<Lifecycle, Set<EventHandlers.GenericHandler>>();
6868

69+
// Queue of unsubscription requests so that they are processed transactionally in order
70+
private readonly _unsubQueue: {
71+
token: string;
72+
event: EventKeys<TEventMap>|Events.WILDCARD;
73+
handler: EventHandlers.GenericHandler;
74+
}[] = [];
75+
private _purgingUnsubQueue: boolean = false;
76+
6977
constructor(options?: Options) {
7078
this.options = {
7179
...Bus.defaultOptions,
@@ -426,13 +434,35 @@ export class Bus<TEventMap extends object = object> implements Scannable<TEventM
426434
private cacheListener(event: EventKeys<TEventMap>|Events.WILDCARD, handler: EventHandlers.GenericHandler): Events.Subscription {
427435
const token = randomId();
428436
const sub = generateSubscription(() => {
437+
this._unsubQueue.push({
438+
token,
439+
event,
440+
handler
441+
});
442+
this.purgeUnsubQueue();
443+
});
444+
this.subscriptionCache.set(token, sub);
445+
return sub;
446+
}
447+
448+
private purgeUnsubQueue() {
449+
if(this._purgingUnsubQueue) {
450+
// There is another purge loop running.
451+
return;
452+
}
453+
454+
this._purgingUnsubQueue = true;
455+
456+
while(this._unsubQueue.length) {
457+
const {token, event, handler} = this._unsubQueue.shift();
429458
if(this.subscriptionCache.has(token)) {
430459
this.subscriptionCache.delete(token);
460+
// lifecycle events may trigger additional unsubs, which will be pushed to the end of queue and handled in a subsequent iteration of this loop
431461
this.removeListener(event, handler);
432462
}
433-
});
434-
this.subscriptionCache.set(token, sub);
435-
return sub;
463+
}
464+
465+
this._purgingUnsubQueue = false;
436466
}
437467

438468
private removeListener(event: EventKeys<TEventMap>|Events.WILDCARD, handler: EventHandlers.GenericHandler): void {

src/strongbus_spec.ts

+24
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ describe('Strongbus.Bus', () => {
431431
let onActive: jasmine.Spy;
432432
let onWillIdle: jasmine.Spy;
433433
let onIdle: jasmine.Spy;
434+
let onError: jasmine.Spy;
434435

435436
beforeEach(() => {
436437
bus.hook('willAddListener', onWillAddListener = jasmine.createSpy('onWillAddListener'));
@@ -441,6 +442,7 @@ describe('Strongbus.Bus', () => {
441442
bus.hook('active', onActive = jasmine.createSpy('onActive'));
442443
bus.hook('willIdle', onWillIdle = jasmine.createSpy('onWillIdle'));
443444
bus.hook('idle', onIdle = jasmine.createSpy('onIdle'));
445+
bus.hook('error', onError = jasmine.createSpy('error'));
444446
});
445447

446448
it('allows subscription to meta events', () => {
@@ -561,6 +563,18 @@ describe('Strongbus.Bus', () => {
561563
sub2();
562564
});
563565

566+
it('raises "error" when errors are thrown in the listener', () => {
567+
const error = new Error('Error in callback');
568+
bus.on('bar', () => {
569+
throw error;
570+
})
571+
bus.emit('bar', true);
572+
expect(onError).toHaveBeenCalledWith({
573+
error,
574+
event: 'bar'
575+
});
576+
});
577+
564578
describe('given bus has delegates', () => {
565579
let delegate: DelegateTestBus;
566580
let onDelegateWillAddListener: jasmine.Spy;
@@ -627,6 +641,16 @@ describe('Strongbus.Bus', () => {
627641
expect(onActive).toHaveBeenCalledTimes(1);
628642
});
629643

644+
it('handles unsubscribes fired from hooks', async () => {
645+
const sub1 = bus.on('foo', () => {});
646+
const sub2 = bus.on('foo', () => {});
647+
bus.hook('willRemoveListener', () => sub2());
648+
649+
sub1();
650+
expect(onWillIdle).toHaveBeenCalledTimes(1);
651+
expect(onIdle).toHaveBeenCalledTimes(1);
652+
});
653+
630654
it('raises "idle" events independently of delegates', () => {
631655
const foosub = bus.on('foo', onTestEvent);
632656
const fooSub2 = delegate.on('foo', onTestEvent);

0 commit comments

Comments
 (0)