Skip to content

Commit 1f378d0

Browse files
committed
feat: robustify pre-signed URL checks
1 parent 2573424 commit 1f378d0

File tree

3 files changed

+34
-24
lines changed

3 files changed

+34
-24
lines changed

kernel/src/engine/default/filesystem.rs

Lines changed: 10 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ use object_store::path::Path;
77
use object_store::{DynObjectStore, ObjectStore};
88
use url::Url;
99

10+
use super::UrlExt;
1011
use crate::engine::default::executor::TaskExecutor;
1112
use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};
1213

@@ -131,18 +132,15 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
131132
};
132133
let store = store.clone();
133134
async move {
134-
match url.scheme() {
135-
"http" | "https" => {
136-
// have to annotate type here or rustc can't figure it out
137-
Ok::<bytes::Bytes, Error>(reqwest::get(url).await?.bytes().await?)
138-
}
139-
_ => {
140-
if let Some(rng) = range {
141-
Ok(store.get_range(&path, rng).await?)
142-
} else {
143-
let result = store.get(&path).await?;
144-
Ok(result.bytes().await?)
145-
}
135+
if url.is_presigned() {
136+
// have to annotate type here or rustc can't figure it out
137+
Ok::<bytes::Bytes, Error>(reqwest::get(url).await?.bytes().await?)
138+
} else {
139+
if let Some(rng) = range {
140+
Ok(store.get_range(&path, rng).await?)
141+
} else {
142+
let result = store.get(&path).await?;
143+
Ok(result.bytes().await?)
146144
}
147145
}
148146
}

kernel/src/engine/default/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -158,6 +158,16 @@ impl<E: TaskExecutor> Engine for DefaultEngine<E> {
158158
}
159159
}
160160

161+
pub(self) trait UrlExt {
162+
fn is_presigned(&self) -> bool;
163+
}
164+
165+
impl UrlExt for Url {
166+
fn is_presigned(&self) -> bool {
167+
matches!(self.scheme(), "http" | "https") && self.query().is_some()
168+
}
169+
}
170+
161171
#[cfg(test)]
162172
mod tests {
163173
use super::executor::tokio::TokioBackgroundExecutor;

kernel/src/engine/default/parquet.rs

Lines changed: 14 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -4,23 +4,24 @@ use std::collections::HashMap;
44
use std::ops::Range;
55
use std::sync::Arc;
66

7-
use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
8-
use crate::arrow::array::{BooleanArray, Int64Array, RecordBatch, StringArray};
9-
use crate::parquet::arrow::arrow_reader::{
10-
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
11-
};
12-
use crate::parquet::arrow::arrow_writer::ArrowWriter;
13-
use crate::parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
147
use futures::StreamExt;
158
use object_store::path::Path;
169
use object_store::DynObjectStore;
1710
use uuid::Uuid;
1811

1912
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
13+
use super::UrlExt;
14+
use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
15+
use crate::arrow::array::{BooleanArray, Int64Array, RecordBatch, StringArray};
2016
use crate::engine::arrow_data::ArrowEngineData;
2117
use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices};
2218
use crate::engine::default::executor::TaskExecutor;
2319
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
20+
use crate::parquet::arrow::arrow_reader::{
21+
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
22+
};
23+
use crate::parquet::arrow::arrow_writer::ArrowWriter;
24+
use crate::parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
2425
use crate::schema::SchemaRef;
2526
use crate::{
2627
DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
@@ -191,18 +192,19 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
191192
// -> reqwest to get data
192193
// -> parse to parquet
193194
// SAFETY: we did is_empty check above, this is ok.
194-
let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
195-
"http" | "https" => Box::new(PresignedUrlOpener::new(
195+
let file_opener: Box<dyn FileOpener> = if files[0].location.is_presigned() {
196+
Box::new(PresignedUrlOpener::new(
196197
1024,
197198
physical_schema.clone(),
198199
predicate,
199-
)),
200-
_ => Box::new(ParquetOpener::new(
200+
))
201+
} else {
202+
Box::new(ParquetOpener::new(
201203
1024,
202204
physical_schema.clone(),
203205
predicate,
204206
self.store.clone(),
205-
)),
207+
))
206208
};
207209
FileStream::new_async_read_iterator(
208210
self.task_executor.clone(),

0 commit comments

Comments
 (0)