Use a ring buffer to reduce memory churn #714

Open · wants to merge 2 commits into base: main

17 changes: 11 additions & 6 deletions R/resp-stream-aws.R
@@ -26,19 +26,24 @@ resp_stream_aws <- function(resp, max_size = Inf) {
event
}

find_aws_event_boundary <- function(buffer) {
find_aws_event_boundary <- function(rb) {
# No valid AWS event message is less than 16 bytes
if (length(buffer) < 16) {
if (rb$size() < 16) {
return(NULL)
}

# Read first 4 bytes as a big endian number
event_size <- parse_int(buffer[1:4])
if (event_size > length(buffer)) {
# Read first 4 bytes
event_size_raw <- raw(4)
for (i in 1:4) {
event_size_raw[i] <- rb$peek(i)
}

event_size <- parse_int(event_size_raw)
if (event_size > rb$size()) {
return(NULL)
}

event_size + 1
event_size
}
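
The removed comment pointed out that those first four bytes encode the event length as a big-endian integer (parse_int itself is outside this hunk). Purely as an illustration of that framing, not code from the PR:

# Hypothetical example: a prelude starting with bytes 00 00 01 2c describes a
# 300-byte event, so find_aws_event_boundary() only reports a boundary once at
# least 300 bytes are buffered.
readBin(as.raw(c(0x00, 0x00, 0x01, 0x2c)), "integer", n = 1, size = 4, endian = "big")
#> [1] 300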

# Implementation from https://github.com/lifion/lifion-aws-event-stream/blob/develop/lib/index.js
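The RingBuffer class itself is added elsewhere in this PR and does not appear in this diff; the hunks above and below rely only on a small method surface, and the object is constructed later via RingBuffer$new(chunk_size). As a rough stand-in for that interface (a plain closure over a raw vector, assumed for illustration and not the PR's implementation, which presumably preallocates and reuses storage to avoid the churn this PR targets):

# Hypothetical interface sketch only -- NOT the PR's RingBuffer.
ring_buffer_sketch <- function() {
  buf <- raw(0)
  list(
    push     = function(bytes) buf <<- c(buf, bytes),     # append bytes at the end
    pop      = function(n = length(buf)) {                # remove and return the first n bytes
      out <- buf[seq_len(n)]
      buf <<- if (n >= length(buf)) raw(0) else buf[(n + 1):length(buf)]
      out
    },
    peek     = function(i) buf[[i]],                      # i-th byte (1-based), non-destructive
    peek_all = function() buf,                            # full contents, non-destructive
    size     = function() length(buf),
    is_empty = function() length(buf) == 0
  )
}

With that shape in mind, rb$size() replaces length(buffer), rb$peek(i) replaces buffer[i], and popping a prefix replaces the old copy-and-reassign handling of leftover bytes.
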
141 changes: 60 additions & 81 deletions R/resp-stream.R
@@ -167,7 +167,6 @@ resp_stream_oneline <- function(resp, max_size, warn, encoding) {
if (is.null(line_bytes)) {
return(character())
}

eat_next_lf <- resp$cache$resp_stream_oneline_eat_next_lf
resp$cache$resp_stream_oneline_eat_next_lf <- FALSE

@@ -197,77 +196,66 @@ resp_stream_oneline <- function(resp, max_size, warn, encoding) {
}
}

find_line_boundary <- function(buffer) {
if (length(buffer) == 0) {
find_line_boundary <- function(rb) {
if (rb$is_empty()) {
return(NULL)
}

# Look left 1 byte
right1 <- c(utils::tail(buffer, -1), 0x00)
cur <- rb$peek(1)
for (i in seq_len(rb$size() - 1)) {
nxt <- rb$peek(i + 1)

crlf <- buffer == 0x0D & right1 == 0x0A
cr <- buffer == 0x0D
lf <- buffer == 0x0A
# Check for CRLF sequence
if (is_crlf(cur, nxt)) {
return(i + 1)
}
# Check for single CR or LF
if (is_cr(cur) || is_lf(cur)) {
return(i)
}

all <- which(crlf | cr | lf)
if (length(all) == 0) {
return(NULL)
cur <- nxt
}

first <- all[[1]]
if (crlf[first]) {
return(first + 2)
} else {
return(first + 1)
# Check the last byte
if (is_cr(cur) || is_lf(cur)) {
return(rb$size())
}

NULL
}
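
The rewritten loop leans on is_cr(), is_lf(), and is_crlf() helpers that are not part of this diff; presumably they are single-byte predicates along these lines (an assumption, shown only so the loop above reads easily):

# Assumed helpers -- not shown in this PR diff.
is_cr   <- function(b) b == 0x0d                    # carriage return
is_lf   <- function(b) b == 0x0a                    # line feed
is_crlf <- function(b1, b2) is_cr(b1) && is_lf(b2)

# Illustrative trace: with the buffer holding charToRaw("ab\r\ncd"), the loop
# reaches the CR at position 3, sees the LF after it, and returns 4, so the
# caller pops a 4-byte line including its CRLF terminator.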

# Function to find the first double line ending in a buffer, or NULL if no
# double line ending is found
#
# Example:
# find_event_boundary(charToRaw("data: 1\n\nid: 12345"))
# Returns:
# list(
# matched = charToRaw("data: 1\n\n"),
# remaining = charToRaw("id: 12345")
# )
find_event_boundary <- function(buffer) {
if (length(buffer) < 2) {
find_event_boundary <- function(rb) {
if (rb$size() < 2) {
return(NULL)
}

# leftX means look behind by X bytes. For example, left1[2] equals buffer[1].
# Any attempt to read past the beginning of the buffer results in 0x00.
left1 <- c(0x00, utils::head(buffer, -1))
left2 <- c(0x00, utils::head(left1, -1))
left3 <- c(0x00, utils::head(left2, -1))
cur <- rb$peek(1)
for (i in 1:(rb$size() - 1)) {
nxt <- rb$peek(i + 1)

boundary_end <- which(
(left1 == 0x0A & buffer == 0x0A) | # \n\n
(left1 == 0x0D & buffer == 0x0D) | # \r\r
(left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n
)
# Check for \n\n or \r\r
if ((is_lf(cur) && is_lf(nxt)) || (is_cr(cur) && is_cr(nxt))) {
return(i + 1)
}

if (length(boundary_end) == 0) {
return(NULL) # No event boundary found
# Check for \r\n\r\n sequence
if (i <= rb$size() - 3) {
byte3 <- rb$peek(i + 2)
byte4 <- rb$peek(i + 3)
if (is_crlf(cur, nxt) && is_crlf(byte3, byte4)) {
return(i + 3)
}
}

cur <- nxt
}

boundary_end <- boundary_end[1] # Take the first occurrence
split_at <- boundary_end + 1 # Split at one after the boundary
split_at
NULL
}

# Splits a buffer into the part before `split_at`, and the part starting at
# `split_at`. It's possible for either of the returned parts to be zero-length
# (i.e. if `split_at` is 1 or length(buffer)+1).
split_buffer <- function(buffer, split_at) {
# Return a list with the event data and the remaining buffer
list(
matched = slice(buffer, end = split_at),
remaining = slice(buffer, start = split_at)
)
}
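
The net effect of this hunk is that find_event_boundary() no longer returns a split point into a copied raw vector, with split_buffer() then allocating matched/remaining copies; it returns the number of bytes up to and including the blank line, and the caller pops exactly that many bytes from the ring buffer. Tracing the example from the removed comment through the new loop (an illustration, not a test from the PR):

msg <- charToRaw("data: 1\n\nid: 12345")
which(msg == 0x0a)
#> [1] 8 9
# The \n\n pair ends at byte 9, so the new find_event_boundary() returns 9;
# rb$pop(9) then yields the bytes the old `matched` field held ("data: 1\n\n"),
# while "id: 12345" simply stays buffered -- no `remaining` copy is needed.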

# @param max_size Maximum number of bytes to look for a boundary before throwing an error
# @param boundary_func A function that takes a raw vector and returns NULL if no
@@ -285,11 +273,8 @@ resp_boundary_pushback <- function(
check_streaming_response(resp)
check_number_whole(max_size, min = 1, allow_infinite = TRUE)

chunk_size <- min(max_size + 1, 1024)

# Grab data left over from last resp_stream_sse() call (if any)
buffer <- resp$cache$push_back %||% raw()
resp$cache$push_back <- raw()
chunk_size <- if (is.infinite(max_size)) 1024 else max_size + 1
buffer <- env_cache(resp$cache, "buffer", RingBuffer$new(chunk_size))

if (resp_stream_show_buffer(resp)) {
log_stream(cli::rule("Buffer"), prefix = "* ")
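
Because env_cache() returns an existing binding if there is one and otherwise creates it from the default, a single RingBuffer is now created per response and reused across successive resp_stream_*() calls, replacing the old pattern of copying leftover bytes in and out of resp$cache$push_back. A small illustration of that caching behaviour (not code from the PR):

e <- new.env()
first  <- rlang::env_cache(e, "buffer", "created")  # binding absent: stores and returns the default
second <- rlang::env_cache(e, "buffer", "ignored")  # binding present: the default is not used
identical(first, second)
#> [1] TRUE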
@@ -309,47 +294,43 @@
# Read chunks until we find an event or reach the end of input
repeat {
# Try to find an event boundary using the data we have
print_buffer(buffer, "Buffer to parse")
split_at <- boundary_func(buffer)

if (!is.null(split_at)) {
result <- split_buffer(buffer, split_at)
# We found a complete event
print_buffer(result$matched, "Matched data")
print_buffer(result$remaining, "Remaining buffer")
resp$cache$push_back <- result$remaining
return(result$matched)
print_buffer(buffer$peek_all(), "Buffer to parse")
boundary_pos <- boundary_func(buffer)

if (!is.null(boundary_pos)) {
matched <- buffer$pop(boundary_pos)

print_buffer(matched, "Matched data")
print_buffer(buffer$peek_all(), "Remaining buffer")
return(matched)
}

if (length(buffer) > max_size) {
if (buffer$size() > max_size) {
# Keep the buffer in place, so that if the user tries resp_stream_sse
# again, they'll get the same error rather than reading the stream
# having missed a bunch of bytes.
resp$cache$push_back <- buffer
cli::cli_abort(
"Streaming read exceeded size limit of {max_size}",
class = "httr2_streaming_error"
)
}

# We didn't have enough data. Attempt to read more
chunk <- readBin(
resp$body,
raw(),
# Don't let us exceed the max size by more than one byte; we do allow the
# one extra byte so we know to error.
n = min(chunk_size, max_size - length(buffer) + 1)
)
# Don't let us exceed the max size by more than one byte; we do allow the
# one extra byte so we know to error.
next_size <- min(chunk_size, max_size - buffer$size() + 1)
chunk <- readBin(resp$body, raw(), n = next_size)
buffer$push(chunk)
print_buffer(chunk, "Received chunk")

if (length(chunk) == 0) {
if (!isIncomplete(resp$body)) {
# We've truly reached the end of the connection; no more data is coming
if (length(buffer) == 0) {
if (buffer$is_empty()) {
return(NULL)
} else {
if (include_trailer) {
return(buffer)
return(buffer$pop())
} else {
cli::cli_warn(
"Premature end of input; ignoring final partial chunk"
@@ -359,16 +340,14 @@
}
} else {
# More data might come later; store the buffer and return NULL
print_buffer(buffer, "Storing incomplete buffer")
resp$cache$push_back <- buffer
print_buffer(buffer$peek_all(), "Storing incomplete buffer")
return(NULL)
}
}

# More data was received; combine it with existing buffer and continue the
# loop to try parsing again
buffer <- c(buffer, chunk)
print_buffer(buffer, "Combined buffer")
print_buffer(buffer$peek_all(), "Combined buffer")
}
}
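
The read-size arithmetic above keeps the buffer from overshooting max_size by more than a single byte, which is just enough to detect the overflow and raise the streaming error without reading far past the limit. With illustrative numbers (not from the PR):

max_size    <- 10
chunk_size  <- if (is.infinite(max_size)) 1024 else max_size + 1   # 11
buffer_size <- 8                                                   # bytes already buffered
next_size   <- min(chunk_size, max_size - buffer_size + 1)         # min(11, 3) = 3
# If all 3 requested bytes arrive and still contain no boundary, the buffer
# holds 11 bytes, one past max_size, and the next iteration aborts with an
# httr2_streaming_error.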
