@@ -495,17 +495,17 @@ const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
495495 * This class provides helpers for Postgres' LISTEN/NOTIFY pub/sub
496496 * implementation. We aggregate all LISTEN/NOTIFY events so that we can supply
497497 * them all via a single pgClient. We grab and release this client from/to the
498- * pool automatically. If the Postgres connection is interrupted then we'll
499- * automatically reconnect and re-establish the LISTENs, however _events can be
500- * lost_ when this happens, so you should be careful that Postgres connections
501- * will not be prematurely terminated in general .
498+ * pool automatically. If the Postgres connection is interrupted then we will
499+ * NOT automatically reconnect and re-establish the LISTENs because we want to
500+ * ensure "at least once" delivery and clients should be informed if messages
501+ * may be missed .
502502 */
503503export class PgSubscriber <
504504 TTopics extends { [ key : string ] : string } = { [ key : string ] : string } ,
505505> implements GrafastSubscriber < TTopics >
506506{
507507 private topics : { [ topic in keyof TTopics ] ?: AsyncIterableIterator < any > [ ] } =
508- { } ;
508+ Object . create ( null ) ;
509509 private eventEmitter = new EventEmitter ( ) ;
510510 private alive = true ;
511511
@@ -653,10 +653,18 @@ export class PgSubscriber<
653653 ) ;
654654 }
655655 if ( Object . keys ( this . topics ) . length > 0 ) {
656- // Trigger a new client to be fetched and have it sync.
657- this . getClient ( ) . then ( null , ( ) => {
658- // Must be released; ignore
656+ // Terminate all subscriptions, to ensure at-least-once delivery
657+ const e = new Error (
658+ `Underlying pubsub channel interrupted, terminating connection due to risk of missing messages.` ,
659+ ) ;
660+ Object . values ( this . topics ) . forEach ( ( iterators ) => {
661+ if ( iterators ) {
662+ for ( const iterator of iterators ) {
663+ iterator . throw ! ( e ) ;
664+ }
665+ }
659666 } ) ;
667+ this . topics = Object . create ( null ) ;
660668 }
661669 }
662670 } ) ;
0 commit comments