Skip to content

Commit

Permalink
feat(web-streams): Support sending and receiving Web Stream bodies
Browse files Browse the repository at this point in the history
  • Loading branch information
vinsonchuong committed Oct 30, 2023
1 parent ca07ae2 commit 0b67a22
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 12 deletions.
6 changes: 6 additions & 0 deletions http/parse-body.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import {Readable} from 'node:stream'
import typeIs from 'type-is'
import getStream from 'get-stream'

Expand All @@ -14,6 +15,11 @@ export function parseHttp1Body(requestOrResponse) {
return ''
}

if (typeIs(requestOrResponse, ['text/event-stream'])) {
requestOrResponse.setEncoding('utf8')
return Readable.toWeb(requestOrResponse)
}

if (typeIs(requestOrResponse, textMediaTypes)) {
return getStream(requestOrResponse)
}
Expand Down
54 changes: 54 additions & 0 deletions http/send-request/index.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import {Buffer} from 'node:buffer'
import {ReadableStream} from 'node:stream/web'
import * as https from 'node:https'
import * as http from 'node:http'
import test from 'ava'
import makeCert from 'make-cert'
import getStream from 'get-stream'
import sendRequest from './index.js'

test('sending a simple GET request', async (t) => {
Expand Down Expand Up @@ -74,3 +77,54 @@ test('parsing a binary body into a Buffer', async (t) => {

t.true(response.body instanceof Buffer)
})

test('supporting streaming server-sent events', async (t) => {
const server = new http.Server()
server.on('request', (request, response) => {
response.writeHead(200, {
'Content-Type': 'text/event-stream',
})
response.end(
[
': comment\n',
'\n',
'data: some text\n',
'\n',
'data: multiple\n',
'data: lines\n',
'\n',
'event: foo\n',
'data: bar\n',
'\n',
].join(''),
)
})

await new Promise((resolve) => {
server.listen(12_000)
server.once('listening', resolve)
})

const response = await sendRequest({
method: 'GET',
url: 'http://localhost:12000',
headers: {},
})

t.true(response.body instanceof ReadableStream)
t.is(
await getStream(response.body),
[
': comment\n',
'\n',
'data: some text\n',
'\n',
'data: multiple\n',
'data: lines\n',
'\n',
'event: foo\n',
'data: bar\n',
'\n',
].join(''),
)
})
4 changes: 4 additions & 0 deletions http/start-server/build-responder.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import {ReadableStream} from 'node:stream/web'
import {Readable} from 'node:stream'
import {Buffer} from 'node:buffer'
import * as http2 from 'node:http2'
import omit from 'lodash/omit.js'
Expand Down Expand Up @@ -36,6 +38,8 @@ export default function (computeResponse) {
response.body instanceof Buffer
) {
nodeResponse.end(response.body)
} else if (response.body instanceof ReadableStream) {
Readable.fromWeb(response.body).pipe(nodeResponse)
} else {
response.body.pipe(nodeResponse)
}
Expand Down
49 changes: 37 additions & 12 deletions http/start-server/index.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {Buffer} from 'node:buffer'
import {createHash} from 'node:crypto'
import {Readable} from 'node:stream'
import test from 'ava'
import WebSocket from 'ws'
import makeCert from 'make-cert'
Expand Down Expand Up @@ -116,13 +117,13 @@ test('starting an HTTP server', async (t) => {
await session.close()
})

test('supporting a stream body', async (t) => {
test('supporting a web stream body', async (t) => {
const server = await startServer({port: 10_004}, () => ({
status: 200,
headers: {
'content-type': 'text/plain',
},
body: intoStream('Hello World!'),
body: Readable.toWeb(intoStream('Hello World!')),
}))
t.teardown(async () => {
stopServer(server)
Expand All @@ -140,13 +141,13 @@ test('supporting a stream body', async (t) => {
)
})

test('supporting a buffer body', async (t) => {
test('supporting a stream body', async (t) => {
const server = await startServer({port: 10_005}, () => ({
status: 200,
headers: {
'content-type': 'text/plain',
},
body: Buffer.from('Hello World!'),
body: intoStream('Hello World!'),
}))
t.teardown(async () => {
stopServer(server)
Expand All @@ -164,13 +165,37 @@ test('supporting a buffer body', async (t) => {
)
})

test('supporting a buffer body', async (t) => {
const server = await startServer({port: 10_006}, () => ({
status: 200,
headers: {
'content-type': 'text/plain',
},
body: Buffer.from('Hello World!'),
}))
t.teardown(async () => {
stopServer(server)
})

t.like(
await sendRequest({
method: 'GET',
url: 'http://localhost:10006',
headers: {},
}),
{
body: 'Hello World!',
},
)
})

test('omitting unused fields', async (t) => {
const server = await startServer({port: 10_006}, () => ({status: 200}))
const server = await startServer({port: 10_007}, () => ({status: 200}))
t.teardown(async () => {
stopServer(server)
})

t.like(await sendRequest({method: 'GET', url: 'http://localhost:10006'}), {
t.like(await sendRequest({method: 'GET', url: 'http://localhost:10007'}), {
status: 200,
})
})
Expand All @@ -179,7 +204,7 @@ test('allowing upgrading to WebSocket', async (t) => {
const webSocketHashingConstant = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
const keyRegex = /^[+/\dA-Za-z]{22}==$/

const server = await startServer({port: 10_007}, (request) => {
const server = await startServer({port: 10_008}, (request) => {
if (
request.headers.connection === 'Upgrade' &&
request.headers.upgrade === 'websocket'
Expand Down Expand Up @@ -228,14 +253,14 @@ test('allowing upgrading to WebSocket', async (t) => {
stopServer(server)
})

t.like(await sendRequest({method: 'GET', url: 'http://localhost:10007'}), {
t.like(await sendRequest({method: 'GET', url: 'http://localhost:10008'}), {
status: 426,
})

t.like(
await sendRequest({
method: 'GET',
url: 'http://localhost:10007',
url: 'http://localhost:10008',
headers: {
Connection: 'Upgrade',
Upgrade: 'unsupported',
Expand All @@ -249,7 +274,7 @@ test('allowing upgrading to WebSocket', async (t) => {
t.like(
await sendRequest({
method: 'GET',
url: 'http://localhost:10007',
url: 'http://localhost:10008',
headers: {
Connection: 'Upgrade',
Upgrade: 'websocket',
Expand All @@ -263,7 +288,7 @@ test('allowing upgrading to WebSocket', async (t) => {
t.like(
await sendRequest({
method: 'GET',
url: 'http://localhost:10007',
url: 'http://localhost:10008',
headers: {
Connection: 'Upgrade',
Upgrade: 'websocket',
Expand All @@ -275,7 +300,7 @@ test('allowing upgrading to WebSocket', async (t) => {
},
)

const ws = new WebSocket('ws://localhost:10007')
const ws = new WebSocket('ws://localhost:10008')
await new Promise((resolve) => {
ws.once('open', resolve)
})
Expand Down

0 comments on commit 0b67a22

Please sign in to comment.