Skip to content

Commit

Permalink
feat: cdf tableprovider
Browse files Browse the repository at this point in the history
Signed-off-by: Ion Koutsouris <[email protected]>
  • Loading branch information
ion-elgreco committed Feb 15, 2025
1 parent 1befab9 commit 81166d7
Show file tree
Hide file tree
Showing 13 changed files with 323 additions and 240 deletions.
3 changes: 1 addition & 2 deletions crates/core/src/delta_datafusion/cdf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use std::sync::LazyLock;

use arrow_schema::{DataType, Field, TimeUnit};

pub(crate) use self::scan::*;
pub(crate) use self::scan_utils::*;
use crate::kernel::{Add, AddCDCFile, Remove};
use crate::DeltaResult;

mod scan;
pub mod scan;
mod scan_utils;

/// Change type column name
Expand Down
130 changes: 93 additions & 37 deletions crates/core/src/delta_datafusion/cdf/scan.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,119 @@
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::catalog::TableProvider;
use datafusion::execution::SessionState;
use datafusion_common::{exec_datafusion_err, Column, DFSchema, Result as DataFusionResult};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::limit::GlobalLimitExec;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::ExecutionPlan;

/// Physical execution of a scan
#[derive(Debug, Clone)]
pub struct DeltaCdfScan {
plan: Arc<dyn ExecutionPlan>,
}
use crate::DeltaTableError;
use crate::{
delta_datafusion::DataFusionMixins, operations::load_cdf::CdfLoadBuilder, DeltaResult,
};

impl DeltaCdfScan {
/// Creates a new scan
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
Self { plan }
}
use super::ADD_PARTITION_SCHEMA;

fn session_state_from_session(session: &dyn Session) -> DataFusionResult<&SessionState> {
session
.as_any()
.downcast_ref::<SessionState>()
.ok_or_else(|| exec_datafusion_err!("Failed to downcast Session to SessionState"))
}

impl DisplayAs for DeltaCdfScan {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
}
#[derive(Debug)]
pub struct DeltaCdfTableProvider {
cdf_builder: CdfLoadBuilder,
schema: SchemaRef,
}

impl ExecutionPlan for DeltaCdfScan {
fn name(&self) -> &str {
Self::static_name()
impl DeltaCdfTableProvider {
/// Build a DeltaCDFTableProvider
pub fn try_new(cdf_builder: CdfLoadBuilder) -> DeltaResult<Self> {
let mut fields = cdf_builder.snapshot.input_schema()?.fields().to_vec();
for f in ADD_PARTITION_SCHEMA.clone() {
fields.push(f.into());
}
Ok(DeltaCdfTableProvider {
cdf_builder,
schema: Schema::new(fields).into(),
})
}
}

#[async_trait]
impl TableProvider for DeltaCdfTableProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.plan.schema().clone()
self.schema.clone()
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
self.plan.properties()
fn table_type(&self) -> TableType {
TableType::Base
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
async fn scan(
&self,
session: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let session_state = session_state_from_session(session)?;
let mut plan = self.cdf_builder.build(session_state).await?;

let df_schema: DFSchema = plan.schema().try_into()?;

if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
let physical_expr = session.create_physical_expr(filter_expr, &df_schema)?;
plan = Arc::new(FilterExec::try_new(physical_expr, plan)?);
}

if let Some(projection) = projection {
let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
if projection != &current_projection {
let fields: DeltaResult<Vec<(Arc<dyn PhysicalExpr>, String)>> = projection
.iter()
.map(|i| {
let (table_ref, field) = df_schema.qualified_field(*i);
session
.create_physical_expr(
Expr::Column(Column::from((table_ref, field))),
&df_schema,
)
.map(|expr| (expr, field.name().clone()))
.map_err(DeltaTableError::from)
})
.collect();
let fields = fields?;
plan = Arc::new(ProjectionExec::try_new(fields, plan)?);
}
}

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
self.plan.clone().with_new_children(_children)
if let Some(limit) = limit {
plan = Arc::new(GlobalLimitExec::new(plan, 0, Some(limit)))
};
Ok(plan)
}

fn execute(
fn supports_filters_pushdown(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
self.plan.execute(partition, context)
filter: &[&Expr],
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
Ok(filter
.iter()
.map(|_| TableProviderFilterPushDown::Exact) // maybe exact
.collect())
}
}
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl ContextProvider for DeltaContextProvider<'_> {
}

/// Parse a string predicate into an `Expr`
pub(crate) fn parse_predicate_expression(
pub fn parse_predicate_expression(
schema: &DFSchema,
expr: impl AsRef<str>,
df_state: &SessionState,
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ pub mod logical;
pub mod physical;
pub mod planner;

pub use cdf::scan::DeltaCdfTableProvider;

mod schema_adapter;

impl From<DeltaTableError> for DataFusionError {
Expand Down
27 changes: 12 additions & 15 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,6 @@ impl std::future::IntoFuture for DeleteBuilder {

#[cfg(test)]
mod tests {
use crate::delta_datafusion::cdf::DeltaCdfScan;
use crate::kernel::DataType as DeltaDataType;
use crate::operations::collect_sendable_stream;
use crate::operations::DeltaOps;
Expand Down Expand Up @@ -975,9 +974,8 @@ mod tests {
let ctx = SessionContext::new();
let table = DeltaOps(table)
.load_cdf()
.with_session_ctx(ctx.clone())
.with_starting_version(0)
.build()
.build(&ctx.state())
.await
.expect("Failed to load CDF");

Expand Down Expand Up @@ -1060,9 +1058,8 @@ mod tests {
let ctx = SessionContext::new();
let table = DeltaOps(table)
.load_cdf()
.with_session_ctx(ctx.clone())
.with_starting_version(0)
.build()
.build(&ctx.state())
.await
.expect("Failed to load CDF");

Expand All @@ -1075,23 +1072,23 @@ mod tests {
.expect("Failed to collect batches");

// The batches will contain a current _commit_timestamp which shouldn't be check_append_only
let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(3)).collect();
let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(4)).collect();

assert_batches_sorted_eq! {[
"+-------+--------------+-----------------+------+",
"| value | _change_type | _commit_version | year |",
"+-------+--------------+-----------------+------+",
"| 1 | insert | 1 | 2020 |",
"| 2 | delete | 2 | 2020 |",
"| 2 | insert | 1 | 2020 |",
"| 3 | insert | 1 | 2024 |",
"+-------+--------------+-----------------+------+",
"+-------+------+--------------+-----------------+",
"| value | year | _change_type | _commit_version |",
"+-------+------+--------------+-----------------+",
"| 1 | 2020 | insert | 1 |",
"| 2 | 2020 | delete | 2 |",
"| 2 | 2020 | insert | 1 |",
"| 3 | 2024 | insert | 1 |",
"+-------+------+--------------+-----------------+",
], &batches }
}

async fn collect_batches(
num_partitions: usize,
stream: DeltaCdfScan,
stream: Arc<dyn ExecutionPlan>,
ctx: SessionContext,
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
let mut batches = vec![];
Expand Down
Loading

0 comments on commit 81166d7

Please sign in to comment.