Skip to content

Commit

Permalink
Support range in GetOptions (#40)
Browse files Browse the repository at this point in the history
* Support range in GetOptions

* Add test for get with options
  • Loading branch information
kylebarron authored Oct 23, 2024
1 parent b581ce4 commit 6b6f666
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 10 deletions.
44 changes: 39 additions & 5 deletions object-store-rs/python/object_store_rs/_get.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sys
from datetime import datetime
from typing import List, Sequence, TypedDict
from typing import List, Sequence, Tuple, TypedDict

from ._list import ObjectMeta
from .store import ObjectStore
Expand All @@ -10,8 +10,11 @@ if sys.version_info >= (3, 12):
else:
from typing_extensions import Buffer as _Buffer

class GetOptions(TypedDict):
"""Options for a get request, such as range"""
class GetOptions(TypedDict, total=False):
"""Options for a get request, such as range.
All options are optional.
"""

if_match: str | None
"""
Expand Down Expand Up @@ -63,11 +66,32 @@ class GetOptions(TypedDict):
<https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.4>
"""

# range:
range: Tuple[int | None, int | None]
"""
Request transfer of only the specified range of bytes
otherwise returning [`Error::NotModified`]
The semantics of this tuple are:
- `(int, int)`: Request a specific range of bytes `(start, end)`.
If the given range is zero-length or starts after the end of the object, an
error will be returned. Additionally, if the range ends after the end of the
object, the entire remainder of the object will be returned. Otherwise, the
exact requested range will be returned.
The `end` offset is _exclusive_.
- `(int, None)`: Request all bytes starting from a given byte offset.
This is equivalent to `bytes={int}-` as an HTTP header.
- `(None, int)`: Request the last `int` bytes. Note that here, `int` is _this size
of the request_, not the byte offset. This is equivalent to `bytes=-{int}` as an
HTTP header.
<https://datatracker.ietf.org/doc/html/rfc9110#name-range>
"""

Expand Down Expand Up @@ -125,7 +149,17 @@ class GetResult:

@property
def meta(self) -> ObjectMeta:
"""The ObjectMeta for this object"""
"""The ObjectMeta for this object.
This must be accessed _before_ calling `stream`, `bytes`, or `bytes_async`.
"""

@property
def range(self) -> ObjectMeta:
"""The range of bytes returned by this request.
This must be accessed _before_ calling `stream`, `bytes`, or `bytes_async`.
"""

def stream(self, min_chunk_size: int = 10 * 1024 * 1024) -> BytesStream:
"""Return a chunked stream over the result's bytes.
Expand Down
68 changes: 63 additions & 5 deletions object-store-rs/src/get.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::collections::HashMap;
use std::sync::Arc;

use arrow::buffer::Buffer;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, Fuse};
use futures::StreamExt;
use object_store::{GetOptions, GetResult, ObjectStore};
use object_store::{GetOptions, GetRange, GetResult, ObjectStore};
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyBytes;
Expand All @@ -20,32 +21,80 @@ use crate::runtime::get_runtime;
/// 10MB default chunk size
const DEFAULT_BYTES_CHUNK_SIZE: usize = 10 * 1024 * 1024;

#[derive(FromPyObject)]
pub(crate) struct PyGetOptions {
if_match: Option<String>,
if_none_match: Option<String>,
if_modified_since: Option<DateTime<Utc>>,
if_unmodified_since: Option<DateTime<Utc>>,
// TODO:
// range: Option<Range<usize>>,
range: Option<PyGetRange>,
version: Option<String>,
head: bool,
}

impl<'py> FromPyObject<'py> for PyGetOptions {
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
let dict = ob.extract::<HashMap<String, Bound<PyAny>>>()?;
Ok(Self {
if_match: dict.get("if_match").map(|x| x.extract()).transpose()?,
if_none_match: dict.get("if_none_match").map(|x| x.extract()).transpose()?,
if_modified_since: dict
.get("if_modified_since")
.map(|x| x.extract())
.transpose()?,
if_unmodified_since: dict
.get("if_unmodified_since")
.map(|x| x.extract())
.transpose()?,
range: dict.get("range").map(|x| x.extract()).transpose()?,
version: dict.get("version").map(|x| x.extract()).transpose()?,
head: dict
.get("head")
.map(|x| x.extract())
.transpose()?
.unwrap_or(false),
})
}
}

impl From<PyGetOptions> for GetOptions {
fn from(value: PyGetOptions) -> Self {
Self {
if_match: value.if_match,
if_none_match: value.if_none_match,
if_modified_since: value.if_modified_since,
if_unmodified_since: value.if_unmodified_since,
range: None,
range: value.range.map(|x| x.0),
version: value.version,
head: value.head,
}
}
}

pub(crate) struct PyGetRange(GetRange);

impl<'py> FromPyObject<'py> for PyGetRange {
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
let range = ob.extract::<[Option<usize>; 2]>()?;
match (range[0], range[1]) {
(Some(start), Some(end)) => {
if start >= end {
return Err(PyValueError::new_err(
format!("End range must be strictly greater than start range. Got start: {}, end: {}", start, end ),
));
}

Ok(Self(GetRange::Bounded(start..end)))
}
(Some(start), None) => Ok(Self(GetRange::Offset(start))),
// Note: in this case `end` means `suffix bytes`
(None, Some(end)) => Ok(Self(GetRange::Suffix(end))),
(None, None) => Err(PyValueError::new_err(
"Cannot provide (None, None) for range.",
)),
}
}
}

#[pyclass(name = "GetResult")]
pub(crate) struct PyGetResult(Option<GetResult>);

Expand Down Expand Up @@ -92,6 +141,15 @@ impl PyGetResult {
Ok(PyObjectMeta::new(inner.meta.clone()))
}

#[getter]
fn range(&self) -> PyResult<(usize, usize)> {
let inner = self
.0
.as_ref()
.ok_or(PyValueError::new_err("Result has already been disposed."))?;
Ok((inner.range.start, inner.range.end))
}

#[pyo3(signature = (min_chunk_size = DEFAULT_BYTES_CHUNK_SIZE))]
fn stream(&mut self, min_chunk_size: usize) -> PyResult<PyBytesStream> {
let get_result = self
Expand Down
14 changes: 14 additions & 0 deletions tests/test_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ async def test_stream_async():
assert pos == len(data)


def test_get_with_options():
store = MemoryStore()

data = b"the quick brown fox jumps over the lazy dog," * 100
path = "big-data.txt"

obs.put(store, path, data)

result = obs.get(store, path, options={"range": (5, 10)})
assert result.range == (5, 10)
buf = result.bytes()
assert buf == data[5:10]


def test_get_range():
store = MemoryStore()

Expand Down

0 comments on commit 6b6f666

Please sign in to comment.