Labels: binding/python (Issues for the Python package), binding/rust (Issues for the Rust crate)
Description
What happened?
Running a Z-order optimize multiple times rewrites the whole table on every run.
Expected behavior
Running optimize multiple times with no data change should be a no-op.
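As a minimal sketch of what the expected no-op would look like, here is a function that could be dropped into the reproduction program under "Steps to reproduce" below. It assumes the optimize builder resolves to a (DeltaTable, Metrics) pair with num_files_added / num_files_removed fields, as in deltalake::operations::optimize; this is a check of the expectation, not part of the original report.

// Hypothetical check for the expected no-op: run optimize a second time
// on an unchanged table and inspect the returned metrics.
async fn assert_second_optimize_is_noop(url: &Url) {
    let table = deltalake::open_table(url.clone())
        .await
        .expect("Failed to load table");
    // Assumption: `into_future()` resolves to (DeltaTable, Metrics).
    let (_table, metrics) = table
        .optimize()
        .with_type(OptimizeType::ZOrder(vec![
            "id".to_string(),
            "sub_id".to_string(),
        ]))
        .into_future()
        .await
        .expect("Failed to optimize table");
    assert_eq!(metrics.num_files_added, 0, "expected no files to be added");
    assert_eq!(metrics.num_files_removed, 0, "expected no files to be removed");
}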
Operating System
macOS
Binding
Rust
Bindings Version
0.30.0
Steps to reproduce
use deltalake::arrow::array::*;
use deltalake::datafusion::datasource::TableProvider;
use deltalake::operations::optimize::OptimizeType;
use deltalake::protocol::SaveMode;
use std::env::{args, var};
use std::sync::Arc;
use url::Url;

#[tokio::main]
async fn main() {
    let table = var("TABLE").expect("TABLE environment variable not set");
    let url = Url::parse(&table).expect("Failed to parse TABLE URL");
    match args().nth(1).as_deref() {
        Some("create") => create(&url).await,
        Some("optimize") => optimize(&url).await,
        Some("add") => add(&url).await,
        _ => {
            eprintln!("Usage: <program> <create|optimize|add>");
        }
    }
}

// Create (or overwrite) the table and load it with ~33M rows of synthetic data.
async fn create(url: &Url) {
    deltalake::DeltaTable::try_from_url(url.clone())
        .await
        .expect("Failed to open table")
        .create()
        .with_column("id", deltalake::DataType::LONG, false, None)
        .with_column("sub_id", deltalake::DataType::LONG, true, None)
        .with_column("data1", deltalake::DataType::DOUBLE, true, None)
        .with_column("data2", deltalake::DataType::DOUBLE, true, None)
        .with_column("data3", deltalake::DataType::DOUBLE, true, None)
        .with_save_mode(SaveMode::Overwrite)
        .into_future()
        .await
        .expect("Failed to create table");
    let table = deltalake::open_table(url.clone())
        .await
        .expect("Failed to load table");
    let id_array = (1..=100_000_000).step_by(3).collect::<Vec<i64>>();
    let sub_id_array = vec![None, Some(1), Some(2)]
        .into_iter()
        .cycle()
        .take(id_array.len())
        .collect::<Vec<Option<i64>>>();
    let data1_array = (0..id_array.len())
        .map(|x| (x as f64).sin())
        .collect::<Vec<f64>>();
    let data2_array = (0..id_array.len())
        .map(|x| (x as f64).cos())
        .collect::<Vec<f64>>();
    let data3_array = (0..id_array.len())
        .map(|x| (x as f64).tan())
        .collect::<Vec<f64>>();
    let record_batch = RecordBatch::try_new(
        table.schema(),
        vec![
            Arc::new(Int64Array::from(id_array)),
            Arc::new(Int64Array::from(sub_id_array)),
            Arc::new(Float64Array::from(data1_array)),
            Arc::new(Float64Array::from(data2_array)),
            Arc::new(Float64Array::from(data3_array)),
        ],
    )
    .expect("Failed to create record batch");
    table
        .write(vec![record_batch])
        .with_save_mode(SaveMode::Overwrite)
        .into_future()
        .await
        .expect("Failed to write data");
}

// Z-order optimize on (id, sub_id).
async fn optimize(url: &Url) {
    let table = deltalake::open_table(url.clone())
        .await
        .expect("Failed to load table");
    table
        .optimize()
        .with_type(OptimizeType::ZOrder(vec![
            "id".to_string(),
            "sub_id".to_string(),
        ]))
        .into_future()
        .await
        .expect("Failed to optimize table");
}

// Append a smaller batch of new rows.
async fn add(table: &Url) {
    let id_array = (2..1_000_000).step_by(3).collect::<Vec<i64>>();
    let sub_id_array = vec![None, Some(1), Some(2)]
        .into_iter()
        .cycle()
        .take(id_array.len())
        .collect::<Vec<Option<i64>>>();
    let data1_array = (0..id_array.len())
        .map(|x| (x as f64).sin())
        .collect::<Vec<f64>>();
    let data2_array = (0..id_array.len())
        .map(|x| (x as f64).cos())
        .collect::<Vec<f64>>();
    let data3_array = (0..id_array.len())
        .map(|x| (x as f64).tan())
        .collect::<Vec<f64>>();
    let table = deltalake::open_table(table.clone())
        .await
        .expect("Failed to load table");
    let record_batch = RecordBatch::try_new(
        table.schema(),
        vec![
            Arc::new(Int64Array::from(id_array)),
            Arc::new(Int64Array::from(sub_id_array)),
            Arc::new(Float64Array::from(data1_array)),
            Arc::new(Float64Array::from(data2_array)),
            Arc::new(Float64Array::from(data3_array)),
        ],
    )
    .expect("Failed to create record batch");
    table
        .write(vec![record_batch])
        .with_save_mode(SaveMode::Append)
        .into_future()
        .await
        .expect("Failed to write data");
}

Run:

cargo run --release create
cargo run --release optimize
cargo run --release optimize
Check the Delta history: you will see 8 files added and 8 files removed after the second optimize, even though no data changed between the two runs.
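The same rewrite can also be observed programmatically. A small sketch of a helper that could be added to the reproduction above, assuming DeltaTable::history(Option<usize>) is available as in current delta-rs and returns the most recent commit infos:

// Sketch: print the last few commits so the repeated OPTIMIZE commits
// (each adding and removing 8 files) are visible without an external tool.
async fn show_history(url: &Url) {
    let table = deltalake::open_table(url.clone())
        .await
        .expect("Failed to load table");
    // Assumption: `history(Some(n))` returns the n most recent commit infos.
    let commits = table
        .history(Some(3))
        .await
        .expect("Failed to read history");
    for commit in commits {
        println!("{commit:?}");
    }
}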
When optimize is run twice on Databricks with

optimize table ZORDER BY id, sub_id

the second operation is a no-op and is not recorded. Databricks does this by tracking optimizations in the add-action tags (ZCUBE_ID, ZCUBE_ZORDER_BY, etc.), which allows already-optimized files to be skipped. For example, an add action carrying these tags:
{"add":{"path":"part-00003-89d1167b-097c-42bf-9147-be85bde21c68.c000.snappy.parquet","partitionValues":{},"size":164620112,"modificationTime":1768226169000,"dataChange":false,"stats":"{\"numRecords\":5870752,\"minValues\":{\"id\":1,\"data1\":-0.9999999999999166,\"data2\":-0.9999999999999934,\"data3\":-3402633.7954244036},\"maxValues\":{\"id\":51836761,\"data1\":0.9999999999999839,\"data2\":1.0,\"data3\":5577600.674301789},\"nullCount\":{\"id\":0,\"sub_id\":5870752,\"data1\":0,\"data2\":0,\"data3\":0}}","tags":{"MAX_INSERTION_TIME":"1768226154793000","INSERTION_TIME":"1768226023846000","ZCUBE_ZORDER_CURVE":"kdtree","ZCUBE_ZORDER_BY":"[\"id\",\"sub_id\"]","OPTIMIZE_TARGET_SIZE":"268435456","MIN_INSERTION_TIME":"1768226023846000","ZCUBE_ID":"ecdf4490-405f-4962-b7bf-b574349ea61a"}}}