Skip to content

Commit 8a3f117

Browse files
committed
feat(node): add expect 100-continue support to http client
1 parent 7949f53 commit 8a3f117

File tree

3 files changed

+71
-12
lines changed

3 files changed

+71
-12
lines changed

ext/node/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ deno_core::extension!(deno_node,
367367
ops::http::op_node_http_fetch_response_upgrade,
368368
ops::http::op_node_http_request_with_conn<P>,
369369
ops::http::op_node_http_await_response,
370+
ops::http::op_node_http_await_continue,
370371
ops::http2::op_http2_connect,
371372
ops::http2::op_http2_poll_client_connection,
372373
ops::http2::op_http2_client_request,

ext/node/ops/http.rs

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::task::Poll;
1111
use bytes::Bytes;
1212
use deno_core::error::bad_resource;
1313
use deno_core::error::type_error;
14+
use deno_core::futures::channel::oneshot;
1415
use deno_core::futures::stream::Peekable;
1516
use deno_core::futures::Future;
1617
use deno_core::futures::FutureExt;
@@ -43,6 +44,7 @@ use http::header::HeaderName;
4344
use http::header::HeaderValue;
4445
use http::header::AUTHORIZATION;
4546
use http::header::CONTENT_LENGTH;
47+
use http::header::EXPECT;
4648
use http::Method;
4749
use http_body_util::BodyExt;
4850
use hyper::body::Frame;
@@ -72,6 +74,7 @@ type CancelableResponseResult =
7274
pub struct NodeHttpClientResponse {
7375
response: Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
7476
url: String,
77+
expect_100_rx: RefCell<Option<oneshot::Receiver<()>>>,
7578
}
7679

7780
impl Debug for NodeHttpClientResponse {
@@ -190,16 +193,26 @@ where
190193
header_map.append(name, v);
191194
}
192195

193-
let (body, con_len) = if let Some(body) = body {
196+
let (body, con_len, expect_100_rx) = if let Some(body) = body {
197+
let resource = state
198+
.borrow_mut()
199+
.resource_table
200+
.take_any(body)
201+
.map_err(ConnError::Resource)?;
202+
let expect_100 = header_map
203+
.get(EXPECT)
204+
.map(|v| v == "100-continue")
205+
.unwrap_or(false);
206+
let (tx, rx) = if expect_100 {
207+
let (tx, rx) = oneshot::channel();
208+
(Some(tx), Some(rx))
209+
} else {
210+
(None, None)
211+
};
194212
(
195-
BodyExt::boxed(NodeHttpResourceToBodyAdapter::new(
196-
state
197-
.borrow_mut()
198-
.resource_table
199-
.take_any(body)
200-
.map_err(ConnError::Resource)?,
201-
)),
213+
BodyExt::boxed(NodeHttpResourceToBodyAdapter::new(resource, tx)),
202214
None,
215+
rx,
203216
)
204217
} else {
205218
// POST and PUT requests should always have a 0 length content-length,
@@ -214,6 +227,7 @@ where
214227
.map_err(|never| match never {})
215228
.boxed(),
216229
len,
230+
None,
217231
)
218232
};
219233

@@ -250,6 +264,7 @@ where
250264
.add(NodeHttpClientResponse {
251265
response: Box::pin(fut),
252266
url: url.clone(),
267+
expect_100_rx: RefCell::new(expect_100_rx),
253268
});
254269

255270
let cancel_handle_rid = state
@@ -263,6 +278,28 @@ where
263278
})
264279
}
265280

281+
#[op2(async)]
282+
pub async fn op_node_http_await_continue(
283+
state: Rc<RefCell<OpState>>,
284+
#[smi] rid: ResourceId,
285+
) -> bool {
286+
let Ok(resource) = state
287+
.borrow_mut()
288+
.resource_table
289+
.get::<NodeHttpClientResponse>(rid)
290+
else {
291+
return false;
292+
};
293+
294+
let Some(rx) = resource.expect_100_rx.borrow_mut().take() else {
295+
return false;
296+
};
297+
298+
drop(resource);
299+
300+
rx.await.is_ok()
301+
}
302+
266303
#[op2(async)]
267304
#[serde]
268305
pub async fn op_node_http_await_response(
@@ -550,12 +587,18 @@ pub struct NodeHttpResourceToBodyAdapter(
550587
Option<
551588
Pin<Box<dyn Future<Output = Result<BufView, deno_core::anyhow::Error>>>>,
552589
>,
590+
Option<oneshot::Sender<()>>,
553591
);
554592

593+
const READ_LIMIT: usize = 64 * 1024;
594+
555595
impl NodeHttpResourceToBodyAdapter {
556-
pub fn new(resource: Rc<dyn Resource>) -> Self {
557-
let future = resource.clone().read(64 * 1024);
558-
Self(resource, Some(future))
596+
pub fn new(
597+
resource: Rc<dyn Resource>,
598+
expect_100_tx: Option<oneshot::Sender<()>>,
599+
) -> Self {
600+
let future = resource.clone().read(READ_LIMIT);
601+
Self(resource, Some(future), expect_100_tx)
559602
}
560603
}
561604

@@ -572,6 +615,9 @@ impl Stream for NodeHttpResourceToBodyAdapter {
572615
cx: &mut Context<'_>,
573616
) -> Poll<Option<Self::Item>> {
574617
let this = self.get_mut();
618+
if let Some(tx) = this.2.take() {
619+
let _ = tx.send(());
620+
}
575621
if let Some(mut fut) = this.1.take() {
576622
match fut.poll_unpin(cx) {
577623
Poll::Pending => {
@@ -582,7 +628,7 @@ impl Stream for NodeHttpResourceToBodyAdapter {
582628
Ok(buf) if buf.is_empty() => Poll::Ready(None),
583629
Ok(buf) => {
584630
let bytes: Bytes = buf.to_vec().into();
585-
this.1 = Some(this.0.clone().read(64 * 1024));
631+
this.1 = Some(this.0.clone().read(READ_LIMIT));
586632
Poll::Ready(Some(Ok(bytes)))
587633
}
588634
Err(err) => Poll::Ready(Some(Err(err))),

ext/node/polyfills/http.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import { core, primordials } from "ext:core/mod.js";
77
import {
8+
op_node_http_await_continue,
89
op_node_http_await_response,
910
op_node_http_fetch_response_upgrade,
1011
op_node_http_request_with_conn,
@@ -429,6 +430,8 @@ class ClientRequest extends OutgoingMessage {
429430
_writeHeader() {
430431
const url = this._createUrlStrFromOptions();
431432

433+
const expect100 = this[kOutHeaders].expect?.[1] === "100-continue";
434+
432435
const headers = [];
433436
for (const key in this[kOutHeaders]) {
434437
if (Object.hasOwn(this[kOutHeaders], key)) {
@@ -474,6 +477,15 @@ class ClientRequest extends OutgoingMessage {
474477
this._encrypted,
475478
);
476479
this._flushBuffer();
480+
if (expect100) {
481+
const promise = op_node_http_await_continue(this._req!.requestRid);
482+
core.unrefOpPromise(promise);
483+
promise.then((c) => {
484+
if (c) {
485+
this.emit("continue");
486+
}
487+
});
488+
}
477489
const res = await op_node_http_await_response(this._req!.requestRid);
478490
if (this._req.cancelHandleRid !== null) {
479491
core.tryClose(this._req.cancelHandleRid);

0 commit comments

Comments
 (0)