From 0b67a22d6499e5eb36a76383a0d38269abeadca7 Mon Sep 17 00:00:00 2001 From: Vinson Chuong Date: Sun, 29 Oct 2023 18:32:26 -0700 Subject: [PATCH] feat(web-streams): Support sending and receiving Web Stream bodies --- http/parse-body.js | 6 ++++ http/send-request/index.test.js | 54 ++++++++++++++++++++++++++++ http/start-server/build-responder.js | 4 +++ http/start-server/index.test.js | 49 ++++++++++++++++++------- 4 files changed, 101 insertions(+), 12 deletions(-) diff --git a/http/parse-body.js b/http/parse-body.js index 74b96c9..86038e3 100644 --- a/http/parse-body.js +++ b/http/parse-body.js @@ -1,3 +1,4 @@ +import {Readable} from 'node:stream' import typeIs from 'type-is' import getStream from 'get-stream' @@ -14,6 +15,11 @@ export function parseHttp1Body(requestOrResponse) { return '' } + if (typeIs(requestOrResponse, ['text/event-stream'])) { + requestOrResponse.setEncoding('utf8') + return Readable.toWeb(requestOrResponse) + } + if (typeIs(requestOrResponse, textMediaTypes)) { return getStream(requestOrResponse) } diff --git a/http/send-request/index.test.js b/http/send-request/index.test.js index 9062d9b..9d5a6df 100644 --- a/http/send-request/index.test.js +++ b/http/send-request/index.test.js @@ -1,7 +1,10 @@ import {Buffer} from 'node:buffer' +import {ReadableStream} from 'node:stream/web' import * as https from 'node:https' +import * as http from 'node:http' import test from 'ava' import makeCert from 'make-cert' +import getStream from 'get-stream' import sendRequest from './index.js' test('sending a simple GET request', async (t) => { @@ -74,3 +77,54 @@ test('parsing a binary body into a Buffer', async (t) => { t.true(response.body instanceof Buffer) }) + +test('supporting streaming server-sent events', async (t) => { + const server = new http.Server() + server.on('request', (request, response) => { + response.writeHead(200, { + 'Content-Type': 'text/event-stream', + }) + response.end( + [ + ': comment\n', + '\n', + 'data: some text\n', + '\n', + 'data: multiple\n', + 'data: lines\n', + '\n', + 'event: foo\n', + 'data: bar\n', + '\n', + ].join(''), + ) + }) + + await new Promise((resolve) => { + server.listen(12_000) + server.once('listening', resolve) + }) + + const response = await sendRequest({ + method: 'GET', + url: 'http://localhost:12000', + headers: {}, + }) + + t.true(response.body instanceof ReadableStream) + t.is( + await getStream(response.body), + [ + ': comment\n', + '\n', + 'data: some text\n', + '\n', + 'data: multiple\n', + 'data: lines\n', + '\n', + 'event: foo\n', + 'data: bar\n', + '\n', + ].join(''), + ) +}) diff --git a/http/start-server/build-responder.js b/http/start-server/build-responder.js index 4883f50..d491769 100644 --- a/http/start-server/build-responder.js +++ b/http/start-server/build-responder.js @@ -1,3 +1,5 @@ +import {ReadableStream} from 'node:stream/web' +import {Readable} from 'node:stream' import {Buffer} from 'node:buffer' import * as http2 from 'node:http2' import omit from 'lodash/omit.js' @@ -36,6 +38,8 @@ export default function (computeResponse) { response.body instanceof Buffer ) { nodeResponse.end(response.body) + } else if (response.body instanceof ReadableStream) { + Readable.fromWeb(response.body).pipe(nodeResponse) } else { response.body.pipe(nodeResponse) } diff --git a/http/start-server/index.test.js b/http/start-server/index.test.js index c9b2fd4..e4bcb70 100644 --- a/http/start-server/index.test.js +++ b/http/start-server/index.test.js @@ -1,5 +1,6 @@ import {Buffer} from 'node:buffer' import {createHash} from 'node:crypto' +import {Readable} from 'node:stream' import test from 'ava' import WebSocket from 'ws' import makeCert from 'make-cert' @@ -116,13 +117,13 @@ test('starting an HTTP server', async (t) => { await session.close() }) -test('supporting a stream body', async (t) => { +test('supporting a web stream body', async (t) => { const server = await startServer({port: 10_004}, () => ({ status: 200, headers: { 'content-type': 'text/plain', }, - body: intoStream('Hello World!'), + body: Readable.toWeb(intoStream('Hello World!')), })) t.teardown(async () => { stopServer(server) @@ -140,13 +141,13 @@ test('supporting a stream body', async (t) => { ) }) -test('supporting a buffer body', async (t) => { +test('supporting a stream body', async (t) => { const server = await startServer({port: 10_005}, () => ({ status: 200, headers: { 'content-type': 'text/plain', }, - body: Buffer.from('Hello World!'), + body: intoStream('Hello World!'), })) t.teardown(async () => { stopServer(server) @@ -164,13 +165,37 @@ test('supporting a buffer body', async (t) => { ) }) +test('supporting a buffer body', async (t) => { + const server = await startServer({port: 10_006}, () => ({ + status: 200, + headers: { + 'content-type': 'text/plain', + }, + body: Buffer.from('Hello World!'), + })) + t.teardown(async () => { + stopServer(server) + }) + + t.like( + await sendRequest({ + method: 'GET', + url: 'http://localhost:10006', + headers: {}, + }), + { + body: 'Hello World!', + }, + ) +}) + test('omitting unused fields', async (t) => { - const server = await startServer({port: 10_006}, () => ({status: 200})) + const server = await startServer({port: 10_007}, () => ({status: 200})) t.teardown(async () => { stopServer(server) }) - t.like(await sendRequest({method: 'GET', url: 'http://localhost:10006'}), { + t.like(await sendRequest({method: 'GET', url: 'http://localhost:10007'}), { status: 200, }) }) @@ -179,7 +204,7 @@ test('allowing upgrading to WebSocket', async (t) => { const webSocketHashingConstant = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' const keyRegex = /^[+/\dA-Za-z]{22}==$/ - const server = await startServer({port: 10_007}, (request) => { + const server = await startServer({port: 10_008}, (request) => { if ( request.headers.connection === 'Upgrade' && request.headers.upgrade === 'websocket' @@ -228,14 +253,14 @@ test('allowing upgrading to WebSocket', async (t) => { stopServer(server) }) - t.like(await sendRequest({method: 'GET', url: 'http://localhost:10007'}), { + t.like(await sendRequest({method: 'GET', url: 'http://localhost:10008'}), { status: 426, }) t.like( await sendRequest({ method: 'GET', - url: 'http://localhost:10007', + url: 'http://localhost:10008', headers: { Connection: 'Upgrade', Upgrade: 'unsupported', @@ -249,7 +274,7 @@ test('allowing upgrading to WebSocket', async (t) => { t.like( await sendRequest({ method: 'GET', - url: 'http://localhost:10007', + url: 'http://localhost:10008', headers: { Connection: 'Upgrade', Upgrade: 'websocket', @@ -263,7 +288,7 @@ test('allowing upgrading to WebSocket', async (t) => { t.like( await sendRequest({ method: 'GET', - url: 'http://localhost:10007', + url: 'http://localhost:10008', headers: { Connection: 'Upgrade', Upgrade: 'websocket', @@ -275,7 +300,7 @@ test('allowing upgrading to WebSocket', async (t) => { }, ) - const ws = new WebSocket('ws://localhost:10007') + const ws = new WebSocket('ws://localhost:10008') await new Promise((resolve) => { ws.once('open', resolve) })