@@ -11,16 +11,46 @@ export class Sender {
1111 private readonly messageStore : MessageStore ;
1212 private readonly lightPush : ILightPush ;
1313
14+ private sendInterval : ReturnType < typeof setInterval > | null = null ;
15+
1416 public constructor ( params : SenderConstructorParams ) {
1517 this . messageStore = params . messageStore ;
1618 this . lightPush = params . lightPush ;
1719 }
1820
19- public async send ( encoder : IEncoder , message : IMessage ) : Promise < void > {
21+ public start ( ) : void {
22+ this . sendInterval = setInterval ( ( ) => void this . backgroundSend ( ) , 1000 ) ;
23+ }
24+
25+ public stop ( ) : void {
26+ if ( this . sendInterval ) {
27+ clearInterval ( this . sendInterval ) ;
28+ this . sendInterval = null ;
29+ }
30+ }
31+
32+ public async send ( encoder : IEncoder , message : IMessage ) : Promise < string > {
2033 const requestId = await this . messageStore . queue ( encoder , message ) ;
21- await this . lightPush . send ( encoder , message ) ;
22- if ( requestId ) {
34+ const response = await this . lightPush . send ( encoder , message ) ;
35+
36+ if ( response . successes . length > 0 ) {
2337 await this . messageStore . markSent ( requestId ) ;
2438 }
39+
40+ return requestId ;
41+ }
42+
43+ private async backgroundSend ( ) : Promise < void > {
44+ const pendingRequests = this . messageStore . getMessagesToSend ( ) ;
45+
46+ // todo: implement chunking, error handling, retry, etc.
47+ // todo: implement backoff and batching potentially
48+ for ( const { requestId, encoder, message } of pendingRequests ) {
49+ const response = await this . lightPush . send ( encoder , message ) ;
50+
51+ if ( response . successes . length > 0 ) {
52+ await this . messageStore . markSent ( requestId ) ;
53+ }
54+ }
2555 }
2656}
0 commit comments