Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: arrow convenience extensions #827

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

roeap
Copy link
Collaborator

@roeap roeap commented Apr 11, 2025

What changes are proposed in this pull request?

The PR introduces some convenience APIs for engines working with arrow data. Specifically we define and implement ScanExt and ExpressionEvaluatorExt which define variants of the main apis for Scan and ExpressionEvaluator respectively in terms of arrow RecordBatches.

PR #621 contains some similar work in defining a convenience function to handle Scan::execute results. In this PR a TryFrom impl is used - I was a bit unsure which approach would be better.

see: #826

also includes one cargo clippy.

This PR affects the following public APIs

new public methods when traits are in scope Scan::scan_metadata_arrow, Scan::evaluate_arrow and ExpressionEvaluator::evaluate_arrow.

How was this change tested?

additional unit tests for new APIs.

@roeap roeap force-pushed the arrow-extensions branch from 16bce4e to d6e9c4e Compare April 11, 2025 11:59
Copy link

codecov bot commented Apr 11, 2025

Codecov Report

Attention: Patch coverage is 76.74419% with 10 lines in your changes missing coverage. Please review.

Project coverage is 84.99%. Comparing base (e74d18b) to head (873f7ce).

Files with missing lines Patch % Lines
kernel/src/engine/arrow_extensions/scan.rs 78.94% 0 Missing and 8 partials ⚠️
kernel/src/engine/arrow_extensions/evaluator.rs 60.00% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #827      +/-   ##
==========================================
- Coverage   85.01%   84.99%   -0.02%     
==========================================
  Files          84       86       +2     
  Lines       20656    20699      +43     
  Branches    20656    20699      +43     
==========================================
+ Hits        17561    17594      +33     
- Misses       2228     2229       +1     
- Partials      867      876       +9     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if all the new extension methods had actual use sites, to give a better sense of how useful they are? Right now only execute_arrow has a real use site.

fn evaluate_arrow(&self, batch: RecordBatch) -> DeltaResult<RecordBatch>;
}

impl<T: ExpressionEvaluator + ?Sized> ExpressionEvaluatorExt for T {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ?Sized? Are there dyn impl somewhere?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do we need that in order to invoke the associated function T::evaluate?

Comment on lines +60 to +62
let record_batch = ArrowEngineData::try_from_engine_data(data)?.into();
mask.map(|m| Ok(filter_record_batch(&record_batch, &m.into())?))
.unwrap_or(Ok(record_batch))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a good use for Option::map_or_else?

Suggested change
let record_batch = ArrowEngineData::try_from_engine_data(data)?.into();
mask.map(|m| Ok(filter_record_batch(&record_batch, &m.into())?))
.unwrap_or(Ok(record_batch))
let record_batch = ArrowEngineData::try_from_engine_data(data)?.into();
mask.map_or_else(
|| Ok(record_batch),
|m| Ok(filter_record_batch(&record_batch, &m.into())?),
}

Tho simple imperative code probably wins on readability:

Suggested change
let record_batch = ArrowEngineData::try_from_engine_data(data)?.into();
mask.map(|m| Ok(filter_record_batch(&record_batch, &m.into())?))
.unwrap_or(Ok(record_batch))
let record_batch = ArrowEngineData::try_from_engine_data(data)?.into();
Ok(match mask {
Some(m) => filter_record_batch(&record_batch, &m.into())?,
None => record_batch,
})

or even

Suggested change
let record_batch = ArrowEngineData::try_from_engine_data(data)?.into();
mask.map(|m| Ok(filter_record_batch(&record_batch, &m.into())?))
.unwrap_or(Ok(record_batch))
let mut record_batch = ArrowEngineData::try_from_engine_data(data)?.into();
if let Some(m) = mask {
record_batch = filter_record_batch(&record_batch, &m.into())?;
}
Ok(record_batch)

Comment on lines +85 to +86
.map_ok(TryFrom::try_from)
.flatten())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, map_ok and flatten are a bad combination -- Err cases are silently dropped because they are treated as empty iterators. Does this work?

Suggested change
.map_ok(TryFrom::try_from)
.flatten())
.map(|result| Ok(result?.try_into()?))
.flatten_ok()

(depending on the error types, you might be able to drop the Ok(...?) wrapper)

(again below)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants