Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions kernel/src/engine/default/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use object_store::path::Path;
use object_store::{DynObjectStore, ObjectStore};
use url::Url;

use super::UrlExt;
use crate::engine::default::executor::TaskExecutor;
use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};

Expand Down Expand Up @@ -131,19 +132,14 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
};
let store = store.clone();
async move {
match url.scheme() {
"http" | "https" => {
// have to annotate type here or rustc can't figure it out
Ok::<bytes::Bytes, Error>(reqwest::get(url).await?.bytes().await?)
}
_ => {
if let Some(rng) = range {
Ok(store.get_range(&path, rng).await?)
} else {
let result = store.get(&path).await?;
Ok(result.bytes().await?)
}
}
if url.is_presigned() {
// have to annotate type here or rustc can't figure it out
Ok::<bytes::Bytes, Error>(reqwest::get(url).await?.bytes().await?)
} else if let Some(rng) = range {
Ok(store.get_range(&path, rng).await?)
} else {
let result = store.get(&path).await?;
Ok(result.bytes().await?)
}
}
})
Expand Down
47 changes: 47 additions & 0 deletions kernel/src/engine/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,35 @@ impl<E: TaskExecutor> Engine for DefaultEngine<E> {
}
}

trait UrlExt {
// Check if a given url is a presigned url and can be used
// to access the object store via simple http requests
fn is_presigned(&self) -> bool;
}

impl UrlExt for Url {
fn is_presigned(&self) -> bool {
matches!(self.scheme(), "http" | "https")
&& (
// https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
// https://developers.cloudflare.com/r2/api/s3/presigned-urls/
self
.query_pairs()
.any(|(k, _)| k.eq_ignore_ascii_case("X-Amz-Signature")) ||
// https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas#version-2020-12-06-and-later
// note signed permission (sp) must always be present
self
.query_pairs().any(|(k, _)| k.eq_ignore_ascii_case("sp")) ||
// <https://cloud.google.com/storage/docs/authentication/signatures
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: stray angled bracket <

self
.query_pairs().any(|(k, _)| k.eq_ignore_ascii_case("X-Goog-Credential")) ||
// https://www.alibabacloud.com/help/en/oss/user-guide/upload-files-using-presigned-urls
self
.query_pairs().any(|(k, _)| k.eq_ignore_ascii_case("X-OSS-Credential"))
)
}
}

#[cfg(test)]
mod tests {
use super::executor::tokio::TokioBackgroundExecutor;
Expand All @@ -173,4 +202,22 @@ mod tests {
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
test_arrow_engine(&engine, &url);
}

#[test]
fn test_pre_signed_url() {
Copy link
Collaborator

@sebastiantia sebastiantia Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: unit test for case insensitive comparison

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added :)

let url = Url::parse("https://example.com?X-Amz-Signature=foo").unwrap();
assert!(url.is_presigned());

let url = Url::parse("https://example.com?sp=foo").unwrap();
assert!(url.is_presigned());

let url = Url::parse("https://example.com?X-Goog-Credential=foo").unwrap();
assert!(url.is_presigned());

let url = Url::parse("https://example.com?X-OSS-Credential=foo").unwrap();
assert!(url.is_presigned());

let url = Url::parse("https://example.com").unwrap();
assert!(!url.is_presigned());
}
}
12 changes: 7 additions & 5 deletions kernel/src/engine/default/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use object_store::DynObjectStore;
use uuid::Uuid;

use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use super::UrlExt;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices};
use crate::engine::default::executor::TaskExecutor;
Expand Down Expand Up @@ -191,18 +192,19 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
// -> reqwest to get data
// -> parse to parquet
// SAFETY: we did is_empty check above, this is ok.
let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
"http" | "https" => Box::new(PresignedUrlOpener::new(
let file_opener: Box<dyn FileOpener> = if files[0].location.is_presigned() {
Box::new(PresignedUrlOpener::new(
1024,
physical_schema.clone(),
predicate,
)),
_ => Box::new(ParquetOpener::new(
))
} else {
Box::new(ParquetOpener::new(
1024,
physical_schema.clone(),
predicate,
self.store.clone(),
)),
))
};
FileStream::new_async_read_iterator(
self.task_executor.clone(),
Expand Down
Loading