Skip to content

Commit

Permalink
Address PR comments
Browse files: browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi-db committed Sep 24, 2024
1 parent eba4135 commit 3109378
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 33 deletions.
3 changes: 2 additions & 1 deletion acceptance/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
let batches: Vec<RecordBatch> = scan
.execute(engine)?
.map(|res| {
let data = res.unwrap().raw_data.unwrap();
let res = res.unwrap();
let data = res.raw_data.unwrap();
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
Expand Down
6 changes: 1 addition & 5 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,11 +642,7 @@ mod tests {
let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let scan = snapshot.into_scan_builder().build().unwrap();
let files = scan
.execute(&engine)
.unwrap()
.map(Result::unwrap)
.collect_vec();
let files: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();

assert_eq!(files.len(), 1);
let num_rows = files[0].raw_data.as_ref().unwrap().length();
Expand Down
3 changes: 2 additions & 1 deletion kernel/tests/dv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ fn dv_table() -> Result<(), Box<dyn std::error::Error>> {
let snapshot = table.snapshot(&engine, None)?;
let scan = snapshot.into_scan_builder().build()?;

let stream = scan.execute(&engine)?.map(Result::unwrap);
let stream = scan.execute(&engine)?;
let mut total_rows = 0;
for res in stream {
let res = res?;
let data = res.raw_data?;
let rows = data.length();
for i in 0..rows {
Expand Down
6 changes: 3 additions & 3 deletions kernel/tests/golden_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use arrow::{compute::filter_record_batch, record_batch::RecordBatch};
use arrow_ord::sort::{lexsort_to_indices, SortColumn};
use arrow_schema::Schema;
use arrow_select::{concat::concat_batches, take::take};
use itertools::Itertools;
use paste::paste;
use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand Down Expand Up @@ -158,8 +159,7 @@ async fn latest_snapshot_test(
let scan = snapshot.into_scan_builder().build().unwrap();
let scan_res = scan.execute(&engine).unwrap();
let batches: Vec<RecordBatch> = scan_res
.map(Result::unwrap)
.map(|sr| {
.map_ok(|sr| {
let data = sr.raw_data.unwrap();
let record_batch = to_arrow(data).unwrap();
if let Some(mut mask) = sr.mask {
Expand All @@ -173,7 +173,7 @@ async fn latest_snapshot_test(
record_batch
}
})
.collect();
.try_collect()?;

let expected = read_expected(&expected_path.expect("expect an expected dir"))
.await
Expand Down
34 changes: 11 additions & 23 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::{transform_to_logical, Scan};
use delta_kernel::schema::Schema;
use delta_kernel::{Engine, EngineData, FileMeta, Table};
use itertools::Itertools;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;
Expand Down Expand Up @@ -120,13 +121,10 @@ async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>>
let scan = snapshot.into_scan_builder().build()?;

let mut files = 0;
let stream = scan
.execute(&engine)?
.map(Result::unwrap)
.zip(expected_data);
let stream = scan.execute(&engine)?.zip(expected_data);

for (data, expected) in stream {
let raw_data = data.raw_data?;
let raw_data = data?.raw_data?;
files += 1;
assert_eq!(into_record_batch(raw_data), expected);
}
Expand Down Expand Up @@ -174,13 +172,10 @@ async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
let scan = snapshot.into_scan_builder().build()?;

let mut files = 0;
let stream = scan
.execute(&engine)?
.map(Result::unwrap)
.zip(expected_data);
let stream = scan.execute(&engine)?.zip(expected_data);

for (data, expected) in stream {
let raw_data = data.raw_data?;
let raw_data = data?.raw_data?;
files += 1;
assert_eq!(into_record_batch(raw_data), expected);
}
Expand Down Expand Up @@ -231,14 +226,11 @@ async fn remove_action() -> Result<(), Box<dyn std::error::Error>> {
let snapshot = table.snapshot(&engine, None)?;
let scan = snapshot.into_scan_builder().build()?;

let stream = scan
.execute(&engine)?
.map(Result::unwrap)
.zip(expected_data);
let stream = scan.execute(&engine)?.zip(expected_data);

let mut files = 0;
for (data, expected) in stream {
let raw_data = data.raw_data?;
let raw_data = data?.raw_data?;
files += 1;
assert_eq!(into_record_batch(raw_data), expected);
}
Expand Down Expand Up @@ -360,13 +352,10 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {

let expected_files = expected_batches.len();
let mut files_scanned = 0;
let stream = scan
.execute(&engine)?
.map(Result::unwrap)
.zip(expected_batches);
let stream = scan.execute(&engine)?.zip(expected_batches);

for (batch, expected) in stream {
let raw_data = batch.raw_data?;
let raw_data = batch?.raw_data?;
files_scanned += 1;
assert_eq!(into_record_batch(raw_data), expected.clone());
}
Expand Down Expand Up @@ -410,8 +399,7 @@ fn read_with_execute(
let result_schema: ArrowSchemaRef = Arc::new(scan.schema().as_ref().try_into()?);
let scan_results = scan.execute(engine)?;
let batches: Vec<RecordBatch> = scan_results
.map(Result::unwrap)
.map(|sr| {
.map_ok(|sr| {
let data = sr.raw_data.unwrap();
let record_batch = to_arrow(data).unwrap();
if let Some(mut mask) = sr.mask {
Expand All @@ -425,7 +413,7 @@ fn read_with_execute(
record_batch
}
})
.collect();
.try_collect()?;

if expected.is_empty() {
assert_eq!(batches.len(), 0);
Expand Down

0 comments on commit 3109378

Please sign in to comment.