@@ -12,7 +12,7 @@ import {
1212/**
1313 * Configuration options for the `StatelessClientTransport`.
1414 */
15- export type StatelessClientTransportOptions = {
15+ export type HttpClientTransportOptions = {
1616 /**
1717 * An OAuth client provider to use for authentication.
1818 */
@@ -28,7 +28,7 @@ export type StatelessClientTransportOptions = {
2828 * Client transport for Stateless HTTP: this will communicate with the server using HTTP requests
2929 * and handle both immediate responses and streaming responses when needed.
3030 */
31- export class StatelessClientTransport implements Transport {
31+ export class HttpClientTransport implements Transport {
3232 private _abortController ?: AbortController ;
3333 private _eventSource ?: EventSource ;
3434 private _authProvider ?: OAuthClientProvider ;
@@ -40,7 +40,7 @@ export class StatelessClientTransport implements Transport {
4040
4141 constructor (
4242 private _url : URL ,
43- opts ?: StatelessClientTransportOptions ,
43+ opts ?: HttpClientTransportOptions ,
4444 ) {
4545 this . _authProvider = opts ?. authProvider ;
4646 this . _requestInit = opts ?. requestInit ;
@@ -68,35 +68,71 @@ export class StatelessClientTransport implements Transport {
6868
6969 try {
7070 while ( true ) {
71- const { done, value } = await reader . read ( ) ;
72- if ( done ) break ;
71+ try {
72+ const { done, value } = await reader . read ( ) ;
73+ if ( done ) break ;
7374
74- buffer += decoder . decode ( value , { stream : true } ) ;
75-
76- // Process complete SSE messages
77- const messages = buffer . split ( "\n\n" ) ;
78- buffer = messages . pop ( ) || "" ; // Keep incomplete message in buffer
75+ // Handle potential decoding errors
76+ try {
77+ buffer += decoder . decode ( value , { stream : true } ) ;
78+ } catch ( decodeError ) {
79+ this . onerror ?.(
80+ new Error ( `Failed to decode stream data: ${ decodeError } ` ) ,
81+ ) ;
82+ continue ;
83+ }
7984
80- for ( const message of messages ) {
81- if ( ! message . trim ( ) ) continue ;
85+ // Process complete SSE messages
86+ const messages = buffer . split ( "\n\n" ) ;
87+ buffer = messages . pop ( ) || "" ; // Keep incomplete message in buffer
8288
83- const lines = message . split ( "\n" ) ;
84- const data = lines . find ( ( line ) => line . startsWith ( "data: " ) ) ?. slice (
85- 6 ,
86- ) ;
89+ for ( const message of messages ) {
90+ if ( ! message . trim ( ) ) continue ;
8791
88- if ( data ) {
8992 try {
90- const parsed = JSONRPCMessageSchema . parse ( JSON . parse ( data ) ) ;
91- this . onmessage ?.( parsed ) ;
92- } catch ( error ) {
93- this . onerror ?.( error as Error ) ;
93+ const lines = message . split ( "\n" ) ;
94+ const data = lines . find ( ( line ) => line . startsWith ( "data: " ) )
95+ ?. slice ( 6 ) ;
96+
97+ if ( data ) {
98+ try {
99+ const parsed = JSONRPCMessageSchema . parse ( JSON . parse ( data ) ) ;
100+ this . onmessage ?.( parsed ) ;
101+ } catch ( parseError ) {
102+ this . onerror ?.(
103+ new Error ( `Failed to parse message data: ${ parseError } ` ) ,
104+ ) ;
105+ }
106+ }
107+ } catch ( messageError ) {
108+ this . onerror ?.(
109+ new Error ( `Failed to process message: ${ messageError } ` ) ,
110+ ) ;
111+ // Continue processing other messages
112+ continue ;
94113 }
95114 }
115+ } catch ( readError ) {
116+ // Handle stream read errors
117+ if ( readError instanceof Error && readError . name === "AbortError" ) {
118+ // Expected abort, break the loop
119+ break ;
120+ }
121+ this . onerror ?.(
122+ new Error ( `Error reading from stream: ${ String ( readError ) } ` ) ,
123+ ) ;
124+ // Try to continue reading if possible
125+ continue ;
96126 }
97127 }
128+ } catch ( error ) {
129+ this . onerror ?.( new Error ( `Fatal streaming error: ${ error } ` ) ) ;
98130 } finally {
99- reader . releaseLock ( ) ;
131+ try {
132+ reader . releaseLock ( ) ;
133+ } catch ( error ) {
134+ console . warn ( "Failed to release reader lock:" , error ) ;
135+ }
100136 }
101137 }
102138
@@ -119,13 +155,36 @@ export class StatelessClientTransport implements Transport {
119155 }
120156
121157 close ( ) : Promise < void > {
122- this . _abortController ?. abort ( ) ;
123- this . _eventSource ?. close ( ) ;
124- this . onclose ?.( ) ;
158+ try {
159+ if ( this . _abortController ) {
160+ try {
161+ this . _abortController . abort ( ) ;
162+ } catch ( error ) {
163+ console . warn ( "Failed to abort controller:" , error ) ;
164+ }
165+ this . _abortController = undefined ;
166+ }
167+
168+ if ( this . _eventSource ) {
169+ try {
170+ this . _eventSource . close ( ) ;
171+ } catch ( error ) {
172+ console . warn ( "Failed to close EventSource:" , error ) ;
173+ }
174+ this . _eventSource = undefined ;
175+ }
176+
177+ this . onclose ?.( ) ;
178+ } catch ( error ) {
179+ this . onerror ?.( new Error ( `Error during close: ${ error } ` ) ) ;
180+ }
125181 return Promise . resolve ( ) ;
126182 }
127183
128184 async send ( message : JSONRPCMessage ) : Promise < void > {
185+ // Create a new abort controller for this request
186+ this . _abortController = new AbortController ( ) ;
187+
129188 try {
130189 const commonHeaders = await this . _commonHeaders ( ) ;
131190 const headers = new Headers ( {
@@ -139,7 +198,7 @@ export class StatelessClientTransport implements Transport {
139198 method : "POST" ,
140199 headers,
141200 body : JSON . stringify ( message ) ,
142- signal : this . _abortController ? .signal ,
201+ signal : this . _abortController . signal ,
143202 } ;
144203
145204 const response = await fetch ( this . _url , init ) ;
@@ -150,25 +209,37 @@ export class StatelessClientTransport implements Transport {
150209 return ;
151210 }
152211
153- const text = await response . text ( ) . catch ( ( ) => null ) ;
212+ const text = await response . text ( ) . catch ( ( error ) =>
213+ `Failed to read error response: ${ error } `
214+ ) ;
154215 throw new Error (
155216 `Error POSTing to endpoint (HTTP ${ response . status } ): ${ text } ` ,
156217 ) ;
157218 }
158219
159220 // Handle streaming responses
160- if ( response . headers . get ( "content-type" ) ?. includes ( "text/event-stream" ) ) {
161- this . _handleStreamingResponse ( response ) ;
221+ const contentType = response . headers . get ( "content-type" ) ;
222+ if ( contentType ?. includes ( "text/event-stream" ) ) {
223+ await this . _handleStreamingResponse ( response ) ;
162224 return ;
163225 }
164226
165227 // Handle immediate JSON responses
166- const responseData = await response . json ( ) ;
167- const responseMessage = JSONRPCMessageSchema . parse ( responseData ) ;
168- this . onmessage ?.( responseMessage ) ;
228+ try {
229+ const responseData = await response . json ( ) ;
230+ const responseMessage = JSONRPCMessageSchema . parse ( responseData ) ;
231+ this . onmessage ?.( responseMessage ) ;
232+ } catch ( error ) {
233+ throw new Error ( `Failed to parse response: ${ error } ` ) ;
234+ }
169235 } catch ( error ) {
170236 this . onerror ?.( error as Error ) ;
171237 throw error ;
238+ } finally {
239+ // Clean up the abort controller if it wasn't used
240+ if ( this . _abortController ) {
241+ this . _abortController = undefined ;
242+ }
172243 }
173244 }
174245
0 commit comments