Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ http-body = "1.0"
http-body-util = "0.1.2"
http_v02 = { package = "http", version = "0.2.9" }
httparse = "1.8.0"
hyper = { version = "1.4.1", features = ["full"] }
hyper = { version = "1.6.0", features = ["full"] }
hyper-rustls = { version = "0.27.2", default-features = false, features = ["http1", "http2", "tls12", "ring"] }
hyper-util = { version = "0.1.10", features = ["tokio", "client", "client-legacy", "server", "server-auto"] }
hyper_v014 = { package = "hyper", version = "0.14.26", features = ["runtime", "http1"] }
Expand Down
1 change: 1 addition & 0 deletions ext/node/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ deno_core::extension!(deno_node,
ops::zlib::brotli::op_brotli_decompress_stream_end,
ops::http::op_node_http_fetch_response_upgrade,
ops::http::op_node_http_request_with_conn<P>,
ops::http::op_node_http_await_information,
ops::http::op_node_http_await_response,
ops::http2::op_http2_connect,
ops::http2::op_http2_poll_client_connection,
Expand Down
64 changes: 64 additions & 0 deletions ext/node/ops/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::task::Poll;

use bytes::Bytes;
use deno_core::error::ResourceError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::stream::Peekable;
use deno_core::futures::Future;
use deno_core::futures::FutureExt;
Expand Down Expand Up @@ -70,9 +71,20 @@ pub struct NodeHttpResponse {
type CancelableResponseResult =
Result<Result<http::Response<Incoming>, hyper::Error>, Canceled>;

#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
struct InformationalResponse {
status: u16,
status_text: String,
headers: Vec<(ByteString, ByteString)>,
version_major: u16,
version_minor: u16,
}

pub struct NodeHttpClientResponse {
response: Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
url: String,
informational_rx: RefCell<Option<mpsc::Receiver<InformationalResponse>>>,
}

impl Debug for NodeHttpClientResponse {
Expand Down Expand Up @@ -252,6 +264,36 @@ where
request.headers_mut().insert(CONTENT_LENGTH, len.into());
}

let (tx, informational_rx) = mpsc::channel(1);
hyper::ext::on_informational(&mut request, move |res| {
let mut tx = tx.clone();
let _ = tx.try_send(InformationalResponse {
status: res.status().as_u16(),
status_text: res.status().canonical_reason().unwrap_or("").to_string(),
headers: res
.headers()
.iter()
.map(|(k, v)| (k.as_str().into(), v.as_bytes().into()))
.collect(),
version_major: match res.version() {
hyper::Version::HTTP_09 => 0,
hyper::Version::HTTP_10 => 1,
hyper::Version::HTTP_11 => 1,
hyper::Version::HTTP_2 => 2,
hyper::Version::HTTP_3 => 3,
_ => unreachable!(),
},
version_minor: match res.version() {
hyper::Version::HTTP_09 => 9,
hyper::Version::HTTP_10 => 0,
hyper::Version::HTTP_11 => 1,
hyper::Version::HTTP_2 => 0,
hyper::Version::HTTP_3 => 0,
_ => unreachable!(),
},
});
});

let cancel_handle = CancelHandle::new_rc();
let cancel_handle_ = cancel_handle.clone();

Expand All @@ -264,6 +306,7 @@ where
.add(NodeHttpClientResponse {
response: Box::pin(fut),
url: url.clone(),
informational_rx: RefCell::new(Some(informational_rx)),
});

let cancel_handle_rid = state
Expand All @@ -277,6 +320,27 @@ where
})
}

#[op2(async)]
#[serde]
pub async fn op_node_http_await_information(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Option<InformationalResponse> {
let Ok(resource) = state
.borrow_mut()
.resource_table
.get::<NodeHttpClientResponse>(rid)
else {
return None;
};

let mut rx = resource.informational_rx.borrow_mut().take()?;

drop(resource);

rx.next().await
}

#[op2(async)]
#[serde]
pub async fn op_node_http_await_response(
Expand Down
64 changes: 63 additions & 1 deletion ext/node/polyfills/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { core, primordials } from "ext:core/mod.js";
import {
op_node_http_await_information,
op_node_http_await_response,
op_node_http_fetch_response_upgrade,
op_node_http_request_with_conn,
Expand Down Expand Up @@ -484,6 +485,44 @@ class ClientRequest extends OutgoingMessage {
this._encrypted,
);
this._flushBuffer();

const infoPromise = op_node_http_await_information(
this._req!.requestRid,
);
core.unrefOpPromise(infoPromise);
infoPromise.then((info) => {
if (!info) return;

if (info.status === 100) this.emit("continue");

let headers;
let rawHeaders;

this.emit("information", {
statusCode: info.status,
statusMessage: info.statusText,
httpVersionMajor: info.versionMajor,
httpVersionMinor: info.versionMinor,
httpVersion: `${info.versionMajor}.${info.versionMinor}`,
get headers() {
if (!headers) {
headers = {};
for (let i = 0; i < info.headers.length; i++) {
const entry = info.headers[i];
headers[entry[0]] = entry[1];
}
}
return headers;
},
get rawHeaders() {
if (!rawHeaders) {
rawHeaders = info.headers.flat();
}
return rawHeaders;
},
});
});

const res = await op_node_http_await_response(this._req!.requestRid);
if (this._req.cancelHandleRid !== null) {
core.tryClose(this._req.cancelHandleRid);
Expand Down Expand Up @@ -1626,6 +1665,12 @@ ServerResponse.prototype.detachSocket = function (
this._socketOverride = null;
};

ServerResponse.prototype.writeContinue = function writeContinue(cb) {
if (cb) {
nextTick(cb);
}
};

Object.defineProperty(ServerResponse.prototype, "connection", {
get: deprecate(
function (this: ServerResponse) {
Expand Down Expand Up @@ -1831,7 +1876,24 @@ export class ServerImpl extends EventEmitter {
} else {
return new Promise<Response>((resolve): void => {
const res = new ServerResponse(resolve, socket);
this.emit("request", req, res);

if (request.headers.has("expect")) {
if (/(?:^|\W)100-continue(?:$|\W)/i.test(req.headers.expect)) {
if (this.listenerCount("checkContinue") > 0) {
this.emit("checkContinue", req, res);
} else {
res.writeContinue();
this.emit("request", req, res);
}
} else if (this.listenerCount("checkExpectation") > 0) {
this.emit("checkExpectation", req, res);
} else {
res.writeHead(417);
res.end();
}
} else {
this.emit("request", req, res);
}
});
}
};
Expand Down
4 changes: 4 additions & 0 deletions tests/specs/run/expect_100_continue/__test__.jsonc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"args": "run -A main.cjs",
"output": "ok\n"
}
Loading
Loading