diff --git a/packages/bridge-controller/package.json b/packages/bridge-controller/package.json index 6438cb43c5e..ebec91af57f 100644 --- a/packages/bridge-controller/package.json +++ b/packages/bridge-controller/package.json @@ -60,7 +60,6 @@ "@metamask/multichain-network-controller": "^1.0.1", "@metamask/polling-controller": "^14.0.1", "@metamask/utils": "^11.8.1", - "@microsoft/fetch-event-source": "^2.0.1", "bignumber.js": "^9.1.2", "reselect": "^5.1.1", "uuid": "^8.3.2" diff --git a/packages/bridge-controller/src/utils/fetch-server-events.ts b/packages/bridge-controller/src/utils/fetch-server-events.ts new file mode 100644 index 00000000000..d0d079c7ba9 --- /dev/null +++ b/packages/bridge-controller/src/utils/fetch-server-events.ts @@ -0,0 +1,65 @@ +export type SSEMessage = { + event?: string; + data: string; +}; + +export type SSEOptions = RequestInit & { + onMessage: (data: unknown, event?: string) => void; + onError?: (err: Error) => void; +}; + +/** + * Streams server-sent events from the given URL + * + * @param url - The URL to stream events from + * @param options - The options for the SSE stream + */ +export const fetchServerEvents = async (url: string, options: SSEOptions) => { + const controller = new AbortController(); + const signal = options.signal ?? controller.signal; + + try { + const response = await fetch(url, { signal }); + if (!response.ok || !response.body) { + throw new Error(`${response.status}`); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder('utf-8'); + let buffer = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + + buffer += decoder.decode(value, { stream: true }); + + // parse SSE messages on double newline + const parts = buffer.split('\n\n'); + buffer = parts.pop() || ''; + + for (const chunk of parts) { + const lines = chunk.split('\n'); + let eventName: string | undefined; + const dataLines: string[] = []; + + for (const line of lines) { + if (line.startsWith('event:')) { + eventName = line.slice(6).trim(); + } else if (line.startsWith('data:')) { + dataLines.push(line.slice(5).trim()); + } + } + + if (dataLines.length > 0) { + const rawData = dataLines.join('\n'); + options.onMessage(JSON.parse(rawData), eventName); + } + } + } + } catch (error) { + options.onError?.(error); + } +}; diff --git a/packages/bridge-controller/src/utils/fetch.ts b/packages/bridge-controller/src/utils/fetch.ts index 7f25b73feb4..7016213249b 100644 --- a/packages/bridge-controller/src/utils/fetch.ts +++ b/packages/bridge-controller/src/utils/fetch.ts @@ -1,13 +1,13 @@ import { StructError } from '@metamask/superstruct'; import type { CaipAssetType, CaipChainId, Hex } from '@metamask/utils'; import type { EventSourceMessage } from '@microsoft/fetch-event-source'; -import { fetchEventSource } from '@microsoft/fetch-event-source'; import { isBitcoinChainId } from './bridge'; import { formatAddressToCaipReference, formatChainIdToDec, } from './caip-formatters'; +import { fetchServerEvents } from './fetch-server-events'; import type { FeatureId } from './validators'; import { validateQuoteResponse, @@ -337,14 +337,14 @@ export async function fetchBridgeQuoteStream( }; const urlStream = `${bridgeApiBaseUrl}/getQuoteStream?${queryParams}`; - await fetchEventSource(urlStream, { - headers: { - ...getClientHeaders(clientId, clientVersion), - 'Content-Type': 'text/event-stream', - }, + await fetchServerEvents(urlStream, { + // headers: { + // ...getClientHeaders(clientId, clientVersion), + // 'Content-Type': 'text/event-stream', + // }, signal, - onmessage: onMessage, - onerror: (e) => { + onMessage, + onError: (e) => { // Rethrow error to prevent silent fetch failures throw new Error(e.toString()); },