Skip to content

Commit

Permalink
cleanup unneeded complexity, fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Lanham committed Sep 12, 2024
1 parent 1a70524 commit f497907
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 115 deletions.
58 changes: 38 additions & 20 deletions kernel/src/engine/arrow_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,12 @@ fn evaluate_expression(
// Return a RecordBatch where the names of fields in `sa` have been transformed to match those in
// the schema specified by `output_type`.
//
// NOTE: the pasted diff contained both the pre- and post-format versions of the two statements
// below, leaving redundant shadowed duplicates; this keeps a single copy. Also switched to
// `ok_or_else` so the error value is only constructed on the `None` path.
fn apply_schema(sa: &StructArray, output_type: &DataType) -> DeltaResult<RecordBatch> {
    // Recursively rename `sa`'s fields to match `output_type`; the top-level call must
    // produce a value, so a `None` here is an internal invariant violation.
    let applied = apply_to_col(sa.data_type(), sa, output_type)?.ok_or_else(|| {
        Error::generic("apply_to_col at top-level should return something")
    })?;
    // The top-level input is a struct, so the transformed result must be one too.
    let applied_sa = applied.as_struct_opt().ok_or_else(|| {
        Error::generic("apply_to_col at top-level should return a struct array")
    })?;
    Ok(applied_sa.into())
}

Expand All @@ -398,17 +398,27 @@ fn apply_to_col(
"Arrow claimed to be a struct but isn't a StructArray".to_string(),
))?;
let (fields, sa_cols, sa_nulls) = sa.clone().into_parts();
let result_iter = fields.into_iter().zip(sa_cols).zip(kernel_fields.fields()).map(|((sa_field, sa_col), kernel_field)| -> DeltaResult<(ArrowField, Arc<dyn Array>)> {
let transformed_col = apply_to_col(sa_field.data_type(), &sa_col, kernel_field.data_type())?.unwrap_or(sa_col);
let transformed_field = sa_field
.as_ref()
.clone()
.with_name(kernel_field.name.clone())
.with_data_type(transformed_col.data_type().clone());
Ok((transformed_field, transformed_col))
});
let (transformed_fields, transformed_cols): (Vec<ArrowField>, Vec<Arc<dyn Array>>) = result_iter.process_results(|iter| iter.unzip())?;
let transformed_array = StructArray::try_new(transformed_fields.into(), transformed_cols, sa_nulls)?;
let result_iter = fields
.into_iter()
.zip(sa_cols)
.zip(kernel_fields.fields())
.map(
|((sa_field, sa_col), kernel_field)| -> DeltaResult<(ArrowField, Arc<dyn Array>)> {
let transformed_col =
apply_to_col(sa_field.data_type(), &sa_col, kernel_field.data_type())?
.unwrap_or(sa_col);
let transformed_field = sa_field
.as_ref()
.clone()
.with_name(kernel_field.name.clone())
.with_data_type(transformed_col.data_type().clone());
Ok((transformed_field, transformed_col))
},
);
let (transformed_fields, transformed_cols): (Vec<ArrowField>, Vec<Arc<dyn Array>>) =
result_iter.process_results(|iter| iter.unzip())?;
let transformed_array =
StructArray::try_new(transformed_fields.into(), transformed_cols, sa_nulls)?;
Ok(Some(Arc::new(transformed_array)))
}
(DataType::Array(inner_type), ArrowDataType::List(_arrow_list_type)) => {
Expand All @@ -417,9 +427,17 @@ fn apply_to_col(
"Arrow claimed to be a list but isn't a ListArray".to_string(),
))?;
let (field, offset_buffer, values, nulls) = la.clone().into_parts();
let transformed_values = apply_to_col(field.data_type(), &values, &inner_type.element_type)?.unwrap_or(values);
let transformed_field = Arc::new(field.as_ref().clone().with_data_type(transformed_values.data_type().clone()));
let transformed_array = ListArray::try_new(transformed_field, offset_buffer, transformed_values, nulls)?;
let transformed_values =
apply_to_col(field.data_type(), &values, &inner_type.element_type)?
.unwrap_or(values);
let transformed_field = Arc::new(
field
.as_ref()
.clone()
.with_data_type(transformed_values.data_type().clone()),
);
let transformed_array =
ListArray::try_new(transformed_field, offset_buffer, transformed_values, nulls)?;
Ok(Some(Arc::new(transformed_array)))
}
_ => {
Expand Down
142 changes: 47 additions & 95 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,108 +356,60 @@ fn parse_partition_value(raw: Option<&String>, data_type: &DataType) -> DeltaRes
}
}

// fn make_datatype_physical(data_type: DataType, column_mapping_mode: ColumnMappingMode) -> DeltaResult<()> {
// match data_type {
// DataType::Struct(struct_type) => {
// // build up the mapped child fields
// let children = struct_type
// .fields()
// .map(|field| make_physical(field, column_mapping_mode))
// .try_collect()?;
// let mapped_type = StructType::new(children);
// let physical_name = logical_field.physical_name(column_mapping_mode)?;
// Ok(StructField {
// name: physical_name.to_string(),
// data_type: DataType::Struct(Box::new(mapped_type)),
// nullable: logical_field.nullable,
// metadata: logical_field.metadata.clone(),
// })
// }
// DataType::Array(array_type) => {
// println!("Here {array_type:?}");
// panic!("Nope");
// }
// _ => data_type.clone(),
// }
// }

/// Borrowed view over either a [`StructField`] or a bare [`DataType`].
///
/// Lets the physical-name mapping recursion accept either kind of node without
/// cloning up front; convert to the owned [`FieldOrDataType`] via `make_owned`.
enum FieldOrDataTypeRef<'a> {
    Field(&'a StructField),
    DataType(&'a DataType),
}

impl FieldOrDataTypeRef<'_> {
    /// Clone the referenced value, producing the owned [`FieldOrDataType`] counterpart.
    fn make_owned(self) -> FieldOrDataType {
        match self {
            Self::Field(field) => FieldOrDataType::Field(field.clone()),
            Self::DataType(data_type) => FieldOrDataType::DataType(data_type.clone()),
        }
    }
}

/// Owned counterpart of [`FieldOrDataTypeRef`]: either a full [`StructField`] or a
/// bare [`DataType`], as produced by the physical-name mapping recursion.
enum FieldOrDataType {
    Field(StructField),
    DataType(DataType),
}

impl FieldOrDataType {
fn as_field(self) -> DeltaResult<StructField> {
match self {
FieldOrDataType::Field(f) => Ok(f),
FieldOrDataType::DataType(dt) => panic!("noper"),
}
}

fn as_data_type(self) -> DeltaResult<DataType> {
match self {
FieldOrDataType::DataType(dt) => Ok(dt),
FieldOrDataType::Field(f) => panic!("noper"),
/// Transform a logical field into the physical form. Currently just renames things for 'name'
/// column mapping.
fn make_field_physical(
logical_field: &StructField,
column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<StructField> {
match column_mapping_mode {
ColumnMappingMode::None => Ok(logical_field.clone()),
ColumnMappingMode::Name => {
let physical_name = logical_field.physical_name(column_mapping_mode)?;
let field_data_type = logical_field.data_type();
let mapped_data_type = make_data_type_physical(field_data_type, column_mapping_mode)?;
Ok(StructField {
name: physical_name.to_string(),
data_type: mapped_data_type,
nullable: logical_field.nullable,
metadata: logical_field.metadata.clone(),
})
}
ColumnMappingMode::Id => panic!("No id"),
}
}

/// Transform a logical field or data_type into the physical form. Currently just renames things for 'name'
/// column mapping.
fn make_physical<'a>(
logical: FieldOrDataTypeRef<'a>,
/// Transform a DataType into the physical form. Currently just renames anything in a nested type
/// for 'name' column mapping.
fn make_data_type_physical(
logical_dt: &DataType,
column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<FieldOrDataType> {
) -> DeltaResult<DataType> {
match column_mapping_mode {
ColumnMappingMode::None => Ok(logical.make_owned()),
ColumnMappingMode::None => Ok(logical_dt.clone()),
ColumnMappingMode::Name => {
match logical {
FieldOrDataTypeRef::Field(logical_field) => {
let physical_name = logical_field.physical_name(column_mapping_mode)?;
let field_data_type = logical_field.data_type();
let mapped_data_type = make_physical(FieldOrDataTypeRef::DataType(field_data_type), column_mapping_mode).and_then(|dt| dt.as_data_type())?;
Ok(FieldOrDataType::Field(StructField {
name: physical_name.to_string(),
data_type: mapped_data_type,
nullable: logical_field.nullable,
metadata: logical_field.metadata.clone(),
}))
// we don't need to rename at this level, just need to keep the recursion going
// because there might be structs below us
match logical_dt {
DataType::Array(array_type) => {
let new_type =
make_data_type_physical(&array_type.element_type, column_mapping_mode)?;
Ok(DataType::Array(Box::new(ArrayType::new(
new_type,
array_type.contains_null,
))))
}
DataType::Struct(struct_type) => {
// build up the mapped child fields
let children = struct_type
.fields()
.map(|field| make_field_physical(field, column_mapping_mode))
.try_collect()?;
Ok(DataType::Struct(Box::new(StructType::new(children))))
}
FieldOrDataTypeRef::DataType(data_type) => {
// we don't need to rename at this level, just need to keep the recursion going
// because there might be structs below us
match data_type {
DataType::Array(array_type) => {
let new_type = make_physical(FieldOrDataTypeRef::DataType(&array_type.element_type), column_mapping_mode).and_then(|dt| dt.as_data_type())?;
Ok(FieldOrDataType::DataType(DataType::Array(Box::new(ArrayType::new(new_type, array_type.contains_null)))))
}
DataType::Struct(struct_type) => {
// build up the mapped child fields
let children = struct_type
.fields()
.map(|field| make_physical(FieldOrDataTypeRef::Field(field), column_mapping_mode).and_then(|f| f.as_field()))
.try_collect()?;
Ok(FieldOrDataType::DataType(DataType::Struct(Box::new(StructType::new(children)))))
}
_ => {
// types with no children don't change
Ok(logical.make_owned())
}
}
_ => {
// types with no children don't change
Ok(logical_dt.clone())
}
}
}
Expand Down Expand Up @@ -494,7 +446,7 @@ fn get_state_info(
} else {
// Add to read schema, store field so we can build a `Column` expression later
// if needed (i.e. if we have partition columns)
let physical_field = make_physical(FieldOrDataTypeRef::Field(logical_field), column_mapping_mode).and_then(|f| f.as_field())?;
let physical_field = make_field_physical(logical_field, column_mapping_mode)?;
println!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
let name = physical_field.name.clone();
read_fields.push(physical_field);
Expand Down

0 comments on commit f497907

Please sign in to comment.