1414
1515import protos from '../protos' ;
1616import { fetchWithTimeout } from '../base/http_utils' ;
17- import { assertExists } from '../base/logging' ;
17+ import { assertExists , reportError } from '../base/logging' ;
1818import { EngineBase } from '../trace_processor/engine' ;
1919
2020const RPC_CONNECT_TIMEOUT_MS = 2000 ;
@@ -32,6 +32,8 @@ export class HttpRpcEngine extends EngineBase {
3232 private websocket ?: WebSocket ;
3333 private connected = false ;
3434 private disposed = false ;
35+ private queue : Blob [ ] = [ ] ;
36+ private isProcessingQueue = false ;
3537
3638 // Can be changed by frontend/index.ts when passing ?rpc_port=1234 .
3739 static rpcPort = '9001' ;
@@ -86,11 +88,24 @@ export class HttpRpcEngine extends EngineBase {
8688 }
8789
8890 private onWebsocketMessage ( e : MessageEvent ) {
89- assertExists ( e . data as Blob )
90- . arrayBuffer ( )
91- . then ( ( buf ) => {
91+ const blob = assertExists ( e . data as Blob ) ;
92+ this . queue . push ( blob ) ;
93+ this . processQueue ( ) ;
94+ }
95+
96+ private async processQueue ( ) {
97+ if ( this . isProcessingQueue ) return ;
98+ this . isProcessingQueue = true ;
99+ while ( this . queue . length > 0 ) {
100+ try {
101+ const blob = assertExists ( this . queue . shift ( ) ) ;
102+ const buf = await blob . arrayBuffer ( ) ;
92103 super . onRpcResponseBytes ( new Uint8Array ( buf ) ) ;
93- } ) ;
104+ } catch ( e ) {
105+ reportError ( e ) ;
106+ }
107+ }
108+ this . isProcessingQueue = false ;
94109 }
95110
96111 static async checkConnection ( ) : Promise < HttpRpcState > {
0 commit comments