Skip to content

Commit b4b0257

Browse files
devsnekbartlomieju
authored andcommitted
fix(ext/node): add http information support (#27381)
Implements some client and server events to improve compat. This change makes AWS SDKs more reliable and faster in Deno. Fixes: #27239
1 parent 318646f commit b4b0257

File tree

7 files changed

+211
-20
lines changed

7 files changed

+211
-20
lines changed

Cargo.lock

Lines changed: 18 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ http-body = "1.0"
150150
http-body-util = "0.1.2"
151151
http_v02 = { package = "http", version = "0.2.9" }
152152
httparse = "1.8.0"
153-
hyper = { version = "1.4.1", features = ["full"] }
153+
hyper = { version = "1.6.0", features = ["full"] }
154154
hyper-rustls = { version = "0.27.2", default-features = false, features = ["http1", "http2", "tls12", "ring"] }
155155
hyper-util = { version = "0.1.10", features = ["tokio", "client", "client-legacy", "server", "server-auto"] }
156156
hyper_v014 = { package = "hyper", version = "0.14.26", features = ["runtime", "http1"] }

ext/node/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ deno_core::extension!(deno_node,
375375
ops::zlib::brotli::op_brotli_decompress_stream_end,
376376
ops::http::op_node_http_fetch_response_upgrade,
377377
ops::http::op_node_http_request_with_conn<P>,
378+
ops::http::op_node_http_await_information,
378379
ops::http::op_node_http_await_response,
379380
ops::http2::op_http2_connect,
380381
ops::http2::op_http2_poll_client_connection,

ext/node/ops/http.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::task::Poll;
1111

1212
use bytes::Bytes;
1313
use deno_core::error::ResourceError;
14+
use deno_core::futures::channel::mpsc;
1415
use deno_core::futures::stream::Peekable;
1516
use deno_core::futures::Future;
1617
use deno_core::futures::FutureExt;
@@ -70,9 +71,20 @@ pub struct NodeHttpResponse {
7071
type CancelableResponseResult =
7172
Result<Result<http::Response<Incoming>, hyper::Error>, Canceled>;
7273

74+
#[derive(Serialize, Debug)]
75+
#[serde(rename_all = "camelCase")]
76+
struct InformationalResponse {
77+
status: u16,
78+
status_text: String,
79+
headers: Vec<(ByteString, ByteString)>,
80+
version_major: u16,
81+
version_minor: u16,
82+
}
83+
7384
pub struct NodeHttpClientResponse {
7485
response: Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
7586
url: String,
87+
informational_rx: RefCell<Option<mpsc::Receiver<InformationalResponse>>>,
7688
}
7789

7890
impl Debug for NodeHttpClientResponse {
@@ -252,6 +264,36 @@ where
252264
request.headers_mut().insert(CONTENT_LENGTH, len.into());
253265
}
254266

267+
let (tx, informational_rx) = mpsc::channel(1);
268+
hyper::ext::on_informational(&mut request, move |res| {
269+
let mut tx = tx.clone();
270+
let _ = tx.try_send(InformationalResponse {
271+
status: res.status().as_u16(),
272+
status_text: res.status().canonical_reason().unwrap_or("").to_string(),
273+
headers: res
274+
.headers()
275+
.iter()
276+
.map(|(k, v)| (k.as_str().into(), v.as_bytes().into()))
277+
.collect(),
278+
version_major: match res.version() {
279+
hyper::Version::HTTP_09 => 0,
280+
hyper::Version::HTTP_10 => 1,
281+
hyper::Version::HTTP_11 => 1,
282+
hyper::Version::HTTP_2 => 2,
283+
hyper::Version::HTTP_3 => 3,
284+
_ => unreachable!(),
285+
},
286+
version_minor: match res.version() {
287+
hyper::Version::HTTP_09 => 9,
288+
hyper::Version::HTTP_10 => 0,
289+
hyper::Version::HTTP_11 => 1,
290+
hyper::Version::HTTP_2 => 0,
291+
hyper::Version::HTTP_3 => 0,
292+
_ => unreachable!(),
293+
},
294+
});
295+
});
296+
255297
let cancel_handle = CancelHandle::new_rc();
256298
let cancel_handle_ = cancel_handle.clone();
257299

@@ -264,6 +306,7 @@ where
264306
.add(NodeHttpClientResponse {
265307
response: Box::pin(fut),
266308
url: url.clone(),
309+
informational_rx: RefCell::new(Some(informational_rx)),
267310
});
268311

269312
let cancel_handle_rid = state
@@ -277,6 +320,27 @@ where
277320
})
278321
}
279322

323+
#[op2(async)]
324+
#[serde]
325+
pub async fn op_node_http_await_information(
326+
state: Rc<RefCell<OpState>>,
327+
#[smi] rid: ResourceId,
328+
) -> Option<InformationalResponse> {
329+
let Ok(resource) = state
330+
.borrow_mut()
331+
.resource_table
332+
.get::<NodeHttpClientResponse>(rid)
333+
else {
334+
return None;
335+
};
336+
337+
let mut rx = resource.informational_rx.borrow_mut().take()?;
338+
339+
drop(resource);
340+
341+
rx.next().await
342+
}
343+
280344
#[op2(async)]
281345
#[serde]
282346
pub async fn op_node_http_await_response(

ext/node/polyfills/http.ts

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import { core, primordials } from "ext:core/mod.js";
77
import {
8+
op_node_http_await_information,
89
op_node_http_await_response,
910
op_node_http_fetch_response_upgrade,
1011
op_node_http_request_with_conn,
@@ -484,6 +485,44 @@ class ClientRequest extends OutgoingMessage {
484485
this._encrypted,
485486
);
486487
this._flushBuffer();
488+
489+
const infoPromise = op_node_http_await_information(
490+
this._req!.requestRid,
491+
);
492+
core.unrefOpPromise(infoPromise);
493+
infoPromise.then((info) => {
494+
if (!info) return;
495+
496+
if (info.status === 100) this.emit("continue");
497+
498+
let headers;
499+
let rawHeaders;
500+
501+
this.emit("information", {
502+
statusCode: info.status,
503+
statusMessage: info.statusText,
504+
httpVersionMajor: info.versionMajor,
505+
httpVersionMinor: info.versionMinor,
506+
httpVersion: `${info.versionMajor}.${info.versionMinor}`,
507+
get headers() {
508+
if (!headers) {
509+
headers = {};
510+
for (let i = 0; i < info.headers.length; i++) {
511+
const entry = info.headers[i];
512+
headers[entry[0]] = entry[1];
513+
}
514+
}
515+
return headers;
516+
},
517+
get rawHeaders() {
518+
if (!rawHeaders) {
519+
rawHeaders = info.headers.flat();
520+
}
521+
return rawHeaders;
522+
},
523+
});
524+
});
525+
487526
const res = await op_node_http_await_response(this._req!.requestRid);
488527
if (this._req.cancelHandleRid !== null) {
489528
core.tryClose(this._req.cancelHandleRid);
@@ -1626,6 +1665,12 @@ ServerResponse.prototype.detachSocket = function (
16261665
this._socketOverride = null;
16271666
};
16281667

1668+
ServerResponse.prototype.writeContinue = function writeContinue(cb) {
1669+
if (cb) {
1670+
nextTick(cb);
1671+
}
1672+
};
1673+
16291674
Object.defineProperty(ServerResponse.prototype, "connection", {
16301675
get: deprecate(
16311676
function (this: ServerResponse) {
@@ -1831,7 +1876,24 @@ export class ServerImpl extends EventEmitter {
18311876
} else {
18321877
return new Promise<Response>((resolve): void => {
18331878
const res = new ServerResponse(resolve, socket);
1834-
this.emit("request", req, res);
1879+
1880+
if (request.headers.has("expect")) {
1881+
if (/(?:^|\W)100-continue(?:$|\W)/i.test(req.headers.expect)) {
1882+
if (this.listenerCount("checkContinue") > 0) {
1883+
this.emit("checkContinue", req, res);
1884+
} else {
1885+
res.writeContinue();
1886+
this.emit("request", req, res);
1887+
}
1888+
} else if (this.listenerCount("checkExpectation") > 0) {
1889+
this.emit("checkExpectation", req, res);
1890+
} else {
1891+
res.writeHead(417);
1892+
res.end();
1893+
}
1894+
} else {
1895+
this.emit("request", req, res);
1896+
}
18351897
});
18361898
}
18371899
};
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"args": "run -A main.cjs",
3+
"output": "ok\n"
4+
}

0 commit comments

Comments
 (0)