Skip to content

Commit

Permalink
clean up list_from a bit + fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklan committed Feb 14, 2024
1 parent 2fb11f5 commit 9834460
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 56 deletions.
4 changes: 3 additions & 1 deletion kernel/src/actions/action_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,9 @@ mod tests {
.unwrap();
let add_schema = StructType::new(vec![crate::actions::schemas::ADD_FIELD.clone()]);
let mut multi_add_visitor = MultiVisitor::new(visit_add);
data_extractor.extract(batch.as_ref(), Arc::new(add_schema), &mut multi_add_visitor).unwrap();
data_extractor
.extract(batch.as_ref(), Arc::new(add_schema), &mut multi_add_visitor)
.unwrap();
let add1 = Add {
path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet".into(),
partition_values: HashMap::from([
Expand Down
7 changes: 6 additions & 1 deletion kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,12 @@ pub trait ParquetHandler: Send + Sync {
pub trait DataExtractor {
/// Extract data as requested by [`schema`] and then call back into `visitor.visit` with a Vec
/// of that data. Return Ok(()) unless an error was encountered during extraction.
fn extract(&self, blob: &dyn EngineData, schema: SchemaRef, visitor: &mut dyn DataVisitor) -> DeltaResult<()>;
fn extract(
&self,
blob: &dyn EngineData,
schema: SchemaRef,
visitor: &mut dyn DataVisitor,
) -> DeltaResult<()>;
// Return the number of items (rows?) in blob
fn length(&self, blob: &dyn EngineData) -> usize;
}
Expand Down
20 changes: 15 additions & 5 deletions kernel/src/simple_client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ impl SimpleData {
.to_file_path()
.map_err(|_| Error::Generic("can only read local files".to_string()))?,
)?;
let mut json = arrow_json::ReaderBuilder::new(Arc::new(arrow_schema))
.build(BufReader::new(file))?;
let mut json =
arrow_json::ReaderBuilder::new(Arc::new(arrow_schema)).build(BufReader::new(file))?;
let data = json.next().ok_or(Error::Generic(
"No data found reading json file".to_string(),
))?;
Expand Down Expand Up @@ -174,7 +174,10 @@ impl SimpleData {
// just need a helper that can recurse the kernel schema type and push Nones
res_arry.push(None);
} else {
return Err(Error::Generic(format!("Didn't find non-nullable column: {}", field.name)));
return Err(Error::Generic(format!(
"Didn't find non-nullable column: {}",
field.name
)));
}
}
Some(col) => {
Expand All @@ -195,7 +198,11 @@ impl SimpleData {
res_arry,
)?;
}
_ => return Err(Error::Generic("Schema mismatch during extraction".to_string())),
_ => {
return Err(Error::Generic(
"Schema mismatch during extraction".to_string(),
))
}
}
}
if col.is_null(row) {
Expand Down Expand Up @@ -233,7 +240,10 @@ impl SimpleData {
}
typ => {
error!("CAN'T EXTRACT: {}", typ);
return Err(Error::Generic(format!("Unimplemented extraction for type: {}", typ)));
return Err(Error::Generic(format!(
"Unimplemented extraction for type: {}",
typ
)));
}
}
}
Expand Down
89 changes: 42 additions & 47 deletions kernel/src/simple_client/fs_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::path::{Path, PathBuf};
use std::{fs, time::SystemTime};
use std::path::Path;
use std::time::SystemTime;

use bytes::Bytes;
use itertools::Itertools;
Expand Down Expand Up @@ -33,43 +33,37 @@ impl FileSystemClient for SimpleFilesystemClient {
(parent, Some(file_name))
};

let all_ents: std::io::Result<Vec<fs::DirEntry>> = std::fs::read_dir(path_to_read)?
.sorted_by_key(|ent_res| {
ent_res
.as_ref()
.map(|ent| ent.path())
.unwrap_or_else(|_| PathBuf::new())
})
let all_ents: Vec<_> = std::fs::read_dir(path_to_read)?
.filter(|ent_res| {
match (ent_res, min_file_name) {
(Ok(ent), Some(min_file_name)) => ent.file_name() >= *min_file_name,
_ => true, // Keep errors and unfiltered entries
_ => true, // Keep unfiltered entries
}
})
.collect();
let all_ents = all_ents?; // any errors in reading dir entries will force a return here
// now all_ents is a sorted list of DirEntries, we can just map over it

let it = all_ents.into_iter().map(|ent| {
ent.metadata().map_err(Error::IOError).and_then(|metadata| {
let last_modified: u64 = metadata
.modified()
.map(
|modified| match modified.duration_since(SystemTime::UNIX_EPOCH) {
Ok(d) => d.as_secs(),
Err(_) => 0,
},
)
.unwrap_or(0);
Url::from_file_path(ent.path())
.map(|location| FileMeta {
location,
last_modified: last_modified as i64,
size: metadata.len() as usize,
})
.map_err(|_| Error::Generic(format!("Invalid path: {:?}", ent.path())))
})
});
.try_collect()?;
let it = all_ents
.into_iter()
.sorted_by_key(|ent| ent.path())
.map(|ent| {
ent.metadata().map_err(Error::IOError).and_then(|metadata| {
let last_modified: u64 = metadata
.modified()
.map(
|modified| match modified.duration_since(SystemTime::UNIX_EPOCH) {
Ok(d) => d.as_secs(),
Err(_) => 0,
},
)
.unwrap_or(0);
Url::from_file_path(ent.path())
.map(|location| FileMeta {
location,
last_modified: last_modified as i64,
size: metadata.len() as usize,
})
.map_err(|_| Error::Generic(format!("Invalid path: {:?}", ent.path())))
})
});
Ok(Box::new(it))
} else {
Err(Error::Generic("Can only read local filesystem".to_string()))
Expand Down Expand Up @@ -109,36 +103,37 @@ mod tests {
fn test_list_from() -> Result<(), Box<dyn std::error::Error>> {
let client = SimpleFilesystemClient;
let tmp_dir = tempfile::tempdir().unwrap();
let mut expected = vec![];
for i in 0..3 {
let path = tmp_dir.path().join(format!("000{i}.json"));
let path = tmp_dir.path().join(format!("{i:020}.json"));
expected.push(path.clone());
let mut f = File::create(path)?;
writeln!(f, "null")?;
}
let url_path = tmp_dir.path().join("0001.json");
let url_path = tmp_dir.path().join(format!("{:020}.json", 1));
let url = Url::from_file_path(url_path).unwrap();
let list = client.list_from(&url)?;
let mut file_count = 0;
for _ in list {
for (i, file) in list.enumerate() {
// i+1 in index because we started at 0001 in the listing
assert_eq!(
file.unwrap().location.path(),
expected[i + 1].to_str().unwrap()
);
file_count += 1;
}
assert_eq!(file_count, 2);

let url_path = tmp_dir.path().join("");
let url = Url::from_file_path(url_path).unwrap();
let list = client.list_from(&url)?;
file_count = 0;
for _ in list {
file_count += 1;
}
file_count = list.count();
assert_eq!(file_count, 3);

let url_path = tmp_dir.path().join("0001");
let url_path = tmp_dir.path().join(format!("{:020}", 1));
let url = Url::from_file_path(url_path).unwrap();
let list = client.list_from(&url)?;
file_count = 0;
for _ in list {
file_count += 1;
}
file_count = list.count();
assert_eq!(file_count, 2);
Ok(())
}
Expand All @@ -147,7 +142,7 @@ mod tests {
fn test_read_files() -> Result<(), Box<dyn std::error::Error>> {
let client = SimpleFilesystemClient;
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().join("0001.json");
let path = tmp_dir.path().join(format!("{:020}.json", 1));
let mut f = File::create(path.clone())?;
writeln!(f, "null")?;
let url = Url::from_file_path(path).unwrap();
Expand Down
10 changes: 8 additions & 2 deletions kernel/src/simple_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use crate::engine_data::{DataVisitor, EngineData, TypeTag};
use crate::schema::SchemaRef;
use crate::{
DataExtractor, EngineClient, ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler, DeltaResult,
DataExtractor, DeltaResult, EngineClient, ExpressionHandler, FileSystemClient, JsonHandler,
ParquetHandler,
};

use std::sync::Arc;
Expand All @@ -27,7 +28,12 @@ impl SimpleDataExtractor {
}

impl DataExtractor for SimpleDataExtractor {
fn extract(&self, blob: &dyn EngineData, schema: SchemaRef, visitor: &mut dyn DataVisitor) -> DeltaResult<()> {
fn extract(
&self,
blob: &dyn EngineData,
schema: SchemaRef,
visitor: &mut dyn DataVisitor,
) -> DeltaResult<()> {
assert!(self.expected_tag.eq(blob.type_tag()));
let data: &data::SimpleData = blob
.as_any()
Expand Down

0 comments on commit 9834460

Please sign in to comment.