Skip to content

Commit 13a141e

Browse files
committed
feat(fetch): implement fetch body streams
Closes #946 Implements Response.body and Request.body as ReadableStream per WHATWG Fetch spec: - Response.body returns ReadableStream<Uint8Array> for streaming response data - Request.body returns ReadableStream<Uint8Array> for request bodies - Byte stream controller (type: "bytes") with BYOB reader support - Response.clone() uses tee() to allow cloning after body access - fetch() accepts ReadableStream as request body for streaming uploads - Compressed responses (gzip/deflate/brotli) stream decompressed data - pipeTo() support for piping body streams to WritableStream Added 42 new tests covering fetch body stream functionality.
1 parent 605db49 commit 13a141e

File tree

16 files changed

+1777
-54
lines changed

16 files changed

+1777
-54
lines changed

API.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -535,8 +535,7 @@ export class XMLParser(options?: XmlParserOptions){
535535
> There are some differences with the [WHATWG standard](https://fetch.spec.whatwg.org). Mainly browser specific behavior is removed:
536536
>
537537
> - `keepalive` is always true
538-
> - `request.body` can only be `string`, `Array`, `ArrayBuffer` or `Uint8Array`
539-
> - `response.body` returns `null`. Use `response.text()`, `response.json()` etc
538+
> - `request.body` can only be `string`, `Array`, `ArrayBuffer`, `Uint8Array`, `Blob`, or `ReadableStream`
540539
> - `mode`, `credentials`, `referrerPolicy`, `priority`, `cache` is not available/applicable
541540
542541
## FILEAPI

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

modules/llrt_fetch/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ llrt_context = { version = "0.7.0-beta", path = "../../libs/llrt_context" }
3636
llrt_encoding = { version = "0.7.0-beta", path = "../../libs/llrt_encoding" }
3737
llrt_http = { version = "0.7.0-beta", default-features = false, path = "../llrt_http" }
3838
llrt_json = { version = "0.7.0-beta", path = "../../libs/llrt_json" }
39+
llrt_stream_web = { version = "0.7.0-beta", path = "../llrt_stream_web" }
3940
llrt_url = { version = "0.7.0-beta", path = "../llrt_url" }
4041
llrt_utils = { version = "0.7.0-beta", path = "../../libs/llrt_utils", default-features = false }
4142
once_cell = { version = "1", features = ["std"], default-features = false }

modules/llrt_fetch/src/body_stream.rs

Lines changed: 706 additions & 0 deletions
Large diffs are not rendered by default.

modules/llrt_fetch/src/fetch.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use rquickjs::{
2525
use tokio::{select, sync::Semaphore};
2626

2727
use super::{
28+
body_stream::read_all_bytes_from_stream,
2829
headers::{Headers, HeadersGuard},
2930
response::Response,
3031
security::ensure_url_access,
@@ -85,6 +86,18 @@ pub fn init(global_client: HyperClient, globals: &Object) -> Result<()> {
8586

8687
ensure_url_access(&ctx, &uri)?;
8788

89+
// Convert stream body to bytes if needed (must be done before request loop)
90+
let body_bytes: Option<BodyBytes> = match options.body {
91+
Some(FetchBody::Bytes(bytes)) => Some(bytes),
92+
Some(FetchBody::Stream(stream)) => {
93+
let bytes = read_all_bytes_from_stream(&ctx, stream).await?;
94+
let typed_array = bytes_to_typed_array(ctx.clone(), &bytes)?;
95+
let object_bytes = ObjectBytes::from(&ctx, &typed_array)?;
96+
Some(BodyBytes::new(ctx.clone(), object_bytes)?)
97+
},
98+
None => None,
99+
};
100+
88101
let mut redirect_count = 0;
89102
let mut response_status = 0;
90103
let (res, guard) = loop {
@@ -93,7 +106,7 @@ pub fn init(global_client: HyperClient, globals: &Object) -> Result<()> {
93106
&method,
94107
&uri,
95108
options.headers.as_ref(),
96-
options.body.as_ref(),
109+
body_bytes.as_ref(),
97110
&response_status,
98111
&initial_uri,
99112
)?;
@@ -331,11 +344,16 @@ impl<'js> BodyBytes<'js> {
331344
}
332345
}
333346

347+
enum FetchBody<'js> {
348+
Bytes(BodyBytes<'js>),
349+
Stream(llrt_stream_web::ReadableStreamClass<'js>),
350+
}
351+
334352
struct FetchOptions<'js> {
335353
method: hyper::Method,
336354
url: String,
337355
headers: Option<Headers>,
338-
body: Option<BodyBytes<'js>>,
356+
body: Option<FetchBody<'js>>,
339357
abort_receiver: Option<mc_oneshot::Receiver<Value<'js>>>,
340358
redirect: String,
341359
agent: Option<Class<'js, Agent>>,
@@ -395,14 +413,22 @@ fn get_fetch_options<'js>(
395413
if let Some(body_opt) =
396414
get_option::<Value>("body", arg_opts.as_ref(), resource_opts.as_ref())?
397415
{
398-
let bytes = if let Ok(blob) = Class::<Blob>::from_value(&body_opt) {
399-
let blob = blob.borrow();
400-
let typed_array = bytes_to_typed_array(ctx.clone(), &blob.get_bytes())?;
401-
ObjectBytes::from(ctx, &typed_array)?
416+
// Check if body is a ReadableStream first
417+
if let Some(stream) = body_opt
418+
.as_object()
419+
.and_then(Class::<llrt_stream_web::ReadableStreamStruct>::from_object)
420+
{
421+
body = Some(FetchBody::Stream(stream));
402422
} else {
403-
ObjectBytes::from(ctx, &body_opt)?
404-
};
405-
body = Some(BodyBytes::new(ctx.clone(), bytes)?);
423+
let bytes = if let Ok(blob) = Class::<Blob>::from_value(&body_opt) {
424+
let blob = blob.borrow();
425+
let typed_array = bytes_to_typed_array(ctx.clone(), &blob.get_bytes())?;
426+
ObjectBytes::from(ctx, &typed_array)?
427+
} else {
428+
ObjectBytes::from(ctx, &body_opt)?
429+
};
430+
body = Some(FetchBody::Bytes(BodyBytes::new(ctx.clone(), bytes)?));
431+
}
406432
}
407433

408434
if let Some(url_opt) =

modules/llrt_fetch/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub use self::security::{get_allow_list, get_deny_list, set_allow_list, set_deny
1414
use self::{form_data::FormData, headers::Headers, request::Request, response::Response};
1515

1616
mod body;
17+
mod body_stream;
1718
pub mod fetch;
1819
pub mod form_data;
1920
pub mod headers;

modules/llrt_fetch/src/request.rs

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ use rquickjs::{
1212
IntoJs, Null, Object, Result, TypedArray, Value,
1313
};
1414

15+
use crate::body_stream::{create_value_stream, read_all_bytes_from_stream};
16+
use llrt_stream_web::ReadableStreamClass;
17+
1518
use super::{
1619
headers::{Headers, HeadersGuard, HEADERS_KEY_CONTENT_TYPE},
1720
strip_bom, Blob, FormData, MIME_TYPE_FORM_DATA, MIME_TYPE_FORM_URLENCODED,
@@ -56,6 +59,7 @@ impl RequestMode {
5659
#[derive(rquickjs::JsLifetime)]
5760
enum BodyVariant<'js> {
5861
Provided(Option<Value<'js>>),
62+
Stream(Option<ReadableStreamClass<'js>>),
5963
Empty,
6064
}
6165

@@ -79,8 +83,10 @@ impl<'js> Trace<'js> for Request<'js> {
7983
}
8084
let body = self.body.read().unwrap();
8185
let body = &*body;
82-
if let BodyVariant::Provided(Some(body)) = body {
83-
body.trace(tracer);
86+
match body {
87+
BodyVariant::Provided(Some(body)) => body.trace(tracer),
88+
BodyVariant::Stream(Some(stream)) => stream.trace(tracer),
89+
_ => {},
8490
}
8591
}
8692
}
@@ -142,13 +148,32 @@ impl<'js> Request<'js> {
142148
stringify!(Request)
143149
}
144150

145-
//TODO should implement readable stream
151+
/// Returns the body as a ReadableStream.
152+
///
153+
/// Per WHATWG Fetch specification:
154+
/// - Returns null if the body is null (empty)
155+
/// - Returns a ReadableStream that yields Uint8Array chunks
146156
#[qjs(get)]
147157
fn body(&self, ctx: Ctx<'js>) -> Result<Value<'js>> {
148-
let body = self.body.read().unwrap();
149-
let body = &*body;
158+
let mut body_guard = self.body.write().unwrap();
159+
let body = &mut *body_guard;
150160
match body {
151-
BodyVariant::Provided(value) => value.into_js(&ctx),
161+
BodyVariant::Provided(provided) => {
162+
if let Some(value) = provided.take() {
163+
create_value_stream(&ctx, value)
164+
} else {
165+
// Body already consumed - return null
166+
Ok(Value::new_null(ctx))
167+
}
168+
},
169+
BodyVariant::Stream(stream) => {
170+
if let Some(s) = stream.take() {
171+
Ok(s.into_value())
172+
} else {
173+
// Stream already consumed - return null
174+
Ok(Value::new_null(ctx))
175+
}
176+
},
152177
BodyVariant::Empty => Null.into_js(&ctx),
153178
}
154179
}
@@ -169,6 +194,7 @@ impl<'js> Request<'js> {
169194
let body = &*body;
170195
match body {
171196
BodyVariant::Provided(value) => value.is_none(),
197+
BodyVariant::Stream(stream) => stream.is_none(),
172198
BodyVariant::Empty => false,
173199
}
174200
}
@@ -252,6 +278,7 @@ impl<'js> Request<'js> {
252278
let body = &*body;
253279
let body = match body {
254280
BodyVariant::Provided(provided) => BodyVariant::Provided(provided.clone()),
281+
BodyVariant::Stream(stream) => BodyVariant::Stream(stream.clone()),
255282
BodyVariant::Empty => BodyVariant::Empty,
256283
};
257284

@@ -288,6 +315,13 @@ impl<'js> Request<'js> {
288315
bytes.as_bytes(ctx)?.to_vec()
289316
}
290317
},
318+
BodyVariant::Stream(stream) => {
319+
let stream = stream
320+
.take()
321+
.ok_or(Exception::throw_message(ctx, "Already read"))?;
322+
drop(body_guard);
323+
read_all_bytes_from_stream(ctx, stream).await?
324+
},
291325
BodyVariant::Empty => return Ok(None),
292326
};
293327

@@ -361,7 +395,12 @@ fn assign_request<'js>(request: &mut Request<'js>, ctx: Ctx<'js>, obj: &Object<'
361395
content_type = Some(MIME_TYPE_TEXT.into());
362396
BodyVariant::Provided(Some(body))
363397
} else if let Some(obj) = body.as_object() {
364-
if let Some(blob) = Class::<Blob>::from_object(obj) {
398+
if let Some(stream) =
399+
Class::<llrt_stream_web::ReadableStreamStruct>::from_object(obj)
400+
{
401+
// ReadableStream body - store as Stream variant
402+
BodyVariant::Stream(Some(stream))
403+
} else if let Some(blob) = Class::<Blob>::from_object(obj) {
365404
let blob = blob.borrow();
366405
if !blob.mime_type().is_empty() {
367406
content_type = Some(blob.mime_type());

0 commit comments

Comments
 (0)