Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/bridge-controller/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
"@metamask/multichain-network-controller": "^1.0.1",
"@metamask/polling-controller": "^14.0.1",
"@metamask/utils": "^11.8.1",
"@microsoft/fetch-event-source": "^2.0.1",
"bignumber.js": "^9.1.2",
"reselect": "^5.1.1",
"uuid": "^8.3.2"
Expand Down
65 changes: 65 additions & 0 deletions packages/bridge-controller/src/utils/fetch-server-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
export type SSEMessage = {
event?: string;
data: string;
};

export type SSEOptions = RequestInit & {
onMessage: (data: unknown, event?: string) => void;
onError?: (err: Error) => void;
};

/**
* Streams server-sent events from the given URL
*
* @param url - The URL to stream events from
* @param options - The options for the SSE stream
*/
export const fetchServerEvents = async (url: string, options: SSEOptions) => {
const controller = new AbortController();
const signal = options.signal ?? controller.signal;

try {
const response = await fetch(url, { signal });
if (!response.ok || !response.body) {
throw new Error(`${response.status}`);
}

const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';

while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}

buffer += decoder.decode(value, { stream: true });

// parse SSE messages on double newline
const parts = buffer.split('\n\n');
buffer = parts.pop() || '';

for (const chunk of parts) {
const lines = chunk.split('\n');
let eventName: string | undefined;
const dataLines: string[] = [];

for (const line of lines) {
if (line.startsWith('event:')) {
eventName = line.slice(6).trim();
} else if (line.startsWith('data:')) {
dataLines.push(line.slice(5).trim());
}
}

if (dataLines.length > 0) {
const rawData = dataLines.join('\n');
options.onMessage(JSON.parse(rawData), eventName);
}
}
}
} catch (error) {
options.onError?.(error);
}
};
16 changes: 8 additions & 8 deletions packages/bridge-controller/src/utils/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { StructError } from '@metamask/superstruct';
import type { CaipAssetType, CaipChainId, Hex } from '@metamask/utils';
import type { EventSourceMessage } from '@microsoft/fetch-event-source';
import { fetchEventSource } from '@microsoft/fetch-event-source';

import { isBitcoinChainId } from './bridge';
import {
formatAddressToCaipReference,
formatChainIdToDec,
} from './caip-formatters';
import { fetchServerEvents } from './fetch-server-events';
import type { FeatureId } from './validators';
import {
validateQuoteResponse,
Expand Down Expand Up @@ -337,14 +337,14 @@ export async function fetchBridgeQuoteStream(
};

const urlStream = `${bridgeApiBaseUrl}/getQuoteStream?${queryParams}`;
await fetchEventSource(urlStream, {
headers: {
...getClientHeaders(clientId, clientVersion),
'Content-Type': 'text/event-stream',
},
await fetchServerEvents(urlStream, {
// headers: {
// ...getClientHeaders(clientId, clientVersion),
// 'Content-Type': 'text/event-stream',
// },
signal,
onmessage: onMessage,
onerror: (e) => {
onMessage,
onError: (e) => {
// Rethrow error to prevent silent fetch failures
throw new Error(e.toString());
},
Expand Down
Loading