Skip to content

Commit 9f841c2

Browse files
committed
Add stateless version of mcp
Signed-off-by: Marcos Candeia <[email protected]>
1 parent 470d7d8 commit 9f841c2

File tree

5 files changed

+332
-19
lines changed

5 files changed

+332
-19
lines changed

deno.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@deco/mcp",
3-
"version": "0.2.0",
3+
"version": "0.2.1",
44
"exports": "./mod.ts",
55
"tasks": {
66
"check": "deno fmt && deno lint && deno check mod.ts"

mcp/server.ts

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
ListToolsRequestSchema,
1313
} from "@modelcontextprotocol/sdk/types.js";
1414
import { SSEServerTransport } from "./sse.ts";
15+
import { StatelessServerTransport } from "./stateless.ts";
1516
import { dereferenceSchema } from "./utils.ts";
1617
import { WebSocketServerTransport } from "./websocket.ts";
1718

@@ -244,51 +245,52 @@ export function mcpServer<TManifest extends AppManifest>(
244245

245246
return async (c: Context, next: Next) => {
246247
const path = new URL(c.req.url).pathname;
248+
const basePath = options?.basePath ?? "";
247249

250+
// Handle WebSocket upgrade if requested
248251
if (
249-
path === `${options?.basePath ?? ""}/mcp/ws` &&
252+
path === `${basePath}/mcp/ws` &&
250253
c.req.raw.headers.get("upgrade") === "websocket"
251254
) {
252255
const { response, socket } = Deno.upgradeWebSocket(c.req.raw);
253-
254256
const transport = new WebSocketServerTransport();
255-
256257
transport.acceptWebSocket(socket);
257258
mcp.server.connect(transport);
258-
259259
return response;
260260
}
261261

262-
if (path === `${options?.basePath ?? ""}/mcp/sse`) {
262+
// Legacy SSE endpoint for backwards compatibility
263+
if (path === `${basePath}/mcp/sse`) {
263264
const transport = new SSEServerTransport(
264-
`${options?.basePath ?? ""}${MESSAGES_ENDPOINT}`,
265+
`${basePath}${MESSAGES_ENDPOINT}`,
265266
);
266267
transports.set(transport.sessionId, transport);
267-
268268
transport.onclose = () => {
269269
transports.delete(transport.sessionId);
270270
};
271-
272271
const response = transport.createSSEResponse();
273272
mcp.server.connect(transport);
274-
275273
return response;
276274
}
277275

276+
// Main message endpoint - handles both stateless requests and SSE upgrades
278277
if (path === `${options?.basePath ?? ""}${MESSAGES_ENDPOINT}`) {
279278
const sessionId = c.req.query("sessionId");
280-
if (!sessionId) {
281-
return c.json({ error: "Missing sessionId" }, 400);
282-
}
279+
if (sessionId) {
280+
const transport = transports.get(sessionId);
281+
if (!transport) {
282+
return c.json({ error: "Invalid session" }, 404);
283+
}
283284

284-
const transport = transports.get(sessionId);
285-
if (!transport) {
286-
return c.json({ error: "Invalid session" }, 404);
285+
return await transport.handlePostMessage(c.req.raw);
287286
}
288-
289-
return await transport.handlePostMessage(c.req.raw);
287+
// For stateless transport
288+
const transport = new StatelessServerTransport();
289+
mcp.server.connect(transport);
290+
const response = await transport.handleMessage(c.req.raw);
291+
transport.close(); // Close the transport after handling the message
292+
return response;
290293
}
291-
292294
await next();
293295
};
294296
}

mcp/stateless-client.ts

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
import {
2+
type JSONRPCMessage,
3+
JSONRPCMessageSchema,
4+
} from "@modelcontextprotocol/sdk/types.js";
5+
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
6+
7+
import {
8+
auth,
9+
type OAuthClientProvider,
10+
UnauthorizedError,
11+
} from "@modelcontextprotocol/sdk/client/auth.js";
12+
/**
13+
* Configuration options for the `StatelessClientTransport`.
14+
*/
15+
export type StatelessClientTransportOptions = {
16+
/**
17+
* An OAuth client provider to use for authentication.
18+
*/
19+
authProvider?: OAuthClientProvider;
20+
21+
/**
22+
* Customizes requests to the server.
23+
*/
24+
requestInit?: RequestInit;
25+
};
26+
27+
/**
28+
* Client transport for Stateless HTTP: this will communicate with the server using HTTP requests
29+
* and handle both immediate responses and streaming responses when needed.
30+
*/
31+
export class StatelessClientTransport implements Transport {
32+
private _abortController?: AbortController;
33+
private _eventSource?: EventSource;
34+
private _authProvider?: OAuthClientProvider;
35+
private _requestInit?: RequestInit;
36+
37+
onclose?: () => void;
38+
onerror?: (error: Error) => void;
39+
onmessage?: (message: JSONRPCMessage) => void;
40+
41+
constructor(
42+
private _url: URL,
43+
opts?: StatelessClientTransportOptions,
44+
) {
45+
this._authProvider = opts?.authProvider;
46+
this._requestInit = opts?.requestInit;
47+
}
48+
49+
private async _commonHeaders(): Promise<HeadersInit> {
50+
const headers: HeadersInit = {};
51+
if (this._authProvider) {
52+
const tokens = await this._authProvider.tokens();
53+
if (tokens) {
54+
headers["Authorization"] = `Bearer ${tokens.access_token}`;
55+
}
56+
}
57+
return headers;
58+
}
59+
60+
private async _handleStreamingResponse(response: Response) {
61+
const reader = response.body?.getReader();
62+
if (!reader) {
63+
throw new Error("No response body");
64+
}
65+
66+
const decoder = new TextDecoder();
67+
let buffer = "";
68+
69+
try {
70+
while (true) {
71+
const { done, value } = await reader.read();
72+
if (done) break;
73+
74+
buffer += decoder.decode(value, { stream: true });
75+
76+
// Process complete SSE messages
77+
const messages = buffer.split("\n\n");
78+
buffer = messages.pop() || ""; // Keep incomplete message in buffer
79+
80+
for (const message of messages) {
81+
if (!message.trim()) continue;
82+
83+
const lines = message.split("\n");
84+
const data = lines.find((line) => line.startsWith("data: "))?.slice(
85+
6,
86+
);
87+
88+
if (data) {
89+
try {
90+
const parsed = JSONRPCMessageSchema.parse(JSON.parse(data));
91+
this.onmessage?.(parsed);
92+
} catch (error) {
93+
this.onerror?.(error as Error);
94+
}
95+
}
96+
}
97+
}
98+
} finally {
99+
reader.releaseLock();
100+
}
101+
}
102+
103+
private async _authThenSend(message: JSONRPCMessage): Promise<void> {
104+
if (!this._authProvider) {
105+
throw new UnauthorizedError("No auth provider");
106+
}
107+
108+
const result = await auth(this._authProvider, { serverUrl: this._url });
109+
if (result !== "AUTHORIZED") {
110+
throw new UnauthorizedError();
111+
}
112+
113+
await this.send(message);
114+
}
115+
116+
start(): Promise<void> {
117+
// No persistent connection needed
118+
return Promise.resolve();
119+
}
120+
121+
close(): Promise<void> {
122+
this._abortController?.abort();
123+
this._eventSource?.close();
124+
this.onclose?.();
125+
return Promise.resolve();
126+
}
127+
128+
async send(message: JSONRPCMessage): Promise<void> {
129+
try {
130+
const commonHeaders = await this._commonHeaders();
131+
const headers = new Headers({
132+
...commonHeaders,
133+
...this._requestInit?.headers,
134+
});
135+
headers.set("content-type", "application/json");
136+
137+
const init = {
138+
...this._requestInit,
139+
method: "POST",
140+
headers,
141+
body: JSON.stringify(message),
142+
signal: this._abortController?.signal,
143+
};
144+
145+
const response = await fetch(this._url, init);
146+
147+
if (!response.ok) {
148+
if (response.status === 401 && this._authProvider) {
149+
await this._authThenSend(message);
150+
return;
151+
}
152+
153+
const text = await response.text().catch(() => null);
154+
throw new Error(
155+
`Error POSTing to endpoint (HTTP ${response.status}): ${text}`,
156+
);
157+
}
158+
159+
// Handle streaming responses
160+
if (response.headers.get("content-type")?.includes("text/event-stream")) {
161+
await this._handleStreamingResponse(response);
162+
return;
163+
}
164+
165+
// Handle immediate JSON responses
166+
const responseData = await response.json();
167+
const responseMessage = JSONRPCMessageSchema.parse(responseData);
168+
this.onmessage?.(responseMessage);
169+
} catch (error) {
170+
this.onerror?.(error as Error);
171+
throw error;
172+
}
173+
}
174+
175+
/**
176+
* Call this method after the user has finished authorizing via their user agent and is redirected back to the MCP client application.
177+
*/
178+
async finishAuth(authorizationCode: string): Promise<void> {
179+
if (!this._authProvider) {
180+
throw new UnauthorizedError("No auth provider");
181+
}
182+
183+
const result = await auth(this._authProvider, {
184+
serverUrl: this._url,
185+
authorizationCode,
186+
});
187+
if (result !== "AUTHORIZED") {
188+
throw new UnauthorizedError("Failed to authorize");
189+
}
190+
}
191+
}

mcp/stateless.ts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
2+
import {
3+
type JSONRPCMessage,
4+
JSONRPCMessageSchema,
5+
} from "@modelcontextprotocol/sdk/types.js";
6+
import { ServerSentEventStream } from "@std/http";
7+
8+
/**
9+
* Server transport for Stateless HTTP: this will handle messages over plain HTTP requests
10+
* with optional SSE upgrade for streaming responses.
11+
*/
12+
export class StatelessServerTransport implements Transport {
13+
private _controller?: ReadableStreamDefaultController;
14+
private _responseResolver?: (response: Response) => void;
15+
16+
onclose?: () => void;
17+
onerror?: (error: Error) => void;
18+
onmessage?: (message: JSONRPCMessage) => void;
19+
20+
/**
21+
* Creates an SSE response for streaming
22+
*/
23+
private createStreamingResponse(): Response {
24+
const stream = new ReadableStream({
25+
start: (controller) => {
26+
this._controller = controller;
27+
},
28+
cancel: () => {
29+
this._controller = undefined;
30+
this.onclose?.();
31+
},
32+
}).pipeThrough(new ServerSentEventStream());
33+
34+
return new Response(stream, {
35+
headers: {
36+
"Content-Type": "text/event-stream",
37+
"Cache-Control": "no-cache",
38+
"Connection": "keep-alive",
39+
"Access-Control-Allow-Origin": "*",
40+
},
41+
});
42+
}
43+
44+
/**
45+
* Handles incoming HTTP messages
46+
*/
47+
async handleMessage(request: Request): Promise<Response> {
48+
try {
49+
const contentTypeHeader = request.headers.get("content-type");
50+
if (!contentTypeHeader?.includes("application/json")) {
51+
throw new Error("Unsupported content-type: Expected application/json");
52+
}
53+
54+
const body = await request.json();
55+
const message = JSONRPCMessageSchema.parse(body);
56+
57+
// Handle the message
58+
this.onmessage?.(message);
59+
60+
// Create a promise that will be resolved with the response
61+
const { promise, resolve } = Promise.withResolvers<Response>();
62+
this._responseResolver = resolve;
63+
64+
// For requests that need streaming responses, immediately return SSE stream
65+
if (this.shouldUpgradeToStreaming(message)) {
66+
resolve(this.createStreamingResponse());
67+
}
68+
69+
return promise;
70+
} catch (error) {
71+
this.onerror?.(error as Error);
72+
return new Response(String(error), { status: 400 });
73+
}
74+
}
75+
76+
/**
77+
* Determines if a request should be upgraded to streaming based on message type
78+
*/
79+
private shouldUpgradeToStreaming(message: JSONRPCMessage): boolean {
80+
console.log({ message });
81+
// Implement logic to determine if streaming is needed
82+
// For example, based on method name or parameters
83+
return false; // Default to non-streaming
84+
}
85+
86+
start(): Promise<void> {
87+
return Promise.resolve();
88+
}
89+
90+
close(): Promise<void> {
91+
this._controller?.close();
92+
this._controller = undefined;
93+
this.onclose?.();
94+
return Promise.resolve();
95+
}
96+
97+
send(message: JSONRPCMessage): Promise<void> {
98+
// If we have a controller, this is a streaming response
99+
if (this._controller) {
100+
this._controller.enqueue({
101+
event: "message",
102+
data: JSON.stringify(message),
103+
id: Date.now().toString(),
104+
});
105+
} else if (this._responseResolver) {
106+
// For non-streaming responses, resolve with a single JSON response
107+
this._responseResolver(
108+
new Response(JSON.stringify(message), {
109+
headers: {
110+
"Content-Type": "application/json",
111+
},
112+
}),
113+
);
114+
this._responseResolver = undefined;
115+
}
116+
117+
return Promise.resolve();
118+
}
119+
}

0 commit comments

Comments
 (0)