From 6b6f666d0c79ded5cfc33c5e5fd41df6ea7266ef Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 23 Oct 2024 17:19:34 -0400 Subject: [PATCH] Support range in GetOptions (#40) * Support range in GetOptions * Add test for get with options --- .../python/object_store_rs/_get.pyi | 44 ++++++++++-- object-store-rs/src/get.rs | 68 +++++++++++++++++-- tests/test_get.py | 14 ++++ 3 files changed, 116 insertions(+), 10 deletions(-) diff --git a/object-store-rs/python/object_store_rs/_get.pyi b/object-store-rs/python/object_store_rs/_get.pyi index 5152114..7f615f6 100644 --- a/object-store-rs/python/object_store_rs/_get.pyi +++ b/object-store-rs/python/object_store_rs/_get.pyi @@ -1,6 +1,6 @@ import sys from datetime import datetime -from typing import List, Sequence, TypedDict +from typing import List, Sequence, Tuple, TypedDict from ._list import ObjectMeta from .store import ObjectStore @@ -10,8 +10,11 @@ if sys.version_info >= (3, 12): else: from typing_extensions import Buffer as _Buffer -class GetOptions(TypedDict): - """Options for a get request, such as range""" +class GetOptions(TypedDict, total=False): + """Options for a get request, such as range. + + All options are optional. + """ if_match: str | None """ @@ -63,11 +66,32 @@ class GetOptions(TypedDict): """ - # range: + range: Tuple[int | None, int | None] """ Request transfer of only the specified range of bytes otherwise returning [`Error::NotModified`] + The semantics of this tuple are: + + - `(int, int)`: Request a specific range of bytes `(start, end)`. + + If the given range is zero-length or starts after the end of the object, an + error will be returned. Additionally, if the range ends after the end of the + object, the entire remainder of the object will be returned. Otherwise, the + exact requested range will be returned. + + The `end` offset is _exclusive_. + + - `(int, None)`: Request all bytes starting from a given byte offset. + + This is equivalent to `bytes={int}-` as an HTTP header. 
+ + - `(None, int)`: Request the last `int` bytes. Note that here, `int` is _the size + of the request_, not the byte offset. This is equivalent to `bytes=-{int}` as an + HTTP header. + + + """ @@ -125,7 +149,17 @@ class GetResult: @property def meta(self) -> ObjectMeta: - """The ObjectMeta for this object""" + """The ObjectMeta for this object. + + This must be accessed _before_ calling `stream`, `bytes`, or `bytes_async`. + """ + + @property + def range(self) -> ObjectMeta: + """The range of bytes returned by this request. + + This must be accessed _before_ calling `stream`, `bytes`, or `bytes_async`. + """ def stream(self, min_chunk_size: int = 10 * 1024 * 1024) -> BytesStream: """Return a chunked stream over the result's bytes. diff --git a/object-store-rs/src/get.rs b/object-store-rs/src/get.rs index 5b161d1..3d198fc 100644 --- a/object-store-rs/src/get.rs +++ b/object-store-rs/src/get.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use arrow::buffer::Buffer; @@ -5,7 +6,7 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::stream::{BoxStream, Fuse}; use futures::StreamExt; -use object_store::{GetOptions, GetResult, ObjectStore}; +use object_store::{GetOptions, GetRange, GetResult, ObjectStore}; use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError}; use pyo3::prelude::*; use pyo3::types::PyBytes; @@ -20,18 +21,41 @@ use crate::runtime::get_runtime; /// 10MB default chunk size const DEFAULT_BYTES_CHUNK_SIZE: usize = 10 * 1024 * 1024; -#[derive(FromPyObject)] pub(crate) struct PyGetOptions { if_match: Option, if_none_match: Option, if_modified_since: Option>, if_unmodified_since: Option>, - // TODO: - // range: Option>, + range: Option, version: Option, head: bool, } +impl<'py> FromPyObject<'py> for PyGetOptions { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + let dict = ob.extract::>>()?; + Ok(Self { + if_match: dict.get("if_match").map(|x| x.extract()).transpose()?, + if_none_match: 
dict.get("if_none_match").map(|x| x.extract()).transpose()?, + if_modified_since: dict + .get("if_modified_since") + .map(|x| x.extract()) + .transpose()?, + if_unmodified_since: dict + .get("if_unmodified_since") + .map(|x| x.extract()) + .transpose()?, + range: dict.get("range").map(|x| x.extract()).transpose()?, + version: dict.get("version").map(|x| x.extract()).transpose()?, + head: dict + .get("head") + .map(|x| x.extract()) + .transpose()? + .unwrap_or(false), + }) + } +} + impl From for GetOptions { fn from(value: PyGetOptions) -> Self { Self { @@ -39,13 +63,38 @@ impl From for GetOptions { if_none_match: value.if_none_match, if_modified_since: value.if_modified_since, if_unmodified_since: value.if_unmodified_since, - range: None, + range: value.range.map(|x| x.0), version: value.version, head: value.head, } } } +pub(crate) struct PyGetRange(GetRange); + +impl<'py> FromPyObject<'py> for PyGetRange { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + let range = ob.extract::<[Option; 2]>()?; + match (range[0], range[1]) { + (Some(start), Some(end)) => { + if start >= end { + return Err(PyValueError::new_err( + format!("End range must be strictly greater than start range. 
Got start: {}, end: {}", start, end ), + )); + } + + Ok(Self(GetRange::Bounded(start..end))) + } + (Some(start), None) => Ok(Self(GetRange::Offset(start))), + // Note: in this case `end` means `suffix bytes` + (None, Some(end)) => Ok(Self(GetRange::Suffix(end))), + (None, None) => Err(PyValueError::new_err( + "Cannot provide (None, None) for range.", + )), + } + } +} + #[pyclass(name = "GetResult")] pub(crate) struct PyGetResult(Option); @@ -92,6 +141,15 @@ impl PyGetResult { Ok(PyObjectMeta::new(inner.meta.clone())) } + #[getter] + fn range(&self) -> PyResult<(usize, usize)> { + let inner = self + .0 + .as_ref() + .ok_or(PyValueError::new_err("Result has already been disposed."))?; + Ok((inner.range.start, inner.range.end)) + } + #[pyo3(signature = (min_chunk_size = DEFAULT_BYTES_CHUNK_SIZE))] fn stream(&mut self, min_chunk_size: usize) -> PyResult { let get_result = self diff --git a/tests/test_get.py b/tests/test_get.py index 1ebd620..d8e8b4b 100644 --- a/tests/test_get.py +++ b/tests/test_get.py @@ -46,6 +46,20 @@ async def test_stream_async(): assert pos == len(data) +def test_get_with_options(): + store = MemoryStore() + + data = b"the quick brown fox jumps over the lazy dog," * 100 + path = "big-data.txt" + + obs.put(store, path, data) + + result = obs.get(store, path, options={"range": (5, 10)}) + assert result.range == (5, 10) + buf = result.bytes() + assert buf == data[5:10] + + def test_get_range(): store = MemoryStore()