From 45eedf2da130d73d2c4d59659bc2928cd6c1b1b6 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Wed, 22 Jan 2025 16:27:20 -0800 Subject: [PATCH] Part 1, Read transforms via expressions: Just compute the expression and return it. (#607) ## What changes are proposed in this pull request? This is the initial part of moving to using expressions to express transformations when reading data. What this PR does is: - Compute a "static" transform, which is just a set of column expressions that need to be passed directly through without change, or enough metadata for lower levels to fill in a "fixup" expression - The static transform is passed into the iterator that parses each `Add` file - When parsing the `Add` file, if there are needed fix-ups (just partition columns today), the correct expression is created, and inserted into a row indexed map - This map is returned so the caller can find out for a given row what, if any, expression needs to be applied when reading the specified row Follow-up PRs: * #612: Propagate this information through when using `visit_scan_files` * #613: Actually use the data to do transformation and remove `transform_to_logical` entirely * #614: Make this work over ffi and use it * (TODO): Clean up any existing code that's now over complicated in the scan building Each of those are more invasive and end up touching significant code, so I'm staging this as much as possible to make reviews easier. ## How was this change tested? Unit tests, and inspection of resultant expressions when run on tables --- ffi/src/scan.rs | 2 +- kernel/examples/inspect-table/src/main.rs | 2 +- .../read-table-multi-threaded/src/main.rs | 2 +- kernel/src/engine_data.rs | 4 +- kernel/src/scan/log_replay.rs | 185 ++++++++++++++++-- kernel/src/scan/mod.rs | 80 +++++++- kernel/src/scan/state.rs | 2 + kernel/tests/read.rs | 2 +- 8 files changed, 250 insertions(+), 29 deletions(-) diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs index 86f5e7e5f..5d3b5047b 100644 --- a/ffi/src/scan.rs +++ b/ffi/src/scan.rs @@ -230,7 +230,7 @@ fn kernel_scan_data_next_impl( .data .lock() .map_err(|_| Error::generic("poisoned mutex"))?; - if let Some((data, sel_vec)) = data.next().transpose()? { + if let Some((data, sel_vec, _transforms)) = data.next().transpose()? { let bool_slice = KernelBoolSlice::from(sel_vec); (engine_visitor)(engine_context, data.into(), bool_slice); Ok(true) diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index 194530004..01b1c4e88 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -209,7 +209,7 @@ fn try_main() -> DeltaResult<()> { let scan = ScanBuilder::new(snapshot).build()?; let scan_data = scan.scan_data(&engine)?; for res in scan_data { - let (data, vector) = res?; + let (data, vector, _transforms) = res?; delta_kernel::scan::state::visit_scan_files( data.as_ref(), &vector, diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index e689a4ef4..57011dcc9 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -210,7 +210,7 @@ fn try_main() -> DeltaResult<()> { drop(record_batch_tx); for res in scan_data { - let (data, vector) = res?; + let (data, vector, _transforms) = res?; scan_file_tx = delta_kernel::scan::state::visit_scan_files( data.as_ref(), &vector, diff --git a/kernel/src/engine_data.rs b/kernel/src/engine_data.rs index 25a7e84bd..333ced827 100644 --- a/kernel/src/engine_data.rs +++ b/kernel/src/engine_data.rs @@ -129,7 +129,9 @@ pub trait TypedGetData<'a, T> { fn get_opt(&'a self, row_index: usize, field_name: &str) -> DeltaResult>; fn get(&'a self, row_index: usize, field_name: &str) -> DeltaResult { let val = self.get_opt(row_index, field_name)?; - val.ok_or_else(|| Error::MissingData(format!("Data missing for field {field_name}"))) + val.ok_or_else(|| { + Error::MissingData(format!("Data missing for field {field_name}")).with_backtrace() + }) } } diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index d7f83a4fa..33bc87075 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -1,15 +1,16 @@ use std::clone::Clone; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, LazyLock}; +use itertools::Itertools; use tracing::debug; use super::data_skipping::DataSkippingFilter; -use super::ScanData; +use super::{ScanData, Transform}; use crate::actions::get_log_add_schema; use crate::engine_data::{GetData, RowVisitor, TypedGetData as _}; use crate::expressions::{column_expr, column_name, ColumnName, Expression, ExpressionRef}; -use crate::scan::DeletionVectorDescriptor; +use crate::scan::{DeletionVectorDescriptor, TransformExpr}; use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType}; use crate::utils::require; use crate::{DeltaResult, Engine, EngineData, Error, ExpressionEvaluator}; @@ -44,12 +45,17 @@ struct LogReplayScanner { struct AddRemoveDedupVisitor<'seen> { seen: &'seen mut HashSet, selection_vector: Vec, + logical_schema: SchemaRef, + transform: Option>, + row_transform_exprs: Vec>, is_log_batch: bool, } impl AddRemoveDedupVisitor<'_> { /// Checks if log replay already processed this logical file (in which case the current action /// should be ignored). If not already seen, register it so we can recognize future duplicates. + /// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it + /// and should process it. fn check_and_record_seen(&mut self, key: FileActionKey) -> bool { // Note: each (add.path + add.dv_unique_id()) pair has a // unique Add + Remove pair in the log. For example: @@ -76,6 +82,37 @@ impl AddRemoveDedupVisitor<'_> { } } + /// Compute an expression that will transform from physical to logical for a given Add file action + fn get_transform_expr<'a>( + &self, + i: usize, + transform: &Transform, + getters: &[&'a dyn GetData<'a>], + ) -> DeltaResult { + let partition_values: HashMap<_, _> = getters[1].get(i, "add.partitionValues")?; + let transforms = transform + .iter() + .map(|transform_expr| match transform_expr { + TransformExpr::Partition(field_idx) => { + let field = self.logical_schema.fields.get_index(*field_idx); + let Some((_, field)) = field else { + return Err(Error::Generic( + format!("logical schema did not contain expected field at {field_idx}, can't transform data") + )); + }; + let name = field.physical_name(); + let partition_value = super::parse_partition_value( + partition_values.get(name), + field.data_type(), + )?; + Ok(partition_value.into()) + } + TransformExpr::Static(field_expr) => Ok(field_expr.clone()), + }) + .try_collect()?; + Ok(Arc::new(Expression::Struct(transforms))) + } + /// True if this row contains an Add action that should survive log replay. Skip it if the row /// is not an Add action, or the file has already been seen previously. fn is_valid_add<'a>(&mut self, i: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult { @@ -83,11 +120,11 @@ impl AddRemoveDedupVisitor<'_> { // have a remove with a path at index 4. In either case, extract the three dv getters at // indexes that immediately follow a valid path index. let (path, dv_getters, is_add) = if let Some(path) = getters[0].get_str(i, "add.path")? { - (path, &getters[1..4], true) + (path, &getters[2..5], true) } else if !self.is_log_batch { return Ok(false); - } else if let Some(path) = getters[4].get_opt(i, "remove.path")? { - (path, &getters[5..8], false) + } else if let Some(path) = getters[5].get_opt(i, "remove.path")? { + (path, &getters[6..9], false) } else { return Ok(false); }; @@ -101,9 +138,22 @@ impl AddRemoveDedupVisitor<'_> { None => None, }; - // Process both adds and removes, but only return not already-seen adds + // Check both adds and removes (skipping already-seen), but only transform and return adds let file_key = FileActionKey::new(path, dv_unique_id); - Ok(!self.check_and_record_seen(file_key) && is_add) + if self.check_and_record_seen(file_key) || !is_add { + return Ok(false); + } + let transform = self + .transform + .as_ref() + .map(|transform| self.get_transform_expr(i, transform, getters)) + .transpose()?; + if transform.is_some() { + // fill in any needed `None`s for previous rows + self.row_transform_exprs.resize_with(i, Default::default); + self.row_transform_exprs.push(transform); + } + Ok(true) } } @@ -113,8 +163,10 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> { static NAMES_AND_TYPES: LazyLock = LazyLock::new(|| { const STRING: DataType = DataType::STRING; const INTEGER: DataType = DataType::INTEGER; + let ss_map: DataType = MapType::new(STRING, STRING, true).into(); let types_and_names = vec![ (STRING, column_name!("add.path")), + (ss_map, column_name!("add.partitionValues")), (STRING, column_name!("add.deletionVector.storageType")), (STRING, column_name!("add.deletionVector.pathOrInlineDv")), (INTEGER, column_name!("add.deletionVector.offset")), @@ -132,12 +184,12 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> { } else { // All checkpoint actions are already reconciled and Remove actions in checkpoint files // only serve as tombstones for vacuum jobs. So we only need to examine the adds here. - (&names[..4], &types[..4]) + (&names[..5], &types[..5]) } } fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { - let expected_getters = if self.is_log_batch { 8 } else { 4 }; + let expected_getters = if self.is_log_batch { 9 } else { 5 }; require!( getters.len() == expected_getters, Error::InternalError(format!( @@ -207,6 +259,8 @@ impl LogReplayScanner { &mut self, add_transform: &dyn ExpressionEvaluator, actions: &dyn EngineData, + logical_schema: SchemaRef, + transform: Option>, is_log_batch: bool, ) -> DeltaResult { // Apply data skipping to get back a selection vector for actions that passed skipping. We @@ -220,6 +274,9 @@ impl LogReplayScanner { let mut visitor = AddRemoveDedupVisitor { seen: &mut self.seen, selection_vector, + logical_schema, + transform, + row_transform_exprs: Vec::new(), is_log_batch, }; visitor.visit_rows_of(actions)?; @@ -227,7 +284,7 @@ impl LogReplayScanner { // TODO: Teach expression eval to respect the selection vector we just computed so carefully! let selection_vector = visitor.selection_vector; let result = add_transform.evaluate(actions)?; - Ok((result, selection_vector)) + Ok((result, selection_vector, visitor.row_transform_exprs)) } } @@ -235,9 +292,11 @@ impl LogReplayScanner { /// `(engine_data, selection_vec)`. Each row that is selected in the returned `engine_data` _must_ /// be processed to complete the scan. Non-selected rows _must_ be ignored. The boolean flag /// indicates whether the record batch is a log or checkpoint batch. -pub fn scan_action_iter( +pub(crate) fn scan_action_iter( engine: &dyn Engine, action_iter: impl Iterator, bool)>>, + logical_schema: SchemaRef, + transform: Option>, physical_predicate: Option<(ExpressionRef, SchemaRef)>, ) -> impl Iterator> { let mut log_scanner = LogReplayScanner::new(engine, physical_predicate); @@ -249,20 +308,37 @@ pub fn scan_action_iter( action_iter .map(move |action_res| { let (batch, is_log_batch) = action_res?; - log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch) + log_scanner.process_scan_batch( + add_transform.as_ref(), + batch.as_ref(), + logical_schema.clone(), + transform.clone(), + is_log_batch, + ) }) - .filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true))) + .filter(|res| res.as_ref().map_or(true, |(_, sv, _)| sv.contains(&true))) } #[cfg(test)] mod tests { - use std::collections::HashMap; + use std::{collections::HashMap, sync::Arc}; - use crate::scan::{ - state::{DvInfo, Stats}, - test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback}, + use crate::expressions::{column_name, Scalar}; + use crate::scan::state::{DvInfo, Stats}; + use crate::scan::test_utils::{ + add_batch_simple, add_batch_with_partition_col, add_batch_with_remove, + run_with_validate_callback, + }; + use crate::scan::{get_state_info, Scan}; + use crate::Expression; + use crate::{ + engine::sync::SyncEngine, + schema::{DataType, SchemaRef, StructField, StructType}, + ExpressionRef, }; + use super::scan_action_iter; + // dv-info is more complex to validate, we validate that works in the test for visit_scan_files // in state.rs fn validate_simple( @@ -288,6 +364,8 @@ mod tests { fn test_scan_action_iter() { run_with_validate_callback( vec![add_batch_simple()], + None, // not testing schema + None, // not testing transform &[true, false], (), validate_simple, @@ -298,9 +376,80 @@ mod tests { fn test_scan_action_iter_with_remove() { run_with_validate_callback( vec![add_batch_with_remove()], + None, // not testing schema + None, // not testing transform &[false, false, true, false], (), validate_simple, ); } + + #[test] + fn test_no_transforms() { + let batch = vec![add_batch_simple()]; + let logical_schema = Arc::new(crate::schema::StructType::new(vec![])); + let iter = scan_action_iter( + &SyncEngine::new(), + batch.into_iter().map(|batch| Ok((batch as _, true))), + logical_schema, + None, + None, + ); + for res in iter { + let (_batch, _sel, transforms) = res.unwrap(); + assert!(transforms.is_empty(), "Should have no transforms"); + } + } + + #[test] + fn test_simple_transform() { + let schema: SchemaRef = Arc::new(StructType::new([ + StructField::new("value", DataType::INTEGER, true), + StructField::new("date", DataType::DATE, true), + ])); + let partition_cols = ["date".to_string()]; + let state_info = get_state_info(schema.as_ref(), &partition_cols).unwrap(); + let static_transform = Some(Arc::new(Scan::get_static_transform(&state_info.all_fields))); + let batch = vec![add_batch_with_partition_col()]; + let iter = scan_action_iter( + &SyncEngine::new(), + batch.into_iter().map(|batch| Ok((batch as _, true))), + schema, + static_transform, + None, + ); + + fn validate_transform(transform: Option<&ExpressionRef>, expected_date_offset: i32) { + assert!(transform.is_some()); + let Expression::Struct(inner) = transform.unwrap().as_ref() else { + panic!("Transform should always be a struct expr"); + }; + assert_eq!(inner.len(), 2, "expected two items in transform struct"); + + let Expression::Column(ref name) = inner[0] else { + panic!("Expected first expression to be a column"); + }; + assert_eq!(name, &column_name!("value"), "First col should be 'value'"); + + let Expression::Literal(ref scalar) = inner[1] else { + panic!("Expected second expression to be a literal"); + }; + assert_eq!( + scalar, + &Scalar::Date(expected_date_offset), + "Didn't get expected date offset" + ); + } + + for res in iter { + let (_batch, _sel, transforms) = res.unwrap(); + // in this case we have a metadata action first and protocol 3rd, so we expect 4 items, + // the first and 3rd being a `None` + assert_eq!(transforms.len(), 4, "Should have 4 transforms"); + assert!(transforms[0].is_none(), "transform at [0] should be None"); + assert!(transforms[2].is_none(), "transform at [2] should be None"); + validate_transform(transforms[1].as_ref(), 17511); + validate_transform(transforms[3].as_ref(), 17510); + } + } } diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 53c6d4cae..418e289fb 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -301,7 +301,20 @@ pub enum ColumnType { Partition(usize), } -pub type ScanData = (Box, Vec); +/// A transform is ultimately a `Struct` expr. This holds the set of expressions that make that struct expr up +type Transform = Vec; + +/// Transforms aren't computed all at once. So static ones can just go straight to `Expression`, but +/// things like partition columns need to filled in. This enum holds an expression that's part of a +/// `Transform`. +pub(crate) enum TransformExpr { + Static(Expression), + Partition(usize), +} + +// TODO(nick): Make this a struct in a follow-on PR +// (data, deletion_vec, transforms) +pub type ScanData = (Box, Vec, Vec>); /// The result of building a scan over a table. This can be used to get the actual data from /// scanning the table. @@ -340,6 +353,21 @@ impl Scan { } } + /// Convert the parts of the transform that can be computed statically into `Expression`s. For + /// parts that cannot be computed statically, include enough metadata so lower levels of + /// processing can create and fill in an expression. + fn get_static_transform(all_fields: &[ColumnType]) -> Transform { + all_fields + .iter() + .map(|field| match field { + ColumnType::Selected(col_name) => { + TransformExpr::Static(ColumnName::new([col_name]).into()) + } + ColumnType::Partition(idx) => TransformExpr::Partition(*idx), + }) + .collect() + } + /// Get an iterator of [`EngineData`]s that should be included in scan for a query. This handles /// log-replay, reconciling Add and Remove actions, and applying data skipping (if /// possible). Each item in the returned iterator is a tuple of: @@ -352,11 +380,23 @@ impl Scan { /// the query. NB: If you are using the default engine and plan to call arrow's /// `filter_record_batch`, you _need_ to extend this vector to the full length of the batch or /// arrow will drop the extra rows. + /// - `Vec>`: Transformation expressions that need to be applied. For each + /// row at index `i` in the above data, if an expression exists at index `i` in the `Vec`, + /// the associated expression _must_ be applied to the data read from the file specified by + /// the row. The resultant schema for this expression is guaranteed to be `Scan.schema()`. If + /// the item at index `i` in this `Vec` is `None`, or if the `Vec` contains fewer than `i` + /// elements, no expression need be applied and the data read from disk is already in the + /// correct logical state. pub fn scan_data( &self, engine: &dyn Engine, ) -> DeltaResult>> { - // NOTE: This is a cheap arc clone + // Compute the static part of the transformation. This is `None` if no transformation is + // needed (currently just means no partition cols AND no column mapping but will be extended + // for other transforms as we support them) + let static_transform = (self.have_partition_cols + || self.snapshot.column_mapping_mode != ColumnMappingMode::None) + .then_some(Arc::new(Scan::get_static_transform(&self.all_fields))); let physical_predicate = match self.physical_predicate.clone() { PhysicalPredicate::StaticSkipAll => return Ok(None.into_iter().flatten()), PhysicalPredicate::Some(predicate, schema) => Some((predicate, schema)), @@ -365,6 +405,8 @@ impl Scan { let it = scan_action_iter( engine, self.replay_for_scan_data(engine)?, + self.logical_schema.clone(), + static_transform, physical_predicate, ); Ok(Some(it).into_iter().flatten()) @@ -445,7 +487,7 @@ impl Scan { let scan_data = self.scan_data(engine.as_ref())?; let scan_files_iter = scan_data .map(|res| { - let (data, vec) = res?; + let (data, vec, _transforms) = res?; let scan_files = vec![]; state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback) }) @@ -688,10 +730,11 @@ pub(crate) mod test_utils { sync::{json::SyncJsonHandler, SyncEngine}, }, scan::log_replay::scan_action_iter, + schema::SchemaRef, EngineData, JsonHandler, }; - use super::state::ScanCallback; + use super::{state::ScanCallback, Transform}; // TODO(nick): Merge all copies of this into one "test utils" thing fn string_array_to_engine_data(string_array: StringArray) -> Box { @@ -734,21 +777,46 @@ pub(crate) mod test_utils { ArrowEngineData::try_from_engine_data(parsed).unwrap() } + // add batch with a `date` partition col + pub(crate) fn add_batch_with_partition_col() -> Box { + let handler = SyncJsonHandler {}; + let json_strings: StringArray = vec![ + r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#, + r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","partitionValues": {"date": "2017-12-11"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#, + r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#, + r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues": {"date": "2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#, + ] + .into(); + let output_schema = get_log_schema().clone(); + let parsed = handler + .parse_json(string_array_to_engine_data(json_strings), output_schema) + .unwrap(); + ArrowEngineData::try_from_engine_data(parsed).unwrap() + } + + /// Create a scan action iter and validate what's called back. If you pass `None` as + /// `logical_schema`, `transform` should also be `None` #[allow(clippy::vec_box)] pub(crate) fn run_with_validate_callback( batch: Vec>, + logical_schema: Option, + transform: Option>, expected_sel_vec: &[bool], context: T, validate_callback: ScanCallback, ) { + let logical_schema = + logical_schema.unwrap_or_else(|| Arc::new(crate::schema::StructType::new(vec![]))); let iter = scan_action_iter( &SyncEngine::new(), batch.into_iter().map(|batch| Ok((batch as _, true))), + logical_schema, + transform, None, ); let mut batch_count = 0; for res in iter { - let (batch, sel) = res.unwrap(); + let (batch, sel, _transforms) = res.unwrap(); assert_eq!(sel, expected_sel_vec); crate::scan::state::visit_scan_files( batch.as_ref(), @@ -959,7 +1027,7 @@ mod tests { } let mut files = vec![]; for data in scan_data { - let (data, vec) = data?; + let (data, vec, _transforms) = data?; files = state::visit_scan_files(data.as_ref(), &vec, files, scan_data_callback)?; } Ok(files) diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs index b57f0c120..085af15ec 100644 --- a/kernel/src/scan/state.rs +++ b/kernel/src/scan/state.rs @@ -250,6 +250,8 @@ mod tests { let context = TestContext { id: 2 }; run_with_validate_callback( vec![add_batch_simple()], + None, // not testing schema + None, // not testing transform &[true, false], context, validate_visit, diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 46a72d309..0ef3c6f1b 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -369,7 +369,7 @@ fn read_with_scan_data( let scan_data = scan.scan_data(engine)?; let mut scan_files = vec![]; for data in scan_data { - let (data, vec) = data?; + let (data, vec, _transforms) = data?; scan_files = visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)?; }