164 changes: 156 additions & 8 deletions kernel/examples/write-table/src/main.rs
@@ -4,7 +4,11 @@
use std::process::ExitCode;
use std::sync::Arc;

use arrow::array::{BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray};
use arrow::array::{
ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
StructArray,
};
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow::util::pretty::print_batches;
use clap::Parser;
use common::{LocationArgs, ParseWithExamples};
@@ -24,6 +28,8 @@
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};

/// An example program that writes to a Delta table and creates it if necessary.
///
/// Demonstrates writing data with various types including nested structs.
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(propagate_version = true)]
@@ -32,18 +38,22 @@
location_args: LocationArgs,

/// Comma-separated schema specification of the form `field_name:data_type`.
/// Supported types: string, integer, long, double, boolean, timestamp.
#[arg(
long,
short = 's',
default_value = "id:integer,name:string,score:double"
)]
schema: String,

/// Use a predefined schema with nested structs to demonstrate nested data support.
/// Schema: id:long, address:{city:string, coords:{lat:double}}
#[arg(long, default_value = "false")]
nested: bool,

/// Number of rows to generate for the example data
#[arg(long, short, default_value = "10")]
num_rows: usize,
// TODO: Support providing input data from a JSON file instead of generating random data
// TODO: Support specifying whether the transaction should overwrite, append, or error if the table already exists
}
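
As an aside for orientation: with clap's derive API, the struct above can be exercised directly in a test. A minimal sketch — the positional table location contributed by the flattened `LocationArgs` is an assumption, so the real invocation shape may differ:

```rust
use clap::Parser;

// Hypothetical invocation; `LocationArgs` is assumed to contribute a
// positional table location, which may not match the real CLI shape.
let cli = Cli::try_parse_from([
    "write-table",     // binary name (assumed)
    "/tmp/my_table",   // table location via LocationArgs (assumed)
    "--nested",        // switch to the predefined nested schema
    "--num-rows", "5", // generate five sample rows
])
.expect("flags should parse");
assert!(cli.nested);
assert_eq!(cli.num_rows, 5);
```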

#[tokio::main]
@@ -58,7 +68,6 @@
}
}

// TODO: Update the example once official write APIs are introduced (issue#1123)
async fn try_main() -> DeltaResult<()> {
let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Write", "write", "");

@@ -79,11 +88,19 @@
use delta_kernel::engine::default::storage::store_from_url;
let engine = DefaultEngineBuilder::new(store_from_url(&url)?).build();

// Create or get the table
let snapshot = create_or_get_base_snapshot(&url, &engine, &cli.schema).await?;
// Create or get the table (use the nested schema if --nested is set)
let snapshot = if cli.nested {
create_or_get_nested_snapshot(&url, &engine).await?
} else {
create_or_get_base_snapshot(&url, &engine, &cli.schema).await?
};

// Create sample data based on the schema
let sample_data = create_sample_data(&snapshot.schema(), cli.num_rows)?;
let sample_data = if cli.nested {
create_nested_sample_data(cli.num_rows)?
} else {
create_sample_data(&snapshot.schema(), cli.num_rows)?
};

// Write sample data to the table
let committer = Box::new(FileSystemCommitter::new());
@@ -136,6 +153,24 @@
Ok(())
}

/// Creates a new Delta table with a nested schema or gets an existing one.
async fn create_or_get_nested_snapshot(url: &Url, engine: &dyn Engine) -> DeltaResult<SnapshotRef> {
// Check if table already exists
match Snapshot::builder_for(url.clone()).build(engine) {
Ok(snapshot) => {
println!("✓ Found existing table at version {}", snapshot.version());
Ok(snapshot)
}
Err(_) => {
// Create new table with nested schema
println!("Creating new Delta table with nested schema...");
let schema = create_nested_schema()?;

Check failure on line 167 in kernel/examples/write-table/src/main.rs

GitHub Actions / run-examples, coverage, test (ubuntu-latest)

this function takes 3 arguments but 2 arguments were supplied
create_table(url, &schema).await?;
Snapshot::builder_for(url.clone()).build(engine)
}
}
}

/// Creates a new Delta table or gets an existing one.
async fn create_or_get_base_snapshot(
url: &Url,
@@ -158,6 +193,38 @@
}
}

/// Create a schema with nested structs for demonstration.
///
/// Schema structure:
/// - id: long
/// - address: struct
/// - city: string
/// - coords: struct
/// - lat: double
fn create_nested_schema() -> DeltaResult<SchemaRef> {
let schema = StructType::try_new(vec![
StructField::new("id", DataType::LONG, false),
StructField::new(
"address",
DataType::try_struct_type(vec![
StructField::new("city", DataType::STRING, true),
StructField::new(
"coords",
DataType::try_struct_type(vec![StructField::new(
"lat",
DataType::DOUBLE,
true,
)])?,
true,
),
])?,
true,
),
])?;

Ok(Arc::new(schema))
}

/// Parse a schema string into a SchemaRef.
fn parse_schema(schema_str: &str) -> DeltaResult<SchemaRef> {
let fields = schema_str
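
The remainder of `parse_schema` is collapsed in this view. For readers following along, a standalone sketch of the kind of per-field parsing the `--schema` help text describes — the helper name and error type are illustrative, not the example's actual code:

```rust
use delta_kernel::schema::{DataType, StructField};

// Illustrative only: turn one `name:type` pair into a nullable field.
fn parse_field(spec: &str) -> Result<StructField, String> {
    let (name, ty) = spec
        .split_once(':')
        .ok_or_else(|| format!("expected `name:type`, got `{spec}`"))?;
    let data_type = match ty.trim() {
        "string" => DataType::STRING,
        "integer" => DataType::INTEGER,
        "long" => DataType::LONG,
        "double" => DataType::DOUBLE,
        "boolean" => DataType::BOOLEAN,
        "timestamp" => DataType::TIMESTAMP,
        other => return Err(format!("unsupported type `{other}`")),
    };
    Ok(StructField::new(name.trim(), data_type, true))
}
```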
@@ -257,6 +324,87 @@
Ok(())
}

/// Create sample data with nested structs.
///
/// Includes null values to demonstrate nullCount statistics:
/// - city: null for every 4th row
/// - lat: null for every 3rd row
fn create_nested_sample_data(num_rows: usize) -> DeltaResult<ArrowEngineData> {
let cities = vec![
("New York", 40.7128),
("Los Angeles", 34.0522),
("Chicago", 41.8781),
("Houston", 29.7604),
("Phoenix", 33.4484),
];

let id_data: Vec<i64> = (0..num_rows).map(|i| i as i64 + 1).collect();

// city: null for every 4th row
let city_data: Vec<Option<&str>> = (0..num_rows)
.map(|i| {
if i % 4 == 3 {
None
} else {
Some(cities[i % cities.len()].0)
}
})
.collect();

// lat: null for every 3rd row
let lat_data: Vec<Option<f64>> = (0..num_rows)
.map(|i| {
if i % 3 == 2 {
None
} else {
Some(cities[i % cities.len()].1)
}
})
.collect();

// Build coords struct (innermost)
let coords_fields = vec![Field::new("lat", ArrowDataType::Float64, true)];
let coords_array = StructArray::try_new(
coords_fields.clone().into(),
vec![Arc::new(Float64Array::from(lat_data)) as ArrayRef],
None,
)?;

// Build address struct
let address_fields = vec![
Field::new("city", ArrowDataType::Utf8, true),
Field::new("coords", ArrowDataType::Struct(coords_fields.into()), true),
];
let address_array = StructArray::try_new(
address_fields.clone().into(),
vec![
Arc::new(StringArray::from(city_data)) as ArrayRef,
Arc::new(coords_array) as ArrayRef,
],
None,
)?;

// Build the full schema
let arrow_schema = arrow::datatypes::Schema::new(vec![
Field::new("id", ArrowDataType::Int64, false),
Field::new(
"address",
ArrowDataType::Struct(address_fields.into()),
true,
),
]);

let record_batch = RecordBatch::try_new(
Arc::new(arrow_schema),
vec![
Arc::new(Int64Array::from(id_data)) as ArrayRef,
Arc::new(address_array) as ArrayRef,
],
)?;

Ok(ArrowEngineData::new(record_batch))
}
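
Since the null patterns above exist precisely to exercise nullCount statistics, it is worth noting that arrow tracks those counts per child array, not on the parent struct. A small self-contained check (plain arrow, not part of the example) for twelve rows:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Float64Array, StringArray, StructArray};
use arrow::datatypes::{DataType, Field};

fn main() -> Result<(), arrow::error::ArrowError> {
    // Mirror the patterns above: lat is null when i % 3 == 2,
    // city is null when i % 4 == 3.
    let lat: Float64Array = (0..12).map(|i| (i % 3 != 2).then_some(i as f64)).collect();
    let city: StringArray = (0..12).map(|i| (i % 4 != 3).then_some("city")).collect();
    assert_eq!(lat.null_count(), 4); // rows 2, 5, 8, 11
    assert_eq!(city.null_count(), 3); // rows 3, 7, 11

    // The child array keeps its null count; the struct itself has none,
    // because no `coords` value is null as a whole.
    let coords = StructArray::try_new(
        vec![Field::new("lat", DataType::Float64, true)].into(),
        vec![Arc::new(lat) as ArrayRef],
        None,
    )?;
    assert_eq!(coords.null_count(), 0);
    Ok(())
}
```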

/// Create sample data based on the schema.
fn create_sample_data(schema: &SchemaRef, num_rows: usize) -> DeltaResult<ArrowEngineData> {
let fields = schema.fields();
@@ -274,7 +422,7 @@
}
DataType::LONG => {
let data: Vec<i64> = (0..num_rows).map(|i| i as i64).collect();
Arc::new(arrow::array::Int64Array::from(data))
Arc::new(Int64Array::from(data))
}
DataType::DOUBLE => {
let data: Vec<f64> = (0..num_rows).map(|i| i as f64 * 1.5).collect();
8 changes: 7 additions & 1 deletion kernel/src/engine/default/mod.rs
@@ -33,6 +33,7 @@ pub mod file_stream;
pub mod filesystem;
pub mod json;
pub mod parquet;
mod stats;
pub mod storage;

/// Converts a Stream-producing future to a synchronous iterator.
@@ -216,7 +217,12 @@ impl<E: TaskExecutor> DefaultEngine<E> {
)?;
let physical_data = logical_to_physical_expr.evaluate(data)?;
self.parquet
.write_parquet_file(write_context.target_dir(), physical_data, partition_values)
.write_parquet_file(
write_context.target_dir(),
physical_data,
partition_values,
Some(write_context.stats_columns()),
)
.await
}
}
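
For context on the new fourth argument: threading the stats columns down lets the parquet writer restrict statistics collection to the columns the transaction actually needs. The kernel's real plumbing is not shown in this diff, but here is a plausible sketch against the parquet crate's actual `WriterProperties` API — the column list itself is hypothetical:

```rust
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use parquet::schema::types::ColumnPath;

fn main() {
    // Hypothetical stats columns; in the diff above they would come
    // from write_context.stats_columns().
    let stats_columns = [
        ColumnPath::new(vec!["id".into()]),
        ColumnPath::new(vec!["address".into(), "coords".into(), "lat".into()]),
    ];

    // Disable statistics globally, then re-enable per requested column.
    let mut builder =
        WriterProperties::builder().set_statistics_enabled(EnabledStatistics::None);
    for col in stats_columns {
        builder = builder.set_column_statistics_enabled(col, EnabledStatistics::Chunk);
    }
    let props = builder.build();

    // Columns outside the list keep statistics disabled.
    let other = ColumnPath::new(vec!["address".into(), "city".into()]);
    assert_eq!(props.statistics_enabled(&other), EnabledStatistics::None);
}
```

The kernel may well do this differently (for example by computing the statistics itself); the point is only where a per-column opt-in can hook into a parquet writer.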