Skip to content

Commit 512a31a

Browse files
authored
feat: robustify pre-signed URL checks (#760)
## What changes are proposed in this pull request? Current checks for a pre-signed URL in our readers may misclassify object store URLs that do not use store-specific schemes (`s3://`, `az://`, ...) as pre-signed URLs. We extend the current checks to also consider the presence of a query string, and define a helper trait to ensure we can maintain the check in one place. ## How was this change tested? Additional test to validate URL matches.
1 parent 79b9f24 commit 512a31a

File tree

3 files changed

+70
-18
lines changed

3 files changed

+70
-18
lines changed

kernel/src/engine/default/filesystem.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use object_store::path::Path;
77
use object_store::{DynObjectStore, ObjectStore};
88
use url::Url;
99

10+
use super::UrlExt;
1011
use crate::engine::default::executor::TaskExecutor;
1112
use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};
1213

@@ -131,19 +132,14 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
131132
};
132133
let store = store.clone();
133134
async move {
134-
match url.scheme() {
135-
"http" | "https" => {
136-
// have to annotate type here or rustc can't figure it out
137-
Ok::<bytes::Bytes, Error>(reqwest::get(url).await?.bytes().await?)
138-
}
139-
_ => {
140-
if let Some(rng) = range {
141-
Ok(store.get_range(&path, rng).await?)
142-
} else {
143-
let result = store.get(&path).await?;
144-
Ok(result.bytes().await?)
145-
}
146-
}
135+
if url.is_presigned() {
136+
// have to annotate type here or rustc can't figure it out
137+
Ok::<bytes::Bytes, Error>(reqwest::get(url).await?.bytes().await?)
138+
} else if let Some(rng) = range {
139+
Ok(store.get_range(&path, rng).await?)
140+
} else {
141+
let result = store.get(&path).await?;
142+
Ok(result.bytes().await?)
147143
}
148144
}
149145
})

kernel/src/engine/default/mod.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,35 @@ impl<E: TaskExecutor> Engine for DefaultEngine<E> {
158158
}
159159
}
160160

161+
trait UrlExt {
162+
// Check if a given url is a presigned url and can be used
163+
// to access the object store via simple http requests
164+
fn is_presigned(&self) -> bool;
165+
}
166+
167+
impl UrlExt for Url {
168+
fn is_presigned(&self) -> bool {
169+
matches!(self.scheme(), "http" | "https")
170+
&& (
171+
// https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
172+
// https://developers.cloudflare.com/r2/api/s3/presigned-urls/
173+
self
174+
.query_pairs()
175+
.any(|(k, _)| k.eq_ignore_ascii_case("X-Amz-Signature")) ||
176+
// https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas#version-2020-12-06-and-later
177+
// note signed permission (sp) must always be present
178+
self
179+
.query_pairs().any(|(k, _)| k.eq_ignore_ascii_case("sp")) ||
180+
// https://cloud.google.com/storage/docs/authentication/signatures
181+
self
182+
.query_pairs().any(|(k, _)| k.eq_ignore_ascii_case("X-Goog-Credential")) ||
183+
// https://www.alibabacloud.com/help/en/oss/user-guide/upload-files-using-presigned-urls
184+
self
185+
.query_pairs().any(|(k, _)| k.eq_ignore_ascii_case("X-OSS-Credential"))
186+
)
187+
}
188+
}
189+
161190
#[cfg(test)]
162191
mod tests {
163192
use super::executor::tokio::TokioBackgroundExecutor;
@@ -173,4 +202,29 @@ mod tests {
173202
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
174203
test_arrow_engine(&engine, &url);
175204
}
205+
206+
#[test]
fn test_pre_signed_url() {
    // Parse the raw string and report whether it is classified as pre-signed.
    let presigned = |raw: &str| Url::parse(raw).unwrap().is_presigned();

    // One marker key per supported provider.
    assert!(presigned("https://example.com?X-Amz-Signature=foo"));
    assert!(presigned("https://example.com?sp=foo"));
    assert!(presigned("https://example.com?X-Goog-Credential=foo"));
    assert!(presigned("https://example.com?X-OSS-Credential=foo"));

    // assert that query keys are case insensitive
    assert!(presigned("https://example.com?x-gooG-credenTIAL=foo"));
    assert!(presigned("https://example.com?x-oss-CREDENTIAL=foo"));

    // An http(s) URL without any signing key is not pre-signed.
    assert!(!presigned("https://example.com"));
}
176230
}

kernel/src/engine/default/parquet.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use object_store::DynObjectStore;
1717
use uuid::Uuid;
1818

1919
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
20+
use super::UrlExt;
2021
use crate::engine::arrow_data::ArrowEngineData;
2122
use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices};
2223
use crate::engine::default::executor::TaskExecutor;
@@ -191,18 +192,19 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
191192
// -> reqwest to get data
192193
// -> parse to parquet
193194
// SAFETY: we did is_empty check above, this is ok.
194-
let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
195-
"http" | "https" => Box::new(PresignedUrlOpener::new(
195+
let file_opener: Box<dyn FileOpener> = if files[0].location.is_presigned() {
196+
Box::new(PresignedUrlOpener::new(
196197
1024,
197198
physical_schema.clone(),
198199
predicate,
199-
)),
200-
_ => Box::new(ParquetOpener::new(
200+
))
201+
} else {
202+
Box::new(ParquetOpener::new(
201203
1024,
202204
physical_schema.clone(),
203205
predicate,
204206
self.store.clone(),
205-
)),
207+
))
206208
};
207209
FileStream::new_async_read_iterator(
208210
self.task_executor.clone(),

0 commit comments

Comments
 (0)