feat: Implement an AsyncReader for avro using ObjectStore #8930
Conversation
jecsand838 left a comment
Flushing a partial review with some high-level thoughts.
I'll wait for you to finish before resuming.
```rust
/// 5. If no range was originally provided, reads the full file.
/// 6. If the range is 0, file_size is 0, or `range.end` is less than the header length, finish immediately.
pub struct AsyncAvroReader {
    store: Arc<dyn object_store::ObjectStore>,
```
I think the biggest high-level concern I have is the object_store hardwiring. My gut tells me we'd be better off with a generic `AsyncFileReader<T: AsyncRead + AsyncSeek>` or similar trait as the primary abstraction, with object_store as one feature-flagged adapter, imo.
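To illustrate the shape I have in mind (names and bounds here are purely hypothetical, not a concrete proposal):

```rust
use tokio::io::{AsyncRead, AsyncSeek};

/// Hypothetical layering: the reader is generic over any async byte source,
/// rather than being hardwired to object_store.
pub struct AsyncAvroReader<T: AsyncRead + AsyncSeek + Unpin + Send> {
    input: T,
    batch_size: usize,
}
```

An object_store-backed type would then live behind the feature flag as one implementation among others, so DataFusion-style consumers can plug in their own I/O without pulling in object_store at all.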
Yeah, I was wondering how to implement this.
Perhaps just let the user (e.g. DataFusion) provide the impl and be completely agnostic.
Very fair. I think there's incredible value for an AsyncFileReader in arrow-avro, especially if implemented in a generic manner that is highly reusable across downstream projects. Also, object_store makes sense as a first-class adapter imo.
My original intention was providing the building blocks for projects such as DataFusion to use for more concrete, domain-specific implementations.
I'd recommend looking into the parquet crate for inspiration. It uses an abstraction and provides a `ParquetObjectReader`.
Honestly I think my main blocker is the schema thing here. I don't want to commit to the constructor before it is resolved, since it's a public API and I don't want it to be volatile.
100% I'm working on that right now and won't stop until I have a PR. That was a solid catch. The schema logic is an area of the code that I mean to give (or would welcome) a full refactor. I knew it would eventually come back.
```rust
pub async fn try_new(
    store: Arc<dyn object_store::ObjectStore>,
    location: Path,
    range: Option<Range<u64>>,
    file_size: u64,
    reader_schema: Option<AvroSchema>,
    batch_size: usize,
) -> Result<Self, ArrowError> {
    let file_size = if file_size == 0 {
        store
            .head(&location)
            .await
            .map_err(|err| {
                ArrowError::AvroError(format!("HEAD request failed for file, {err}"))
            })?
            .size
    } else {
        file_size
    };
```
Also, I'd probably consider either using a builder pattern or defining an `AsyncAvroReaderOptions` struct for these params.
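e.g. roughly this (field set copied from the current constructor; the struct itself is hypothetical):

```rust
use std::ops::Range;

use crate::schema::AvroSchema; // path assumed from this crate's layout

/// Hypothetical options struct bundling the optional params, so `try_new`
/// keeps a small, stable signature.
#[derive(Default)]
pub struct AsyncAvroReaderOptions {
    /// Byte range to read; `None` reads the whole file.
    pub range: Option<Range<u64>>,
    /// Known file size; `None` means resolve it with a HEAD request.
    pub file_size: Option<u64>,
    /// Optional reader schema for resolution/projection.
    pub reader_schema: Option<AvroSchema>,
    /// Rows per output batch; `None` uses the crate default.
    pub batch_size: Option<usize>,
}
```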
Sorry, I haven't dropped it, just found myself in a really busy week! The generic reader support does not seem too hard to implement from the dabbling I've done, and I still need to get to the builder pattern change.
…, separate object store file reader into a feature-gated struct and use a generic async file reader trait
@jecsand838 I believe this is now ready for a proper review^
@EmilyMatt Thank you so much for getting these changes up!
I left a few comments. Let me know what you think.
EDIT: Should have mentioned that this is looking really good overall and I'm very excited for the AsyncReader!
```toml
# Enable async APIs
async = ["futures"]
# Enable object_store integration
object_store = ["dep:object_store", "async"]
```
I'd recommend updating the README.md and docs with details on these new features.
I'm hesitant as I am notoriously bad at writing docs 😅
Will use Claude to try and make something
```rust
/// A broad generic trait definition allowing fetching bytes from any source asynchronously.
/// This trait has very few limitations, mostly in regard to ownership and lifetime,
/// but it must return a boxed Future containing [`bytes::Bytes`] or an error.
```
You may want to provide examples on how to use this for the docs.
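Even a minimal in-memory implementation would work well as a doc example, e.g. (a sketch; I'm assuming `DataFetchFutureBoxed` resolves to a pinned, boxed future of `Result<Bytes, ArrowError>` per the doc comment above):

```rust
use std::ops::Range;

use bytes::Bytes;

/// Hypothetical doc example: serve byte ranges from an in-memory buffer.
struct InMemoryReader {
    data: Bytes,
}

impl AsyncFileReader for InMemoryReader {
    fn fetch_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed {
        let slice = self.data.slice(range.start as usize..range.end as usize);
        Box::pin(async move { Ok(slice) })
    }
}
```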
```rust
    pub(super) reader_schema: Option<AvroSchema>,
}

impl<R: AsyncFileReader> AsyncAvroReaderBuilder<R> {
```
Nit, but I'd consider naming this either `AsyncFileReaderBuilder` or `AsyncOcfReaderBuilder`.
Will rename to the first, tbh I'm not sure if and how we intend to handle the other types yet.
```rust
None => {
    let devised_avro_schema = AvroSchema::try_from(self.schema.as_ref())?;
    let devised_reader_schema = devised_avro_schema.schema()?;
    field_builder
        .with_reader_schema(&devised_reader_schema)
        .build()
}
```
Shouldn't we just execute `field_builder.build()` without a reader_schema in this case?
The `Reader` treats this scenario as one where the caller simply wants to decode an OCF file without schema resolution, purely using the writer_schema.
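In other words, roughly (`maybe_reader_schema` standing in for the already-parsed schema):

```rust
// Sketch of the suggested shape: only attach a reader schema when one was
// actually provided; otherwise build against the writer schema alone.
match maybe_reader_schema {
    Some(schema) => field_builder.with_reader_schema(&schema).build(),
    None => field_builder.build(),
}
```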
Yeah, I'm gonna remove this completely right now, since I don't really trust the conversion either.
```rust
pub fn builder(
    reader: R,
    file_size: u64,
    schema: SchemaRef,
```
I really don't think the schema field should be required here, as it subtracts from Avro's self-describing characteristic while effectively making the optional reader_schema required.
I'd recommend setting this up in a manner that encourages callers to use `with_reader_schema`. That way, callers which simply want to read an OCF file without schema resolution are optimally supported.
If we absolutely need to support passing in an Arrow reader_schema, then I'd recommend adding an optional (and well documented) `with_arrow_reader_schema` method (to complement `with_reader_schema`) that takes an Arrow `SchemaRef` and runs `AvroSchema::try_from` on it.
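Roughly this shape (a sketch; the builder internals are assumed):

```rust
impl<R: AsyncFileReader> AsyncAvroReaderBuilder<R> {
    /// Hypothetical complement to `with_reader_schema`: accept an Arrow schema
    /// and convert it up front, keeping the conversion explicit and fallible.
    pub fn with_arrow_reader_schema(mut self, schema: SchemaRef) -> Result<Self, ArrowError> {
        self.reader_schema = Some(AvroSchema::try_from(schema.as_ref())?);
        Ok(self)
    }
}
```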
Yeah, I'm removing this completely; users should use a reader schema directly if they so choose.
```rust
/// 4. If a block is incomplete (due to range ending mid-block), fetching the remaining bytes from the [`AsyncFileReader`].
/// 5. If no range was originally provided, reads the full file.
/// 6. If the range is 0, file_size is 0, or `range.end` is less than the header length, finish immediately.
pub struct AsyncAvroReader<R: AsyncFileReader> {
```
Nit, but I'd also consider calling this `AsyncFileReader` or `AsyncOcfReader`.
```rust
pub trait AsyncFileReader: Send + Unpin {
    /// Fetch a range of bytes asynchronously using a custom reading method
    fn fetch_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed;

    /// Fetch a range that is beyond the originally provided file range,
    /// such as reading the header before reading the file,
    /// or fetching the remainder of the block in case the range ended before the block's end.
    /// By default, this will simply point to the fetch_range function.
    fn fetch_extra_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed {
        self.fetch_range(range)
    }
}
```
What's your take on aligning this a bit more with the trait used in parquet and arrow/async_reader?
```rust
// Current:
pub trait AsyncFileReader: Send + Unpin {
    /// Fetch a range of bytes asynchronously using a custom reading method
    fn fetch_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed;

    /// Fetch a range that is beyond the originally provided file range,
    /// such as reading the header before reading the file,
    /// or fetching the remainder of the block in case the range ended before the block's end.
    /// By default, this will simply point to the fetch_range function.
    fn fetch_extra_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed {
        self.fetch_range(range)
    }
}

// Suggested:
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
}
```
My thinking is this:
- The `get_bytes` trait method is just "fetch these bytes". It doesn't know or care whether the range is within some "expected" range. The out-of-band reads (header, partial block completion) could be a concern of the reader logic, not the I/O trait.
- Users already understand `get_bytes`/`get_byte_ranges`. Reusing that mental model reduces friction. Plus, consistency across crates is generally a best practice.
- This would unlock a clean default impl for `AsyncRead + AsyncSeek` (like `tokio::fs::File`) the same way parquet does; see the sketch after this list. The current `'static` requirement forces all implementations to be fully owned or `Arc`-wrapped, which seems unnecessarily rigid for simple file readers.
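For reference, a sketch of what that blanket impl could look like (the error alias and exact bounds here are assumptions, not the final API):

```rust
use std::io::SeekFrom;
use std::ops::Range;

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

type Result<T> = std::result::Result<T, std::io::Error>; // stand-in error alias

pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
}

/// Blanket impl mirroring parquet's, so e.g. `tokio::fs::File` works out of the box.
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::Start(range.start)).await?;
            let len = (range.end - range.start) as usize;
            let mut buf = vec![0u8; len];
            self.read_exact(&mut buf).await?;
            Ok(buf.into())
        }
        .boxed()
    }
}
```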
1. Indeed, but I'd like something like fetching the metadata/header in this case to be separate; parquet does this as well.
2. I agree with that, I will consider how to best approach this^^
3. While it's true that the trait does not need this restriction, it is necessary in order to write the actual code; the parquet reader also has `<T: AsyncFileReader + Send + 'static>`. Otherwise you simply could not use the underlying async readers.
Not only that, looking deeper it seems it also uses:
```rust
impl<T> ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
```
```rust
let mut decoder = HeaderDecoder::default();
let mut position = 0;
loop {
    let range_to_fetch = position..(position + 64 * 1024).min(self.file_size);
```
Is there a reason for hardcoding `position + 64 * 1024`?
Not really, my usual files have a smaller header but I figured this is a small enough value to be inconsequential for fetches and will almost certainly mean we don't have to run the loop more than once.
Added an optional hint
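i.e. roughly this shape (the name and backing field may differ in the final version):

```rust
impl<R: AsyncFileReader> AsyncAvroReaderBuilder<R> {
    /// Hypothetical builder method: lets callers size the initial header fetch
    /// instead of always pulling the hardcoded 64 KiB chunk.
    pub fn with_header_size_hint(mut self, hint: u64) -> Self {
        self.header_size_hint = Some(hint);
        self
    }
}
```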
```rust
// Should clamp to file size
assert_eq!(batch.num_rows(), 8);
```
It may also be worth adding round-trip tests using the `Writer`.
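Even something minimal along these lines would help (all helper names here are hypothetical placeholders, not crate APIs):

```rust
/// Hypothetical round-trip test: write a batch with the crate's Writer,
/// then read it back through the async reader and compare.
#[tokio::test]
async fn round_trips_through_writer() {
    let batch = make_test_batch(); // hypothetical: builds a small RecordBatch
    let ocf = write_to_ocf(&batch); // hypothetical: Writer -> Vec<u8>
    let batches = read_async(ocf).await; // hypothetical: full async reader path
    assert_eq!(batches, vec![batch]);
}
```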
Added something quite minimal, but I don't know how much value it adds
```rust
/// An implementation of an AsyncFileReader using the [`object_store::ObjectStore`] API.
pub struct ObjectStoreFileReader {
    store: Arc<dyn object_store::ObjectStore>,
    location: object_store::path::Path,
}

impl ObjectStoreFileReader {
    /// Creates a new [`Self`] from a store implementation and file location.
    pub fn new(
        store: Arc<dyn object_store::ObjectStore>,
        location: object_store::path::Path,
    ) -> Self {
        Self { store, location }
    }
}
```
Another thing that occurred to me is that we could support different runtimes for reading from `ObjectStore` and Avro decoding by following the pattern below from the `ParquetObjectReader`.
```rust
/// Perform IO on the provided tokio runtime
///
/// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner
/// to service IO. Therefore, running IO and CPU-bound tasks, such as parquet decoding,
/// on the same tokio runtime can lead to degraded throughput, dropped connections and
/// other issues. For more information see [here].
///
/// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
pub fn with_runtime(self, handle: Handle) -> Self {
    Self {
        runtime: Some(handle),
        ..self
    }
}

fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
where
    F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
        + Send
        + 'static,
    O: Send + 'static,
    E: Into<ParquetError> + Send + 'static,
{
    match &self.runtime {
        Some(handle) => {
            let path = self.path.clone();
            let store = Arc::clone(&self.store);
            handle
                .spawn(async move { f(&store, &path).await })
                .map_ok_or_else(
                    |e| match e.try_into_panic() {
                        Err(e) => Err(ParquetError::External(Box::new(e))),
                        Ok(p) => std::panic::resume_unwind(p),
                    },
                    |res| res.map_err(|e| e.into()),
                )
                .boxed()
        }
        None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
    }
}
```
Nice, yeah looks good, I will take another look at the parquet impl
@jecsand838 and @EmilyMatt -- how is this PR looking?

I had actually just returned to work on it 2 days ago. I'm still having some issues with the schema now being provided; due to the problems I've described, @jecsand838 suggested removing the Arrow schema, and I'm starting to think that is the only viable way for now.

Hope to push another version today and address some of the things above.

@jecsand838 I've shamelessly plagiarized the API for the object reader from the parquet crate, but that's OK IMO; it lays the foundations for a common API in a few versions.
Which issue does this PR close?
Rationale for this change
Allows for proper file splitting within an asynchronous context.
What changes are included in this PR?
The raw implementation, allowing for file splitting, starting mid-block (reading until a sync marker is found), and reading further until the end of the final block is found.
This reader currently requires that a reader_schema be provided if type promotion, schema evolution, or projection are desired.
This is because #8928 is currently blocking proper parsing from an ArrowSchema.
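A rough usage sketch of the intended call pattern (builder method names here are assumptions and may differ from the final API):

```rust
use std::sync::Arc;

use object_store::{memory::InMemory, path::Path, ObjectStore};

// Sketch only: with the `object_store` feature, wrap a store in the
// feature-gated file reader and hand it to the async Avro reader.
async fn read_split(file_size: u64) -> Result<(), ArrowError> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let reader = ObjectStoreFileReader::new(store, Path::from("data.avro"));
    // A mid-file range: the reader scans forward to the next sync marker
    // before decoding, and completes the final block even if it crosses
    // the end of the range.
    let _stream = AsyncAvroReader::builder(reader, file_size) // assumed post-refactor signature
        .with_range(512..1024) // assumed builder method
        .build()
        .await?;
    Ok(())
}
```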
Are these changes tested?
Yes
Are there any user-facing changes?
Only the additions. Other changes are internal to the crate (namely the way the Decoder is created from its parts).