Skip to content

Commit

Permalink
Merge pull request #112 from ryan-johnson-databricks/frj-to-string
Browse files Browse the repository at this point in the history
Add and use convenience constructors for enums with string args
  • Loading branch information
ryan-johnson-databricks authored Jan 29, 2024
2 parents d09f814 + 3f957c6 commit 575c7a5
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 59 deletions.
14 changes: 7 additions & 7 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ pub(crate) fn parse_action(

let arr = batch
.column_by_name(column_name)
.ok_or(Error::MissingColumn(column_name.into()))?
.ok_or(Error::missing_column(column_name))?
.as_any()
.downcast_ref::<StructArray>()
.ok_or(Error::UnexpectedColumnType(
"Cannot downcast to StructArray".into(),
.ok_or(Error::unexpected_column_type(
"Cannot downcast to StructArray",
))?;

match action_type {
Expand Down Expand Up @@ -161,12 +161,12 @@ fn parse_action_metadata(arr: &StructArray) -> DeltaResult<Box<dyn Iterator<Item
.keys()
.as_any()
.downcast_ref::<StringArray>()
.ok_or(Error::MissingData("expected key column in map".into()))?;
.ok_or(Error::missing_data("expected key column in map"))?;
let values = config
.values()
.as_any()
.downcast_ref::<StringArray>()
.ok_or(Error::MissingData("expected value column in map".into()))?;
.ok_or(Error::missing_data("expected value column in map"))?;
metadata.configuration = keys
.into_iter()
.zip(values)
Expand Down Expand Up @@ -498,10 +498,10 @@ fn parse_dv(

fn cast_struct_column<T: 'static>(arr: &StructArray, name: impl AsRef<str>) -> DeltaResult<&T> {
arr.column_by_name(name.as_ref())
.ok_or(Error::MissingColumn(name.as_ref().into()))?
.ok_or(Error::missing_column(name.as_ref()))?
.as_any()
.downcast_ref::<T>()
.ok_or(Error::UnexpectedColumnType(format!(
.ok_or(Error::unexpected_column_type(format!(
"Cannot downcast '{}' to expected type",
name.as_ref()
)))
Expand Down
39 changes: 16 additions & 23 deletions kernel/src/actions/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,11 @@ impl DeletionVectorDescriptor {
"u" => {
let prefix_len = self.path_or_inline_dv.len() as i32 - 20;
if prefix_len < 0 {
return Err(Error::DeletionVector("Invalid length".to_string()));
return Err(Error::deletion_vector("Invalid length"));
}
let decoded = z85::decode(&self.path_or_inline_dv[(prefix_len as usize)..])
.map_err(|_| Error::DeletionVector("Failed to decode DV uuid".to_string()))?;
let uuid = uuid::Uuid::from_slice(&decoded)
.map_err(|err| Error::DeletionVector(err.to_string()))?;
.map_err(|_| Error::deletion_vector("Failed to decode DV uuid"))?;
let uuid = uuid::Uuid::from_slice(&decoded).map_err(Error::deletion_vector)?;
let mut dv_suffix = format!("deletion_vector_{uuid}.bin");
if prefix_len > 0 {
dv_suffix = format!(
Expand All @@ -190,14 +189,14 @@ impl DeletionVectorDescriptor {
}
let dv_path = parent
.join(&dv_suffix)
.map_err(|_| Error::DeletionVector(format!("invalid path: {}", dv_suffix)))?;
.map_err(|_| Error::deletion_vector(format!("invalid path: {}", dv_suffix)))?;
Ok(Some(dv_path))
}
"p" => Ok(Some(Url::parse(&self.path_or_inline_dv).map_err(|_| {
Error::DeletionVector(format!("invalid path: {}", self.path_or_inline_dv))
Error::deletion_vector(format!("invalid path: {}", self.path_or_inline_dv))
})?)),
"i" => Ok(None),
other => Err(Error::DeletionVector(format!(
other => Err(Error::deletion_vector(format!(
"Unknown storage format: '{other}'."
))),
}
Expand All @@ -212,9 +211,8 @@ impl DeletionVectorDescriptor {
match self.absolute_path(&parent)? {
None => {
let bytes = z85::decode(&self.path_or_inline_dv)
.map_err(|_| Error::DeletionVector("Failed to decode DV".to_string()))?;
RoaringTreemap::deserialize_from(&bytes[12..])
.map_err(|err| Error::DeletionVector(err.to_string()))
.map_err(|_| Error::deletion_vector("Failed to decode DV"))?;
RoaringTreemap::deserialize_from(&bytes[12..]).map_err(Error::deletion_vector)
}
Some(path) => {
let offset = self.offset;
Expand All @@ -227,7 +225,7 @@ impl DeletionVectorDescriptor {
let dv_data = fs_client
.read_files(vec![(path, None)])?
.next()
.ok_or(Error::MissingData("No deletion Vector data".to_string()))??;
.ok_or(Error::missing_data("No deletion Vector data"))??;

let mut cursor = Cursor::new(dv_data);
if let Some(offset) = offset {
Expand All @@ -237,23 +235,18 @@ impl DeletionVectorDescriptor {
}

let mut buf = vec![0; 4];
cursor
.read(&mut buf)
.map_err(|err| Error::DeletionVector(err.to_string()))?;
let magic =
i32::from_le_bytes(buf.try_into().map_err(|_| {
Error::DeletionVector("filed to read magic bytes".to_string())
})?);
cursor.read(&mut buf).map_err(Error::deletion_vector)?;
let magic = i32::from_le_bytes(
buf.try_into()
.map_err(|_| Error::deletion_vector("filed to read magic bytes"))?,
);
println!("magic --> : {}", magic);
// assert!(magic == 1681511377);

let mut buf = vec![0; size_in_bytes as usize];
cursor
.read(&mut buf)
.map_err(|err| Error::DeletionVector(err.to_string()))?;
cursor.read(&mut buf).map_err(Error::deletion_vector)?;

RoaringTreemap::deserialize_from(Cursor::new(buf))
.map_err(|err| Error::DeletionVector(err.to_string()))
RoaringTreemap::deserialize_from(Cursor::new(buf)).map_err(Error::deletion_vector)
}
}
}
Expand Down
24 changes: 11 additions & 13 deletions kernel/src/client/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{ExpressionEvaluator, ExpressionHandler};
fn downcast_to_bool(arr: &dyn Array) -> DeltaResult<&BooleanArray> {
arr.as_any()
.downcast_ref::<BooleanArray>()
.ok_or(Error::Generic("expected boolean array".to_string()))
.ok_or(Error::generic("expected boolean array"))
}

impl Scalar {
Expand Down Expand Up @@ -147,7 +147,7 @@ fn evaluate_expression(expression: &Expression, batch: &RecordBatch) -> DeltaRes
} else {
batch
.column_by_name(name)
.ok_or(Error::MissingColumn(name.clone()))
.ok_or(Error::missing_column(name))
.cloned()
}
}
Expand Down Expand Up @@ -176,9 +176,7 @@ fn evaluate_expression(expression: &Expression, batch: &RecordBatch) -> DeltaRes
NotEqual => |l, r| neq(l, r).map(wrap_comparison_result),
};

eval(&left_arr, &right_arr).map_err(|err| Error::GenericError {
source: Box::new(err),
})
eval(&left_arr, &right_arr).map_err(Error::generic_err)
}
VariadicOperation { op, exprs } => {
type Operation = fn(&BooleanArray, &BooleanArray) -> Result<BooleanArray, ArrowError>;
Expand Down Expand Up @@ -239,7 +237,7 @@ mod tests {
let values = Int32Array::from(vec![1, 2, 3]);
let batch =
RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values.clone())]).unwrap();
let column = Expression::Column("a".to_string());
let column = Expression::column("a");

let results = evaluate_expression(&column, &batch).unwrap();
assert_eq!(results.as_ref(), &values);
Expand All @@ -260,7 +258,7 @@ mod tests {
vec![Arc::new(struct_array.clone())],
)
.unwrap();
let column = Expression::Column("b.a".to_string());
let column = Expression::column("b.a");
let results = evaluate_expression(&column, &batch).unwrap();
assert_eq!(results.as_ref(), &values);
}
Expand All @@ -270,7 +268,7 @@ mod tests {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let values = Int32Array::from(vec![1, 2, 3]);
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)]).unwrap();
let column = Expression::Column("a".to_string());
let column = Expression::column("a");

let expression = Box::new(column.clone().add(Expression::Literal(Scalar::Integer(1))));
let results = evaluate_expression(&expression, &batch).unwrap();
Expand Down Expand Up @@ -306,8 +304,8 @@ mod tests {
vec![Arc::new(values.clone()), Arc::new(values)],
)
.unwrap();
let column_a = Expression::Column("a".to_string());
let column_b = Expression::Column("b".to_string());
let column_a = Expression::column("a");
let column_b = Expression::column("b");

let expression = Box::new(column_a.clone().add(column_b.clone()));
let results = evaluate_expression(&expression, &batch).unwrap();
Expand All @@ -330,7 +328,7 @@ mod tests {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let values = Int32Array::from(vec![1, 2, 3]);
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)]).unwrap();
let column = Expression::Column("a".to_string());
let column = Expression::column("a");
let lit = Expression::Literal(Scalar::Integer(2));

let expression = Box::new(column.clone().lt(lit.clone()));
Expand Down Expand Up @@ -378,8 +376,8 @@ mod tests {
],
)
.unwrap();
let column_a = Expression::Column("a".to_string());
let column_b = Expression::Column("b".to_string());
let column_a = Expression::column("a");
let column_b = Expression::column("b");

let expression = Box::new(column_a.clone().and(column_b.clone()));
let results = evaluate_expression(&expression, &batch).unwrap();
Expand Down
5 changes: 1 addition & 4 deletions kernel/src/client/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ impl FileOpener for ParquetOpener {
.with_batch_size(batch_size)
.build()?;

let adapted = stream.map_err(|e| Error::GenericError {
source: Box::new(e),
});

let adapted = stream.map_err(Error::generic_err);
Ok(adapted.boxed())
}))
}
Expand Down
29 changes: 28 additions & 1 deletion kernel/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,38 @@ pub enum Error {
MissingMetadata,
}

// Convenience constructors for the `Error` variants that carry a `String` payload, plus
// `GenericError`, which carries a boxed source error instead of a string.
// The string-based constructors accept `impl ToString`, so callers can pass a string literal,
// a `String`, or any other `ToString` value (e.g. another error) without converting it first.
impl Error {
/// Builds an [`Error::GenericError`] whose `source` is `source` converted into a
/// `Box<dyn std::error::Error + Send + Sync>`.
pub fn generic_err(source: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Self {
Self::GenericError {
source: source.into(),
}
}
/// Builds an [`Error::Generic`] holding `msg` rendered to a `String`.
pub fn generic(msg: impl ToString) -> Self {
Self::Generic(msg.to_string())
}
/// Builds an [`Error::FileNotFound`] holding `path` rendered to a `String`.
pub fn file_not_found(path: impl ToString) -> Self {
Self::FileNotFound(path.to_string())
}
/// Builds an [`Error::MissingColumn`] holding `name` rendered to a `String`.
pub fn missing_column(name: impl ToString) -> Self {
Self::MissingColumn(name.to_string())
}
/// Builds an [`Error::UnexpectedColumnType`] holding `name` rendered to a `String`.
/// The argument is stored verbatim; it is not restricted to being a column name.
pub fn unexpected_column_type(name: impl ToString) -> Self {
Self::UnexpectedColumnType(name.to_string())
}
/// Builds an [`Error::MissingData`] holding `name` rendered to a `String`.
/// The argument is stored verbatim; it is not restricted to being a column name.
pub fn missing_data(name: impl ToString) -> Self {
Self::MissingData(name.to_string())
}
/// Builds an [`Error::DeletionVector`] holding `msg` rendered to a `String`.
pub fn deletion_vector(msg: impl ToString) -> Self {
Self::DeletionVector(msg.to_string())
}
}

#[cfg(feature = "object_store")]
impl From<object_store::Error> for Error {
fn from(value: object_store::Error) -> Self {
match value {
object_store::Error::NotFound { path, .. } => Self::FileNotFound(path),
object_store::Error::NotFound { path, .. } => Self::file_not_found(path),
err => Self::ObjectStore(err),
}
}
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ impl Expression {
}

/// Create a new expression for a column reference
pub fn column(name: impl Into<String>) -> Self {
Self::Column(name.into())
pub fn column(name: impl ToString) -> Self {
Self::Column(name.to_string())
}

/// Create a new expression for a literal value
Expand Down
18 changes: 9 additions & 9 deletions kernel/src/scan/data_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,19 @@ impl DataSkippingFilter {
pub(crate) fn apply(&self, actions: &RecordBatch) -> DeltaResult<RecordBatch> {
let adds = actions
.column_by_name("add")
.ok_or(Error::MissingColumn("Column 'add' not found.".into()))?
.ok_or(Error::missing_column("Column 'add' not found."))?
.as_any()
.downcast_ref::<StructArray>()
.ok_or(Error::UnexpectedColumnType(
"Expected type 'StructArray'.".into(),
.ok_or(Error::unexpected_column_type(
"Expected type 'StructArray'.",
))?;
let stats = adds
.column_by_name("stats")
.ok_or(Error::MissingColumn("Column 'stats' not found.".into()))?
.ok_or(Error::missing_column("Column 'stats' not found."))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or(Error::UnexpectedColumnType(
"Expected type 'StringArray'.".into(),
.ok_or(Error::unexpected_column_type(
"Expected type 'StringArray'.",
))?;

let stats_schema = Arc::new(self.stats_schema.as_ref().try_into()?);
Expand Down Expand Up @@ -197,8 +197,8 @@ impl DataSkippingFilter {
let skipping_vector = skipping_vector
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or(Error::UnexpectedColumnType(
"Expected type 'BooleanArray'.".into(),
.ok_or(Error::unexpected_column_type(
"Expected type 'BooleanArray'.",
))?;

// let skipping_vector = self.predicate.invoke(&parsed_stats)?;
Expand All @@ -224,7 +224,7 @@ impl DataSkippingFilter {
.into_iter()
.next()
.transpose()?
.ok_or(Error::MissingData("Expected data".into()))?),
.ok_or(Error::missing_data("Expected data"))?),
None => Ok(RecordBatch::try_new(
stats_schema.clone(),
stats_schema
Expand Down

0 comments on commit 575c7a5

Please sign in to comment.