Skip to content

Commit fdae288

Browse files
roeaprtyler
authored andcommitted
feat: integrate new table provider with DataSink
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
1 parent d129c7c commit fdae288

File tree

2 files changed

+196
-175
lines changed

2 files changed

+196
-175
lines changed

crates/core/src/delta_datafusion/table_provider.rs

Lines changed: 7 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use datafusion::datasource::physical_plan::{
1919
FileGroup, FileSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
2020
};
2121
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
22-
use datafusion::datasource::sink::{DataSink, DataSinkExec};
22+
use datafusion::datasource::sink::DataSinkExec;
2323
use datafusion::error::DataFusionError;
2424
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
2525
use datafusion::logical_expr::dml::InsertOp;
@@ -32,7 +32,6 @@ use datafusion::physical_plan::filter_pushdown::{FilterDescription, FilterPushdo
3232
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
3333
use datafusion::physical_plan::{
3434
DisplayAs, DisplayFormatType, ExecutionPlan, PhysicalExpr, PlanProperties,
35-
stream::RecordBatchStreamAdapter,
3635
};
3736
use datafusion::{
3837
catalog::Session,
@@ -43,9 +42,8 @@ use datafusion::{
4342
scalar::ScalarValue,
4443
};
4544
use delta_kernel::Version;
45+
use futures::TryStreamExt as _;
4646
use futures::future::BoxFuture;
47-
use futures::{StreamExt as _, TryStreamExt as _};
48-
use itertools::Itertools;
4947
use object_store::ObjectMeta;
5048
use serde::{Deserialize, Serialize};
5149
use url::Url;
@@ -58,184 +56,18 @@ use crate::delta_datafusion::{
5856
DataFusionMixins as _, LogDataHandler, get_null_of_arrow_type, register_store,
5957
to_correct_scalar_value,
6058
};
61-
use crate::kernel::schema::cast::cast_record_batch;
62-
use crate::kernel::transaction::{CommitBuilder, PROTOCOL};
63-
use crate::kernel::{Action, Add, EagerSnapshot, Snapshot};
59+
use crate::kernel::transaction::PROTOCOL;
60+
use crate::kernel::{Add, EagerSnapshot, Snapshot};
6461
use crate::logstore::LogStore;
65-
use crate::operations::write::writer::{DeltaWriter, WriterConfig};
66-
use crate::protocol::{DeltaOperation, SaveMode};
67-
use crate::table::config::TablePropertiesExt;
62+
use crate::protocol::SaveMode;
6863
use crate::table::normalize_table_url;
6964
use crate::{DeltaResult, DeltaTable, DeltaTableError, logstore::LogStoreRef};
7065

66+
mod data_sink;
7167
pub(crate) mod next;
7268

7369
const PATH_COLUMN: &str = "__delta_rs_path";
7470

75-
/// DataSink implementation for delta lake
76-
/// This uses DataSinkExec to handle the insert operation
77-
/// Implements writing streams of RecordBatches to delta.
78-
#[derive(Debug)]
79-
pub struct DeltaDataSink {
80-
/// The log store
81-
log_store: LogStoreRef,
82-
/// The snapshot
83-
snapshot: EagerSnapshot,
84-
/// The save mode
85-
save_mode: SaveMode,
86-
/// The schema
87-
schema: SchemaRef,
88-
/// Metrics for monitoring throughput
89-
metrics: ExecutionPlanMetricsSet,
90-
}
91-
92-
/// A [`DataSink`] implementation for writing to Delta Lake.
93-
///
94-
/// `DeltaDataSink` is used by [`DataSinkExec`] during query execution to
95-
/// stream [`RecordBatch`]es into a Delta table. It encapsulates everything
96-
/// needed to perform an insert/append/overwrite operation, including
97-
/// transaction log access, snapshot state, and session configuration.
98-
impl DeltaDataSink {
99-
/// Create a new `DeltaDataSink`
100-
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot, save_mode: SaveMode) -> Self {
101-
Self {
102-
log_store,
103-
schema: snapshot.read_schema(),
104-
snapshot,
105-
save_mode,
106-
metrics: ExecutionPlanMetricsSet::new(),
107-
}
108-
}
109-
110-
/// Create a streaming transformed version of the input that converts dictionary columns
111-
/// This is used to convert dictionary columns to their native types
112-
fn create_converted_stream(
113-
&self,
114-
input: SendableRecordBatchStream,
115-
target_schema: SchemaRef,
116-
) -> SendableRecordBatchStream {
117-
use futures::StreamExt;
118-
119-
let schema_for_closure = Arc::clone(&target_schema);
120-
let converted_stream = input.map(move |batch_result| {
121-
batch_result.and_then(|batch| {
122-
cast_record_batch(&batch, Arc::clone(&schema_for_closure), false, true)
123-
.map_err(|e| DataFusionError::External(Box::new(e)))
124-
})
125-
});
126-
127-
Box::pin(RecordBatchStreamAdapter::new(
128-
target_schema,
129-
converted_stream,
130-
))
131-
}
132-
}
133-
134-
/// Implementation of the `DataSink` trait for `DeltaDataSink`
135-
/// This is used to write the data to the delta table
136-
/// It implements the `DataSink` trait and is used by the `DataSinkExec` node
137-
/// to write the data to the delta table
138-
#[async_trait::async_trait]
139-
impl DataSink for DeltaDataSink {
140-
fn as_any(&self) -> &dyn Any {
141-
self
142-
}
143-
144-
fn metrics(&self) -> Option<MetricsSet> {
145-
Some(self.metrics.clone_inner())
146-
}
147-
148-
fn schema(&self) -> &SchemaRef {
149-
&self.schema
150-
}
151-
152-
/// Write the data to the delta table
153-
/// This is used for insert into operation
154-
async fn write_all(
155-
&self,
156-
data: SendableRecordBatchStream,
157-
_context: &Arc<TaskContext>,
158-
) -> datafusion::common::Result<u64> {
159-
let target_schema = self.snapshot.input_schema();
160-
let table_props = self.snapshot.table_configuration().table_properties();
161-
162-
let mut stream = self.create_converted_stream(data, target_schema.clone());
163-
let partition_columns = self.snapshot.metadata().partition_columns();
164-
let object_store = self.log_store.object_store(None);
165-
let total_rows_metric = MetricBuilder::new(&self.metrics).counter("total_rows", 0);
166-
let config = WriterConfig::new(
167-
self.snapshot.read_schema(),
168-
partition_columns.clone(),
169-
None,
170-
Some(table_props.target_file_size().get() as usize),
171-
None,
172-
table_props.num_indexed_cols(),
173-
table_props
174-
.data_skipping_stats_columns
175-
.as_ref()
176-
.map(|c| c.iter().map(|c| c.to_string()).collect_vec()),
177-
);
178-
179-
let mut writer = DeltaWriter::new(object_store, config);
180-
let mut total_rows = 0u64;
181-
182-
while let Some(batch_result) = stream.next().await {
183-
let batch = batch_result?;
184-
let batch_rows = batch.num_rows() as u64;
185-
total_rows += batch_rows;
186-
total_rows_metric.add(batch_rows as usize);
187-
writer
188-
.write(&batch)
189-
.await
190-
.map_err(|e| DataFusionError::External(Box::new(e)))?;
191-
}
192-
193-
let mut actions = writer
194-
.close()
195-
.await
196-
.map_err(|e| DataFusionError::External(Box::new(e)))?
197-
.into_iter()
198-
.map(Action::Add)
199-
.collect_vec();
200-
201-
if self.save_mode == SaveMode::Overwrite {
202-
actions.extend(
203-
self.snapshot
204-
.file_views(&self.log_store, None)
205-
.map_ok(|f| Action::Remove(f.remove_action(true)))
206-
.try_collect::<Vec<_>>()
207-
.await
208-
.map_err(|e| DataFusionError::External(Box::new(e)))?,
209-
);
210-
};
211-
212-
let operation = DeltaOperation::Write {
213-
mode: self.save_mode,
214-
partition_by: if partition_columns.is_empty() {
215-
None
216-
} else {
217-
Some(partition_columns.clone())
218-
},
219-
predicate: None,
220-
};
221-
222-
CommitBuilder::default()
223-
.with_actions(actions)
224-
.build(Some(&self.snapshot), self.log_store.clone(), operation)
225-
.await
226-
.map_err(|e| DataFusionError::External(Box::new(e)))?;
227-
228-
Ok(total_rows)
229-
}
230-
}
231-
232-
/// Implementation of the `DisplayAs` trait for `DeltaDataSink`
233-
impl DisplayAs for DeltaDataSink {
234-
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
235-
write!(f, "DeltaDataSink")
236-
}
237-
}
238-
23971
#[derive(Debug, Clone)]
24072
/// Used to specify if additional metadata columns are exposed to the user
24173
pub struct DeltaScanConfigBuilder {
@@ -1012,7 +844,7 @@ impl TableProvider for DeltaTableProvider {
1012844
};
1013845

1014846
let data_sink =
1015-
DeltaDataSink::new(self.log_store.clone(), self.snapshot.clone(), save_mode);
847+
data_sink::DeltaDataSink::new(self.log_store.clone(), self.snapshot.clone(), save_mode);
1016848

1017849
Ok(Arc::new(DataSinkExec::new(
1018850
input,

0 commit comments

Comments
 (0)