Skip to content

Commit

Permalink
Fix: Range calculation and missing response header
Browse files Browse the repository at this point in the history
  • Loading branch information
das-Abroxas committed Dec 12, 2023
1 parent a913b10 commit 3ad94d5
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 33 deletions.
60 changes: 38 additions & 22 deletions src/s3_frontend/s3service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1111,16 +1111,32 @@ impl S3 for ArunaS3Service {

// Needed for final part
trace!("calculating ranges");
let (query_ranges, filter_ranges) =
let (query_ranges, filter_ranges, actual_from) =
calculate_ranges(req.input.range, content_length as u64, footer_parser)
.map_err(|_| s3_error!(InternalError, "Error while parsing ranges"))?;
let calc_content_len = match filter_ranges {
Some(r) => calculate_content_length_from_range(r),
None => content_length,

let (content_length, accept_ranges, content_range) = match filter_ranges {
Some(filter_range) => (
calculate_content_length_from_range(filter_range),
Some("bytes".to_string()),
Some(format!(
"bytes {}-{}/{}",
actual_from + filter_range.from,
actual_from + filter_range.to - 1,
content_length
)),
),
None => (content_length, None, None),
};
let cloned_key = encryption_key.clone();
debug!(
?content_length,
?query_ranges,
?filter_ranges,
?accept_ranges,
?content_range
);

// Spawn get_object
// Spawn get_object to fetch bytes from storage storage
let backend = self.backend.clone();
tokio::spawn(
async move {
Expand All @@ -1130,9 +1146,10 @@ impl S3 for ArunaS3Service {
}
.instrument(info_span!("get_object")),
);
let (final_send, final_rcv) = async_channel::bounded(10);
let (final_send, final_rcv) = async_channel::unbounded();

// Spawn final part
let cloned_key = encryption_key.clone();
tokio::spawn(
async move {
pin!(receiver);
Expand All @@ -1141,23 +1158,19 @@ impl S3 for ArunaS3Service {
AsyncSenderSink::new(final_send),
);

if let Some(r) = filter_ranges {
asrw = asrw.add_transformer(Filter::new(r));
asrw = asrw
.add_transformer(trace_err!(ChaCha20Dec::new(cloned_key)
.map_err(|_| { s3_error!(InternalError, "Internal notifier error") }))?)
.add_transformer(ZstdDec::new());

if let Some(filter_range) = filter_ranges {
asrw = asrw.add_transformer(Filter::new(filter_range));
};

trace_err!(
asrw.add_transformer(trace_err!(ChaCha20Dec::new(cloned_key)
.map_err(|_| { s3_error!(InternalError, "Internal notifier error") }))?)
.add_transformer(ZstdDec::new())
.process()
.await
)
.map_err(|_| s3_error!(InternalError, "Internal notifier error"))?;
trace_err!(asrw.process().await)
.map_err(|_| s3_error!(InternalError, "Internal notifier error"))?;

match 1 {
1 => Ok(()),
_ => Err(s3_error!(InternalError, "Internal notifier error")),
}
Ok::<_, anyhow::Error>(())
}
.instrument(tracing::info_span!("query_data")),
);
Expand All @@ -1169,13 +1182,16 @@ impl S3 for ArunaS3Service {

let output = GetObjectOutput {
body,
content_length: calc_content_len,
accept_ranges,
content_range,
content_length,
last_modified: None,
e_tag: Some(format!("-{}", id)),
version_id: None,
..Default::default()
};
debug!(?output);

Ok(S3Response::new(output))
}

Expand Down
35 changes: 24 additions & 11 deletions src/s3_frontend/utils/ranges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,36 @@ pub fn calculate_ranges(
input_range: Option<S3Range>,
content_length: u64,
footer: Option<FooterParser>,
) -> Result<(Option<String>, Option<ArunaRange>)> {
) -> Result<(Option<String>, Option<ArunaRange>, u64)> {
match input_range {
Some(r) => match footer {
Some(mut foot) => {
trace_err!(foot.parse())?;
let (o1, mut o2) = trace_err!(
foot.get_offsets_by_range(aruna_range_from_s3range(r, content_length))
)?;
o2.to += 1;
Ok((Some(format!("bytes={}-{}", o1.from, o1.to - 1)), Some(o2)))
// Convert input range to internal range
let aruna_range = aruna_range_from_s3range(r, content_length);
// Calculate block offsets
let (query_range, mut filter_range) =
trace_err!(foot.get_offsets_by_range(aruna_range))?;
filter_range.to += 1;

Ok((
Some(format!("bytes={}-{}", query_range.from, query_range.to - 1)),
Some(filter_range),
aruna_range.from,
))
}
None => {
let mut ar_range = aruna_range_from_s3range(r, content_length);
ar_range.to += 1;
// Convert input range to internal range
let mut aruna_range = aruna_range_from_s3range(r, content_length);
aruna_range.to += 1;
Ok((
Some(format!("bytes={}-{}", ar_range.from, ar_range.to)),
Some(format!("bytes={}-{}", aruna_range.from, aruna_range.to)),
None,
aruna_range.from,
))
}
},
None => Ok((None, None)),
None => Ok((None, None, 0)),
}
}

Expand All @@ -45,7 +54,11 @@ pub fn aruna_range_from_s3range(range_string: S3Range, content_length: u64) -> A
Int { first, last } => match last {
Some(val) => ArunaRange {
from: first,
to: val,
to: if val > content_length {
content_length - 1
} else {
val
},
},
None => ArunaRange {
from: first,
Expand Down

0 comments on commit 3ad94d5

Please sign in to comment.