diff --git a/Cargo.lock b/Cargo.lock index 13062b6de..fbed01d3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3258,6 +3258,7 @@ dependencies = [ "typed-builder 0.20.1", "url", "uuid", + "waker-set", "zstd", ] @@ -7175,6 +7176,16 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "waker-set" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e958152c46345e1af5c61812030ac85200573a0b384c137e83ce2c01ac4bc07" +dependencies = [ + "crossbeam-utils", + "slab", +] + [[package]] name = "walkdir" version = "2.5.0" diff --git a/Cargo.toml b/Cargo.toml index 185b5fc01..894a46018 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,4 +116,5 @@ url = "2.5.4" uuid = { version = "1.14", features = ["v7"] } volo = "0.10.6" volo-thrift = "0.10.6" +waker-set = "0.2.0" zstd = "0.13.2" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 8795edc74..9ad64505c 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -84,6 +84,7 @@ tokio = { workspace = true, optional = false, features = ["sync"] } typed-builder = { workspace = true } url = { workspace = true } uuid = { workspace = true } +waker-set = { workspace = true } zstd = { workspace = true } [dev-dependencies] @@ -98,4 +99,4 @@ tera = { workspace = true } [package.metadata.cargo-machete] # These dependencies are added to ensure minimal dependency version -ignored = ["tap"] \ No newline at end of file +ignored = ["tap"] diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs new file mode 100644 index 000000000..e9753a918 --- /dev/null +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -0,0 +1,735 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::{HashMap, HashSet}; +use std::ops::Not; +use std::sync::Arc; + +use arrow_array::{ + Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array, + StringArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, +}; +use futures::channel::oneshot; +use futures::future::join_all; +use futures::{StreamExt, TryStreamExt}; +use itertools::Itertools; +use tokio::sync::oneshot::{channel, Receiver}; + +use super::delete_filter::{DeleteFilter, EqDelFuture}; +use crate::arrow::record_batch_transformer::RecordBatchTransformer; +use crate::arrow::{arrow_schema_to_schema, ArrowReader}; +use crate::delete_vector::DeleteVector; +use crate::expr::Predicate::AlwaysTrue; +use crate::expr::{Predicate, Reference}; +use crate::io::FileIO; +use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; +use crate::spec::{DataContentType, Datum, NestedFieldRef, PrimitiveType, Schema, SchemaRef}; +use crate::{Error, ErrorKind, Result}; + +#[allow(unused)] +pub trait DeleteFileLoader { + /// Read the delete file referred to in the task + /// + /// Returns the contents of the delete file as a RecordBatch stream. Applies schema evolution. + async fn read_delete_file( + &self, + task: &FileScanTaskDeleteFile, + schema: SchemaRef, + ) -> Result; +} + +#[allow(unused)] +#[derive(Clone, Debug)] +pub(crate) struct CachingDeleteFileLoader { + file_io: FileIO, + concurrency_limit_data_files: usize, + del_filter: DeleteFilter, +} + +impl DeleteFileLoader for CachingDeleteFileLoader { + async fn read_delete_file( + &self, + task: &FileScanTaskDeleteFile, + schema: SchemaRef, + ) -> Result { + let raw_batch_stream = + CachingDeleteFileLoader::parquet_to_batch_stream(&task.file_path, self.file_io.clone()) + .await?; + + Self::evolve_schema(raw_batch_stream, schema).await + } +} + +// Intermediate context during processing of a delete file task. +enum DeleteFileContext { + // TODO: Delete Vector loader from Puffin files + InProgEqDel(EqDelFuture), + PosDels(ArrowRecordBatchStream), + FreshEqDel { + batch_stream: ArrowRecordBatchStream, + sender: oneshot::Sender, + equality_ids: HashSet, + }, +} + +// Final result of the processing of a delete file task before +// results are fully merged into the DeleteFileManager's state +enum ParsedDeleteFileContext { + InProgEqDel(EqDelFuture), + DelVecs(HashMap), + EqDel, +} + +#[allow(unused_variables)] +impl CachingDeleteFileLoader { + pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self { + CachingDeleteFileLoader { + file_io, + concurrency_limit_data_files, + del_filter: DeleteFilter::default(), + } + } + + /// Load the deletes for all the specified tasks + /// + /// Returned future completes once all loading has finished. + /// + /// * Create a single stream of all delete file tasks irrespective of type, + /// so that we can respect the combined concurrency limit + /// * We then process each in two phases: load and parse. + /// * for positional deletes the load phase instantiates an ArrowRecordBatchStream to + /// stream the file contents out + /// * for eq deletes, we first check if the EQ delete is already loaded or being loaded by + /// another concurrently processing data file scan task. If it is, we return a future + /// for the pre-existing task from the load phase. 
If not, we create such a future + /// and store it in the state to prevent other data file tasks from starting to load + /// the same equality delete file, and return a record batch stream from the load phase + /// as per the other delete file types - only this time it is accompanied by a one-shot + /// channel sender that we will eventually use to resolve the shared future that we stored + /// in the state. + /// * When this gets updated to add support for delete vectors, the load phase will return + /// a PuffinReader for them. + /// * The parse phase parses each record batch stream according to its associated data type. + /// The result of this is a map of data file paths to delete vectors for the positional + /// delete tasks (and in future for the delete vector tasks). For equality delete + /// file tasks, this results in an unbound Predicate. + /// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot + /// channel to store them in the right place in the delete file managers state. + /// * The results of all of these futures are awaited on in parallel with the specified + /// level of concurrency and collected into a vec. We then combine all the delete + /// vector maps that resulted from any positional delete or delete vector files into a + /// single map and persist it in the state. + /// + /// + /// Conceptually, the data flow is like this: + /// ```none + /// FileScanTaskDeleteFile + /// | + /// Already-loading EQ Delete | Everything Else + /// +---------------------------------------------------+ + /// | | + /// [get existing future] [load recordbatch stream / puffin] + /// DeleteFileContext::InProgEqDel DeleteFileContext + /// | | + /// | | + /// | +-----------------------------+--------------------------+ + /// | Pos Del Del Vec (Not yet Implemented) EQ Del + /// | | | | + /// | [parse pos del stream] [parse del vec puffin] [parse eq del] + /// | HashMap HashMap (Predicate, Sender) + /// | | | | + /// | | | [persist to state] + /// | | | () + /// | | | | + /// | +-----------------------------+--------------------------+ + /// | | + /// | [buffer unordered] + /// | | + /// | [combine del vectors] + /// | HashMap + /// | | + /// | [persist del vectors to state] + /// | () + /// | | + /// +-------------------------+-------------------------+ + /// | + /// [join!] 
+ /// ``` + pub(crate) fn load_deletes( + &self, + delete_file_entries: &[FileScanTaskDeleteFile], + schema: SchemaRef, + ) -> Receiver> { + let (tx, rx) = channel(); + + let stream_items = delete_file_entries + .iter() + .map(|t| { + ( + t.clone(), + self.file_io.clone(), + self.del_filter.clone(), + schema.clone(), + ) + }) + .collect::>(); + let task_stream = futures::stream::iter(stream_items); + let del_filter = self.del_filter.clone(); + let concurrency_limit_data_files = self.concurrency_limit_data_files; + crate::runtime::spawn(async move { + let result = async move { + let mut del_filter = del_filter; + + let results: Vec = task_stream + .map(move |(task, file_io, del_filter, schema)| async move { + Self::load_file_for_task(&task, file_io, del_filter, schema).await + }) + .map(move |ctx| { + Ok(async { Self::parse_file_content_for_task(ctx.await?).await }) + }) + .try_buffer_unordered(concurrency_limit_data_files) + .try_collect::>() + .await?; + + // wait for all in-progress EQ deletes from other tasks + let _ = join_all(results.iter().filter_map(|i| { + if let ParsedDeleteFileContext::InProgEqDel(fut) = i { + Some(fut.clone()) + } else { + None + } + })) + .await; + + for item in results { + if let ParsedDeleteFileContext::DelVecs(hash_map) = item { + for (data_file_path, delete_vector) in hash_map.into_iter() { + del_filter.upsert_delete_vector(data_file_path, delete_vector); + } + } + } + + Ok(del_filter) + } + .await; + + let _ = tx.send(result); + }); + + rx + } + + async fn load_file_for_task( + task: &FileScanTaskDeleteFile, + file_io: FileIO, + del_filter: DeleteFilter, + schema: SchemaRef, + ) -> Result { + match task.file_type { + DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels( + Self::parquet_to_batch_stream(&task.file_path, file_io).await?, + )), + + DataContentType::EqualityDeletes => { + let sender = { + if let Some(existing) = del_filter + .get_equality_delete_predicate_for_delete_file_path(&task.file_path) + { + return Ok(DeleteFileContext::InProgEqDel(existing.clone())); + } + + let (sender, fut) = EqDelFuture::new(); + + del_filter.insert_equality_delete(task.file_path.to_string(), fut); + + sender + }; + + Ok(DeleteFileContext::FreshEqDel { + batch_stream: Self::evolve_schema( + Self::parquet_to_batch_stream(&task.file_path, file_io).await?, + schema, + ) + .await?, + sender, + equality_ids: HashSet::from_iter(task.equality_ids.clone()), + }) + } + + DataContentType::Data => Err(Error::new( + ErrorKind::Unexpected, + "tasks with files of type Data not expected here", + )), + } + } + + async fn parse_file_content_for_task( + ctx: DeleteFileContext, + ) -> Result { + match ctx { + DeleteFileContext::InProgEqDel(fut) => Ok(ParsedDeleteFileContext::InProgEqDel(fut)), + DeleteFileContext::PosDels(batch_stream) => { + let del_vecs = + Self::parse_positional_deletes_record_batch_stream(batch_stream).await?; + Ok(ParsedDeleteFileContext::DelVecs(del_vecs)) + } + DeleteFileContext::FreshEqDel { + sender, + batch_stream, + equality_ids, + } => { + let predicate = + Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids) + .await?; + + sender + .send(predicate) + .map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "Could not send eq delete predicate to state", + ) + }) + .map(|_| ParsedDeleteFileContext::EqDel) + } + } + } + + /// Loads a RecordBatchStream for a given datafile. + async fn parquet_to_batch_stream( + data_file_path: &str, + file_io: FileIO, + ) -> Result { + /* + Essentially a super-cut-down ArrowReader. 
We can't use ArrowReader directly + as that introduces a circular dependency. + */ + let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder( + data_file_path, + file_io.clone(), + false, + ) + .await? + .build()? + .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{}", e))); + + Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) + } + + /// Evolves the schema of the RecordBatches from an equality delete file + async fn evolve_schema( + record_batch_stream: ArrowRecordBatchStream, + target_schema: Arc, + ) -> Result { + let eq_ids = target_schema + .as_ref() + .field_id_to_name_map() + .keys() + .cloned() + .collect::>(); + + let mut record_batch_transformer = + RecordBatchTransformer::build(target_schema.clone(), &eq_ids); + + let record_batch_stream = record_batch_stream.map(move |record_batch| { + record_batch.and_then(|record_batch| { + record_batch_transformer.process_record_batch(record_batch) + }) + }); + + Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) + } + + /// Parses a record batch stream coming from positional delete files + /// + /// Returns a map of data file path to a delete vector + async fn parse_positional_deletes_record_batch_stream( + stream: ArrowRecordBatchStream, + ) -> Result> { + // TODO + + Err(Error::new( + ErrorKind::FeatureUnsupported, + "parsing of positional deletes is not yet supported", + )) + } + + /// Parses record batch streams from individual equality delete files + /// + /// Returns an unbound Predicate for each batch stream + async fn parse_equality_deletes_record_batch_stream( + mut stream: ArrowRecordBatchStream, + equality_ids: HashSet, + ) -> Result { + let mut result_predicate = AlwaysTrue; + + while let Some(record_batch) = stream.next().await { + let record_batch = record_batch?; + + if record_batch.num_columns() == 0 { + return Ok(AlwaysTrue); + } + + let batch_schema_arrow = record_batch.schema(); + let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?; + + let mut datum_columns_with_names: Vec<_> = record_batch + .columns() + .iter() + .zip(batch_schema_iceberg.as_struct().fields()) + // only use columns that are in the set of equality_ids for this delete file + .filter(|(field, value)| equality_ids.contains(&value.id)) + .map(|(column, field)| { + let col_as_datum_vec = arrow_array_to_datum_iterator(column, field); + col_as_datum_vec.map(|c| (c, field.name.to_string())) + }) + .try_collect()?; + + // consume all the iterators in lockstep, creating per-row predicates that get combined + // into a single final predicate + while datum_columns_with_names[0].0.len() > 0 { + let mut row_predicate = AlwaysTrue; + for (ref mut column, ref field_name) in &mut datum_columns_with_names { + if let Some(item) = column.next() { + if let Some(datum) = item? { + row_predicate = row_predicate + .and(Reference::new(field_name.clone()).equal_to(datum.clone())); + } + } + } + result_predicate = result_predicate.and(row_predicate.not()); + } + } + Ok(result_predicate.rewrite_not()) + } +} + +macro_rules! 
prim_to_datum { + ($column:ident, $arr:ty, $dat:path) => {{ + let arr = $column.as_any().downcast_ref::<$arr>().ok_or(Error::new( + ErrorKind::Unexpected, + format!("could not downcast ArrayRef to {}", stringify!($arr)), + ))?; + Ok(Box::new(arr.iter().map(|val| Ok(val.map($dat))))) + }}; +} + +fn eq_col_unsupported(ty: &str) -> Error { + Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Equality deletes where a predicate acts upon a {} column are not yet supported", + ty + ), + ) +} + +fn arrow_array_to_datum_iterator<'a>( + column: &'a ArrayRef, + field: &NestedFieldRef, +) -> Result>> + 'a>> { + match field.field_type.as_primitive_type() { + Some(primitive_type) => match primitive_type { + PrimitiveType::Int => prim_to_datum!(column, Int32Array, Datum::int), + PrimitiveType::Boolean => { + prim_to_datum!(column, BooleanArray, Datum::bool) + } + PrimitiveType::Long => prim_to_datum!(column, Int64Array, Datum::long), + PrimitiveType::Float => { + prim_to_datum!(column, Float32Array, Datum::float) + } + PrimitiveType::Double => { + prim_to_datum!(column, Float64Array, Datum::double) + } + PrimitiveType::String => { + prim_to_datum!(column, StringArray, Datum::string) + } + PrimitiveType::Date => prim_to_datum!(column, Date32Array, Datum::date), + PrimitiveType::Timestamp => { + prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamp_micros) + } + PrimitiveType::Timestamptz => { + prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamptz_micros) + } + PrimitiveType::TimestampNs => { + prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamp_nanos) + } + PrimitiveType::TimestamptzNs => { + prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamptz_nanos) + } + PrimitiveType::Time => { + let arr = column + .as_any() + .downcast_ref::() + .ok_or(Error::new( + ErrorKind::Unexpected, + "could not downcast ArrayRef to Time64MicrosecondArray", + ))?; + Ok(Box::new(arr.iter().map(|val| match val { + None => Ok(None), + Some(val) => Datum::time_micros(val).map(Some), + }))) + } + PrimitiveType::Decimal { .. } => Err(eq_col_unsupported("Decimal")), + PrimitiveType::Uuid => Err(eq_col_unsupported("Uuid")), + PrimitiveType::Fixed(_) => Err(eq_col_unsupported("Fixed")), + PrimitiveType::Binary => Err(eq_col_unsupported("Binary")), + }, + None => Err(eq_col_unsupported( + "non-primitive (i.e. Struct, List, or Map)", + )), + } +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::path::Path; + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::Schema as ArrowSchema; + use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; + use parquet::basic::Compression; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::scan::FileScanTask; + use crate::spec::{DataFileFormat, Schema}; + + type ArrowSchemaRef = Arc; + + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + #[tokio::test] + async fn test_delete_file_loader_load_deletes() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path(); + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + // Note that with the delete file parsing not yet in place, all we can test here is that + // the call to the loader fails with the expected FeatureUnsupportedError. 
+ let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + + let file_scan_tasks = setup_load_deletes_test_tasks(table_location); + + let result = delete_file_loader + .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) + .await + .unwrap(); + + assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported)); + } + + #[tokio::test] + async fn test_delete_file_loader_parse_equality_deletes() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().as_os_str().to_str().unwrap(); + let file_io = FileIO::from_path(table_location).unwrap().build().unwrap(); + + let eq_delete_file_path = setup_write_equality_delete_file_1(table_location); + + let record_batch_stream = + CachingDeleteFileLoader::parquet_to_batch_stream(&eq_delete_file_path, file_io.clone()) + .await + .expect("could not get batch stream"); + + let eq_ids = HashSet::from_iter(vec![2, 3, 4]); + + let parsed_eq_delete = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream( + record_batch_stream, + eq_ids, + ) + .await + .expect("error parsing batch stream"); + println!("{}", parsed_eq_delete); + + let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (y != 2)".to_string(); + + assert_eq!(parsed_eq_delete.to_string(), expected); + } + + fn setup_load_deletes_test_tasks(table_location: &Path) -> Vec { + let data_file_schema = Arc::new(Schema::builder().build().unwrap()); + let positional_delete_schema = create_pos_del_schema(); + + let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8]; + let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023]; + + let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values)); + let pos_col = Arc::new(Int64Array::from_iter_values(pos_values)); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + for n in 1..=3 { + let positional_deletes_to_write = + RecordBatch::try_new(positional_delete_schema.clone(), vec![ + file_path_col.clone(), + pos_col.clone(), + ]) + .unwrap(); + + let file = File::create(format!( + "{}/pos-del-{}.parquet", + table_location.to_str().unwrap(), + n + )) + .unwrap(); + let mut writer = ArrowWriter::try_new( + file, + positional_deletes_to_write.schema(), + Some(props.clone()), + ) + .unwrap(); + + writer + .write(&positional_deletes_to_write) + .expect("Writing batch"); + + // writer must be closed to write footer + writer.close().unwrap(); + } + + let pos_del_1 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_2 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_3 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let file_scan_tasks = vec![ + FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![pos_del_1, pos_del_2.clone()], + }, + FileScanTask { + start: 0, 
+ length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![pos_del_2, pos_del_3], + }, + ]; + + file_scan_tasks + } + + fn create_pos_del_schema() -> ArrowSchemaRef { + let fields = vec![ + arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )]), + ), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + } + + fn setup_write_equality_delete_file_1(table_location: &str) -> String { + let col_y_vals = vec![1, 2]; + let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef; + + let col_z_vals = vec![Some(100), None]; + let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef; + + let col_a_vals = vec![Some("HELP"), None]; + let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef; + + let equality_delete_schema = { + let fields = vec![ + arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), + ), + arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]), + ), + arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]), + ), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + }; + + let equality_deletes_to_write = + RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a]) + .unwrap(); + + let path = format!("{}/equality-deletes-1.parquet", &table_location); + + let file = File::create(&path).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let mut writer = ArrowWriter::try_new( + file, + equality_deletes_to_write.schema(), + Some(props.clone()), + ) + .unwrap(); + + writer + .write(&equality_deletes_to_write) + .expect("Writing batch"); + + // writer must be closed to write footer + writer.close().unwrap(); + + path + } +} diff --git a/crates/iceberg/src/arrow/delete_file_manager.rs b/crates/iceberg/src/arrow/delete_file_manager.rs deleted file mode 100644 index e1ca47679..000000000 --- a/crates/iceberg/src/arrow/delete_file_manager.rs +++ /dev/null @@ -1,95 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use crate::delete_vector::DeleteVector; -use crate::expr::BoundPredicate; -use crate::io::FileIO; -use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; -use crate::spec::SchemaRef; -use crate::{Error, ErrorKind, Result}; - -#[allow(unused)] -pub trait DeleteFileManager { - /// Read the delete file referred to in the task - /// - /// Returns the raw contents of the delete file as a RecordBatch stream - fn read_delete_file(task: &FileScanTaskDeleteFile) -> Result; -} - -#[allow(unused)] -#[derive(Clone, Debug)] -pub(crate) struct CachingDeleteFileManager { - file_io: FileIO, - concurrency_limit_data_files: usize, -} - -impl DeleteFileManager for CachingDeleteFileManager { - fn read_delete_file(_task: &FileScanTaskDeleteFile) -> Result { - // TODO, implementation in https://github.com/apache/iceberg-rust/pull/982 - - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Reading delete files is not yet supported", - )) - } -} - -#[allow(unused_variables)] -impl CachingDeleteFileManager { - pub fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> CachingDeleteFileManager { - Self { - file_io, - concurrency_limit_data_files, - } - } - - pub(crate) async fn load_deletes( - &self, - delete_file_entries: Vec, - ) -> Result<()> { - // TODO - - if !delete_file_entries.is_empty() { - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Reading delete files is not yet supported", - )) - } else { - Ok(()) - } - } - - pub(crate) fn build_delete_predicate( - &self, - snapshot_schema: SchemaRef, - ) -> Result> { - // TODO - - Ok(None) - } - - pub(crate) fn get_positional_delete_indexes_for_data_file( - &self, - data_file_path: &str, - ) -> Option> { - // TODO - - None - } -} diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs new file mode 100644 index 000000000..aee69c3cc --- /dev/null +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -0,0 +1,337 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex, OnceLock, RwLock}; +use std::task::{Context, Poll}; + +use futures::channel::oneshot; + +use crate::delete_vector::DeleteVector; +use crate::expr::Predicate::AlwaysTrue; +use crate::expr::{Bind, BoundPredicate, Predicate}; +use crate::scan::{FileScanTask, FileScanTaskDeleteFile}; +use crate::spec::DataContentType; +use crate::{Error, ErrorKind, Result}; + +// Equality deletes may apply to more than one DataFile in a scan, and so +// the same equality delete file may be present in more than one invocation of +// DeleteFileManager::load_deletes in the same scan. We want to deduplicate these +// to avoid having to load them twice, so we immediately store cloneable futures in the +// state that can be awaited upon to get te EQ deletes. That way we can check to see if +// a load of each Eq delete file is already in progress and avoid starting another one. +#[derive(Debug, Clone)] +pub(crate) struct EqDelFuture { + result: OnceLock, +} + +impl EqDelFuture { + pub(crate) fn new() -> (oneshot::Sender, Self) { + let (tx, rx) = oneshot::channel(); + let result = OnceLock::new(); + + crate::runtime::spawn({ + let result = result.clone(); + async move { result.set(rx.await.unwrap()) } + }); + + (tx, Self { result }) + } +} + +impl Future for EqDelFuture { + type Output = Predicate; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + match self.result.get() { + None => Poll::Pending, + Some(predicate) => Poll::Ready(predicate.clone()), + } + } +} + +#[derive(Debug, Default)] +struct DeleteFileFilterState { + delete_vectors: HashMap>>, + equality_deletes: HashMap, +} + +#[derive(Clone, Debug, Default)] +pub struct DeleteFilter { + state: Arc>, +} + +impl DeleteFilter { + /// Retrieve a delete vector for the data file associated with a given file scan task + pub fn get_delete_vector( + &self, + file_scan_task: &FileScanTask, + ) -> Option>> { + self.get_delete_vector_for_path(file_scan_task.data_file_path()) + } + + /// Retrieve a delete vector for a data file + pub fn get_delete_vector_for_path( + &self, + delete_file_path: &str, + ) -> Option>> { + self.state + .read() + .ok() + .and_then(|st| st.delete_vectors.get(delete_file_path).cloned()) + } + + /// Retrieve the equality delete predicate for a given eq delete file path + pub(crate) fn get_equality_delete_predicate_for_delete_file_path( + &self, + file_path: &str, + ) -> Option { + self.state + .read() + .unwrap() + .equality_deletes + .get(file_path) + .cloned() + } + + /// Builds eq delete predicate for the provided task. 
+ pub async fn build_equality_delete_predicate( + &self, + file_scan_task: &FileScanTask, + ) -> Result> { + // * Filter the task's deletes into just the Equality deletes + // * Retrieve the unbound predicate for each from self.state.equality_deletes + // * Logical-AND them all together to get a single combined `Predicate` + // * Bind the predicate to the task's schema to get a `BoundPredicate` + + let mut combined_predicate = AlwaysTrue; + for delete in &file_scan_task.deletes { + if !is_equality_delete(delete) { + continue; + } + + let Some(predicate) = + self.get_equality_delete_predicate_for_delete_file_path(&delete.file_path) + else { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Missing predicate for equality delete file '{}'", + delete.file_path + ), + )); + }; + + combined_predicate = combined_predicate.and(predicate.await); + } + + if combined_predicate == AlwaysTrue { + return Ok(None); + } + + // TODO: handle case-insensitive case + let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?; + Ok(Some(bound_predicate)) + } + + pub(crate) fn upsert_delete_vector( + &mut self, + data_file_path: String, + delete_vector: DeleteVector, + ) { + let mut state = self.state.write().unwrap(); + + let Some(entry) = state.delete_vectors.get_mut(&data_file_path) else { + state + .delete_vectors + .insert(data_file_path, Arc::new(Mutex::new(delete_vector))); + return; + }; + + *entry.lock().unwrap() |= delete_vector; + } + + pub(crate) fn insert_equality_delete(&self, delete_file_path: String, eq_del: EqDelFuture) { + let mut state = self.state.write().unwrap(); + + state.equality_deletes.insert(delete_file_path, eq_del); + } +} + +pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool { + matches!(f.file_type, DataContentType::EqualityDeletes) +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::path::Path; + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::Schema as ArrowSchema; + use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; + use parquet::basic::Compression; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::arrow::delete_file_loader::CachingDeleteFileLoader; + use crate::io::FileIO; + use crate::spec::{DataFileFormat, Schema}; + + type ArrowSchemaRef = Arc; + + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + #[tokio::test] + async fn test_delete_file_manager_load_deletes() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path(); + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + // Note that with the delete file parsing not yet in place, all we can test here is that + // the call to the loader fails with the expected FeatureUnsupportedError. 
+ let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10); + + let file_scan_tasks = setup(table_location); + + let result = delete_file_manager + .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) + .await + .unwrap(); + + assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported)); + } + + fn setup(table_location: &Path) -> Vec { + let data_file_schema = Arc::new(Schema::builder().build().unwrap()); + let positional_delete_schema = create_pos_del_schema(); + + let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8]; + let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023]; + + let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values)); + let pos_col = Arc::new(Int64Array::from_iter_values(pos_values)); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + for n in 1..=3 { + let positional_deletes_to_write = + RecordBatch::try_new(positional_delete_schema.clone(), vec![ + file_path_col.clone(), + pos_col.clone(), + ]) + .unwrap(); + + let file = File::create(format!( + "{}/pos-del-{}.parquet", + table_location.to_str().unwrap(), + n + )) + .unwrap(); + let mut writer = ArrowWriter::try_new( + file, + positional_deletes_to_write.schema(), + Some(props.clone()), + ) + .unwrap(); + + writer + .write(&positional_deletes_to_write) + .expect("Writing batch"); + + // writer must be closed to write footer + writer.close().unwrap(); + } + + let pos_del_1 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_2 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_3 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let file_scan_tasks = vec![ + FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![pos_del_1, pos_del_2.clone()], + }, + FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![pos_del_2, pos_del_3], + }, + ]; + + file_scan_tasks + } + + fn create_pos_del_schema() -> ArrowSchemaRef { + let fields = vec![ + arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )]), + ), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + } +} diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 56caeaf55..c5c144853 100644 --- 
a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -23,7 +23,8 @@ pub use schema::*; mod nan_val_cnt_visitor; pub(crate) use nan_val_cnt_visitor::*; -pub(crate) mod delete_file_manager; +pub(crate) mod delete_file_loader; +pub(crate) mod delete_filter; mod reader; pub(crate) mod record_batch_projector; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 4ac993aee..9e1f4d33a 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -41,7 +41,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; -use crate::arrow::delete_file_manager::CachingDeleteFileManager; +use crate::arrow::delete_file_loader::CachingDeleteFileLoader; use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; @@ -63,6 +63,7 @@ pub struct ArrowReaderBuilder { concurrency_limit_data_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_support_enabled: bool, } impl ArrowReaderBuilder { @@ -76,6 +77,7 @@ impl ArrowReaderBuilder { concurrency_limit_data_files: num_cpus, row_group_filtering_enabled: true, row_selection_enabled: false, + delete_file_support_enabled: false, } } @@ -104,18 +106,25 @@ impl ArrowReaderBuilder { self } + /// Determines whether to enable delete file support. + pub fn with_delete_file_support_enabled(mut self, delete_file_support_enabled: bool) -> Self { + self.delete_file_support_enabled = delete_file_support_enabled; + self + } + /// Build the ArrowReader. 
pub fn build(self) -> ArrowReader { ArrowReader { batch_size: self.batch_size, file_io: self.file_io.clone(), - delete_file_manager: CachingDeleteFileManager::new( + delete_file_loader: CachingDeleteFileLoader::new( self.file_io.clone(), self.concurrency_limit_data_files, ), concurrency_limit_data_files: self.concurrency_limit_data_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + delete_file_support_enabled: self.delete_file_support_enabled, } } } @@ -125,13 +134,14 @@ impl ArrowReaderBuilder { pub struct ArrowReader { batch_size: Option, file_io: FileIO, - delete_file_manager: CachingDeleteFileManager, + delete_file_loader: CachingDeleteFileLoader, /// the maximum number of data files that can be fetched at the same time concurrency_limit_data_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_support_enabled: bool, } impl ArrowReader { @@ -143,6 +153,7 @@ impl ArrowReader { let concurrency_limit_data_files = self.concurrency_limit_data_files; let row_group_filtering_enabled = self.row_group_filtering_enabled; let row_selection_enabled = self.row_selection_enabled; + let delete_file_support_enabled = self.delete_file_support_enabled; let stream = tasks .map_ok(move |task| { @@ -152,9 +163,10 @@ impl ArrowReader { task, batch_size, file_io, - self.delete_file_manager.clone(), + self.delete_file_loader.clone(), row_group_filtering_enabled, row_selection_enabled, + delete_file_support_enabled, ) }) .map_err(|err| { @@ -166,26 +178,40 @@ impl ArrowReader { Ok(Box::pin(stream) as ArrowRecordBatchStream) } + #[allow(clippy::too_many_arguments)] async fn process_file_scan_task( task: FileScanTask, batch_size: Option, file_io: FileIO, - delete_file_manager: CachingDeleteFileManager, + delete_file_loader: CachingDeleteFileLoader, row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_support_enabled: bool, ) -> Result { + if !delete_file_support_enabled && !task.deletes.is_empty() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Delete file support is not enabled", + )); + } + let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); - // concurrently retrieve delete files and create RecordBatchStreamBuilder - let (_, mut record_batch_stream_builder) = try_join!( - delete_file_manager.load_deletes(task.deletes.clone()), - Self::create_parquet_record_batch_stream_builder( - &task.data_file_path, - file_io.clone(), - should_load_page_index, - ) - )?; + let delete_filter_rx = delete_file_loader.load_deletes( + if delete_file_support_enabled { + &task.deletes + } else { + &[] + }, + task.schema.clone(), + ); + let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder( + &task.data_file_path, + file_io.clone(), + should_load_page_index, + ) + .await?; // Create a projection mask for the batch stream to select which columns in the // Parquet file that we want in the response @@ -207,7 +233,8 @@ impl ArrowReader { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); } - let delete_predicate = delete_file_manager.build_delete_predicate(task.schema.clone())?; + let delete_filter = delete_filter_rx.await.unwrap()?; + let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?; // In addition to the optional predicate supplied in the `FileScanTask`, // we also have an optional predicate resulting from equality delete files. 
@@ -275,15 +302,18 @@ impl ArrowReader { } } - let positional_delete_indexes = - delete_file_manager.get_positional_delete_indexes_for_data_file(&task.data_file_path); + let positional_delete_indexes = delete_filter.get_delete_vector(&task); if let Some(positional_delete_indexes) = positional_delete_indexes { - let delete_row_selection = Self::build_deletes_row_selection( - record_batch_stream_builder.metadata().row_groups(), - &selected_row_group_indices, - positional_delete_indexes.as_ref(), - )?; + let delete_row_selection = { + let positional_delete_indexes = positional_delete_indexes.lock().unwrap(); + + Self::build_deletes_row_selection( + record_batch_stream_builder.metadata().row_groups(), + &selected_row_group_indices, + &positional_delete_indexes, + ) + }?; // merge the row selection from the delete files with the row selection // from the filter predicate, if there is one from the filter predicate @@ -318,7 +348,7 @@ impl ArrowReader { Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) } - async fn create_parquet_record_batch_stream_builder( + pub(crate) async fn create_parquet_record_batch_stream_builder( data_file_path: &str, file_io: FileIO, should_load_page_index: bool, @@ -1716,7 +1746,7 @@ message schema { /* cases to cover: * {skip|select} {first|intermediate|last} {one row|multiple rows} in - {first|imtermediate|last} {skipped|selected} row group + {first|intermediate|last} {skipped|selected} row group * row group selection disabled */ diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 4c9ffb695..6a894ebb0 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -16,6 +16,7 @@ // under the License. use std::collections::HashMap; +use std::fmt::{self, Debug}; use std::future::Future; use std::ops::Deref; use std::pin::Pin; @@ -24,6 +25,7 @@ use std::task::{Context, Poll}; use futures::channel::mpsc::{channel, Sender}; use futures::StreamExt; +use waker_set::WakerSet; use crate::runtime::spawn; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; @@ -31,9 +33,10 @@ use crate::spec::{DataContentType, DataFile, Struct}; use crate::{Error, ErrorKind, Result}; /// Index of delete files -#[derive(Clone, Debug)] +#[derive(Clone)] pub(crate) struct DeleteFileIndex { state: Arc>, + waker_set: Arc, } #[derive(Debug)] @@ -42,6 +45,15 @@ enum DeleteFileIndexState { Populated(PopulatedDeleteFileIndex), } +impl Debug for DeleteFileIndex { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DeleteFileIndex") + .field("state", &self.state) + .field("waker_set", &"") + .finish() + } +} + #[derive(Debug)] struct PopulatedDeleteFileIndex { #[allow(dead_code)] @@ -59,22 +71,28 @@ impl DeleteFileIndex { pub(crate) fn new() -> (DeleteFileIndex, Sender) { // TODO: what should the channel limit be? 
let (tx, rx) = channel(10); + let waker_set = Arc::new(WakerSet::new()); let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating)); let delete_file_stream = rx.boxed(); spawn({ let state = state.clone(); + let waker_set = waker_set.clone(); async move { let delete_files = delete_file_stream.collect::>().await; let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files); - let mut guard = state.write().unwrap(); - *guard = DeleteFileIndexState::Populated(populated_delete_file_index); + { + let mut guard = state.write().unwrap(); + *guard = DeleteFileIndexState::Populated(populated_delete_file_index); + } + + waker_set.notify_all(); } }); - (DeleteFileIndex { state }, tx) + (DeleteFileIndex { state, waker_set }, tx) } /// Gets all the delete files that apply to the specified data file. @@ -89,6 +107,7 @@ impl DeleteFileIndex { state: self.state.clone(), data_file, seq_num, + waker_set: self.waker_set.clone(), } } } @@ -99,7 +118,7 @@ impl PopulatedDeleteFileIndex { /// /// 1. The partition information is extracted from each delete file's manifest entry. /// 2. If the partition is empty and the delete file is not a positional delete, - /// it is added to the `global_delees` vector + /// it is added to the `global_deletes` vector /// 3. Otherwise, the delete file is added to one of two hash maps based on its content type. fn new(files: Vec) -> PopulatedDeleteFileIndex { let mut eq_deletes_by_partition: HashMap>> = @@ -199,18 +218,22 @@ pub(crate) struct DeletesForDataFile<'a> { state: Arc>, data_file: &'a DataFile, seq_num: Option, + waker_set: Arc, } impl Future for DeletesForDataFile<'_> { type Output = Result>; - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.state.try_read() { Ok(guard) => match guard.deref() { DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok( idx.get_deletes_for_data_file(self.data_file, self.seq_num) )), - _ => Poll::Pending, + _ => { + self.waker_set.insert(cx); + Poll::Pending + } }, Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))), } diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs index 57c15ffec..feb4eeea9 100644 --- a/crates/iceberg/src/delete_vector.rs +++ b/crates/iceberg/src/delete_vector.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. 
+use std::ops::BitOrAssign; + use roaring::bitmap::Iter; use roaring::treemap::BitmapIter; use roaring::RoaringTreemap; -#[allow(unused)] +#[derive(Debug, Default)] pub struct DeleteVector { inner: RoaringTreemap, } @@ -103,3 +105,9 @@ impl DeleteVectorIterator<'_> { inner.bitmap_iter.advance_to(lo); } } + +impl BitOrAssign for DeleteVector { + fn bitor_assign(&mut self, other: Self) { + self.inner.bitor_assign(&other.inner); + } +} diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 6bfb12b23..cce32d7f2 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -45,7 +45,7 @@ pub(crate) struct ManifestFileContext { object_cache: Arc, snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, - delete_file_index: Option, + delete_file_index: DeleteFileIndex, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -58,7 +58,7 @@ pub(crate) struct ManifestEntryContext { pub bound_predicates: Option>, pub partition_spec_id: i32, pub snapshot_schema: SchemaRef, - pub delete_file_index: Option, + pub delete_file_index: DeleteFileIndex, } impl ManifestFileContext { @@ -105,16 +105,13 @@ impl ManifestEntryContext { /// consume this `ManifestEntryContext`, returning a `FileScanTask` /// created from it pub(crate) async fn into_file_scan_task(self) -> Result { - let deletes = if let Some(delete_file_index) = self.delete_file_index { - delete_file_index - .get_deletes_for_data_file( - self.manifest_entry.data_file(), - self.manifest_entry.sequence_number(), - ) - .await? - } else { - vec![] - }; + let deletes = self + .delete_file_index + .get_deletes_for_data_file( + self.manifest_entry.data_file(), + self.manifest_entry.sequence_number(), + ) + .await?; Ok(FileScanTask { start: 0, @@ -188,7 +185,8 @@ impl PlanContext { &self, manifest_list: Arc, tx_data: Sender, - delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender)>, + delete_file_idx: DeleteFileIndex, + tx_delete: Sender, ) -> Result> + 'static>> { let manifest_files = manifest_list.entries().iter(); @@ -196,16 +194,10 @@ impl PlanContext { let mut filtered_mfcs = vec![]; for manifest_file in manifest_files { - let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes { - let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else { - continue; - }; - (Some(delete_file_idx.clone()), tx.clone()) + let tx = if manifest_file.content == ManifestContentType::Deletes { + tx_delete.clone() } else { - ( - delete_file_idx_and_tx.as_ref().map(|x| x.0.clone()), - tx_data.clone(), - ) + tx_data.clone() }; let partition_bound_predicate = if self.predicate.is_some() { @@ -233,7 +225,7 @@ impl PlanContext { manifest_file, partition_bound_predicate, tx, - delete_file_idx, + delete_file_idx.clone(), ); filtered_mfcs.push(Ok(mfc)); @@ -247,7 +239,7 @@ impl PlanContext { manifest_file: &ManifestFile, partition_filter: Option>, sender: Sender, - delete_file_index: Option, + delete_file_index: DeleteFileIndex, ) -> ManifestFileContext { let bound_predicates = if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) = diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index c1cedd58e..dfdf25612 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -368,12 +368,7 @@ impl TableScan { // used to stream the results back to the caller let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); - let delete_file_idx_and_tx: 
Option<(DeleteFileIndex, Sender)> = - if self.delete_file_processing_enabled { - Some(DeleteFileIndex::new()) - } else { - None - }; + let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(); let manifest_list = plan_context.get_manifest_list().await?; @@ -383,9 +378,8 @@ impl TableScan { let manifest_file_contexts = plan_context.build_manifest_file_contexts( manifest_list, manifest_entry_data_ctx_tx, - delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| { - (delete_file_idx.clone(), manifest_entry_delete_ctx_tx) - }), + delete_file_idx.clone(), + manifest_entry_delete_ctx_tx, )?; let mut channel_for_manifest_error = file_scan_task_tx.clone(); @@ -404,34 +398,30 @@ impl TableScan { }); let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); + let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); - if let Some((_, delete_file_tx)) = delete_file_idx_and_tx { - let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); - - // Process the delete file [`ManifestEntry`] stream in parallel - spawn(async move { - let result = manifest_entry_delete_ctx_rx - .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) - .try_for_each_concurrent( - concurrency_limit_manifest_entries, - |(manifest_entry_context, tx)| async move { - spawn(async move { - Self::process_delete_manifest_entry(manifest_entry_context, tx) - .await - }) - .await - }, - ) - .await; + // Process the delete file [`ManifestEntry`] stream in parallel + spawn(async move { + let result = manifest_entry_delete_ctx_rx + .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| async move { + spawn(async move { + Self::process_delete_manifest_entry(manifest_entry_context, tx).await + }) + .await + }, + ) + .await; - if let Err(error) = result { - let _ = channel_for_delete_manifest_entry_error - .send(Err(error)) - .await; - } - }) - .await; - } + if let Err(error) = result { + let _ = channel_for_delete_manifest_entry_error + .send(Err(error)) + .await; + } + }) + .await; // Process the data file [`ManifestEntry`] stream in parallel spawn(async move { @@ -461,7 +451,8 @@ impl TableScan { let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()) .with_data_file_concurrency_limit(self.concurrency_limit_data_files) .with_row_group_filtering_enabled(self.row_group_filtering_enabled) - .with_row_selection_enabled(self.row_selection_enabled); + .with_row_selection_enabled(self.row_selection_enabled) + .with_delete_file_support_enabled(self.delete_file_processing_enabled); if let Some(batch_size) = self.batch_size { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); diff --git a/crates/integration_tests/tests/shared_tests/read_positional_deletes.rs b/crates/integration_tests/tests/shared_tests/read_positional_deletes.rs index 43a50c65f..f058ac360 100644 --- a/crates/integration_tests/tests/shared_tests/read_positional_deletes.rs +++ b/crates/integration_tests/tests/shared_tests/read_positional_deletes.rs @@ -25,7 +25,7 @@ use iceberg_catalog_rest::RestCatalog; use crate::get_shared_containers; #[tokio::test] -async fn test_read_table_with_positional_deletes() { +async fn test_read_table_with_positional_deletes_with_delete_support_disabled() { let fixture = get_shared_containers(); let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); @@ -39,7 +39,7 @@ async fn test_read_table_with_positional_deletes() { let scan = table .scan() - 
.with_delete_file_processing_enabled(true) + .with_delete_file_processing_enabled(false) .build() .unwrap(); println!("{:?}", scan); @@ -53,19 +53,14 @@ .unwrap(); println!("{:?}", plan); - // Scan plan phase should include delete files in file plan - // when with_delete_file_processing_enabled == true + // Scan plan phase still includes delete files in file plan + // when with_delete_file_processing_enabled == false. We instead + // fail at the read phase after this. assert_eq!(plan[0].deletes.len(), 2); - // 😱 If we don't support positional deletes, we should fail when we try to read a table that - // has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py + // with delete_file_processing_enabled == false, we should fail when we + // try to read a table that has positional deletes. let result = scan.to_arrow().await.unwrap().try_collect::<Vec<_>>().await; assert!(result.is_err_and(|e| e.kind() == FeatureUnsupported)); - - // When we get support for it: - // let batch_stream = scan.to_arrow().await.unwrap(); - // let batches: Vec<_> = batch_stream.try_collect().await.is_err(); - // let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum(); - // assert_eq!(num_rows, 10); }
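For reviewers, a minimal usage sketch of how the scan-level flag exercised by the updated integration test is meant to compose with the read path after this change. It is not part of the patch: the `iceberg::table::Table` import path and the surrounding function are assumptions for illustration, while the builder methods themselves (`with_delete_file_processing_enabled` on the scan builder, which feeds the new `ArrowReaderBuilder::with_delete_file_support_enabled` flag at read time) are the ones appearing in this diff.

use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

// Sketch only: how the API is intended to be used once delete parsing is complete.
// Note that in this patch positional-delete parsing is still a TODO, so a scan of a
// table with positional deletes currently fails with ErrorKind::FeatureUnsupported
// even when the flag is enabled; with the flag disabled, the read phase fails as
// soon as a task carries deletes, which is what the test above asserts.
async fn count_rows_with_delete_processing(table: &Table) -> Result<usize> {
    let scan = table
        .scan()
        .with_delete_file_processing_enabled(true)
        .build()?;

    // Collect the Arrow record batches produced by the scan and count surviving rows.
    let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}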