Skip to content

Commit 651f2db

Browse files
wip: gotta figure out how to handle structs in structs with create_one
1 parent 52f3eda commit 651f2db

File tree

8 files changed

+185
-17
lines changed

8 files changed

+185
-17
lines changed

ffi/src/expressions/kernel.rs

+1
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ fn visit_expression_internal(
333333
visit_expression_struct_literal(visitor, struct_data, sibling_list_id)
334334
}
335335
Scalar::Array(array) => visit_expression_array(visitor, array, sibling_list_id),
336+
Scalar::Map(_map_data) => todo!(),
336337
}
337338
}
338339
fn visit_expression_impl(

kernel/src/actions/mod.rs

+27-3
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
9191
&LOG_COMMIT_INFO_SCHEMA
9292
}
9393

94-
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
94+
#[derive(Debug, Clone, PartialEq, Eq, Schema, IntoEngineData)]
9595
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
9696
#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
9797
pub(crate) struct Format {
@@ -110,7 +110,7 @@ impl Default for Format {
110110
}
111111
}
112112

113-
#[derive(Debug, Default, Clone, PartialEq, Eq, Schema)]
113+
#[derive(Debug, Default, Clone, PartialEq, Eq, Schema, IntoEngineData)]
114114
#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
115115
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
116116
pub(crate) struct Metadata {
@@ -133,6 +133,19 @@ pub(crate) struct Metadata {
133133
}
134134

135135
impl Metadata {
136+
pub(crate) fn new(name: String, schema: StructType, partition_columns: Vec<String>) -> Self {
137+
Self {
138+
id: uuid::Uuid::new_v4().to_string(),
139+
name: Some(name),
140+
description: None,
141+
format: Format::default(),
142+
schema_string: serde_json::to_string(&schema).unwrap_or_default(),
143+
partition_columns,
144+
created_time: Some(chrono::Utc::now().timestamp_millis()),
145+
configuration: HashMap::new(),
146+
}
147+
}
148+
136149
pub(crate) fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Metadata>> {
137150
let mut visitor = MetadataVisitor::default();
138151
visitor.visit_rows_of(data)?;
@@ -163,7 +176,7 @@ impl Metadata {
163176
}
164177
}
165178

166-
#[derive(Default, Debug, Clone, PartialEq, Eq, Schema, Serialize, Deserialize)]
179+
#[derive(Debug, Clone, PartialEq, Eq, Schema, Serialize, Deserialize, IntoEngineData)]
167180
#[serde(rename_all = "camelCase")]
168181
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
169182
// TODO move to another module so that we disallow constructing this struct without using the
@@ -185,6 +198,17 @@ pub(crate) struct Protocol {
185198
writer_features: Option<Vec<WriterFeature>>,
186199
}
187200

201+
impl Default for Protocol {
202+
fn default() -> Self {
203+
Protocol {
204+
min_reader_version: 3,
205+
min_writer_version: 7,
206+
reader_features: Some(SUPPORTED_READER_FEATURES.to_vec()),
207+
writer_features: Some(SUPPORTED_WRITER_FEATURES.to_vec()),
208+
}
209+
}
210+
}
211+
188212
fn parse_features<T>(features: Option<impl IntoIterator<Item = impl ToString>>) -> Option<Vec<T>>
189213
where
190214
T: FromStr,

kernel/src/engine/arrow_expression/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ impl Scalar {
8484
None,
8585
))
8686
}
87+
Map(map_data) => todo!(),
8788
Null(DataType::BYTE) => Arc::new(Int8Array::new_null(num_rows)),
8889
Null(DataType::SHORT) => Arc::new(Int16Array::new_null(num_rows)),
8990
Null(DataType::INTEGER) => Arc::new(Int32Array::new_null(num_rows)),

kernel/src/expressions/literal_expression_transform.rs

+46-10
Original file line numberDiff line numberDiff line change
@@ -160,21 +160,57 @@ impl<'a, T: Iterator<Item = &'a Scalar>> SchemaTransform<'a> for LiteralExpressi
160160
Some(Cow::Borrowed(field))
161161
}
162162

163-
// arrays unsupported for now
164-
fn transform_array(&mut self, _array_type: &'a ArrayType) -> Option<Cow<'a, ArrayType>> {
163+
// arrays treated as leaves
164+
fn transform_array(&mut self, array_type: &'a ArrayType) -> Option<Cow<'a, ArrayType>> {
165+
// first always check error to terminate early if possible
165166
self.error.as_ref().ok()?;
166-
self.set_error(Error::Unsupported(
167-
"ArrayType not yet supported in literal expression transform".to_string(),
168-
));
167+
let Some(scalar) = self.scalars.next() else {
168+
self.set_error(Error::InsufficientScalars);
169+
return None;
170+
};
171+
172+
let DataType::Array(scalar_type) = scalar.data_type() else {
173+
self.set_error(Error::Schema(
174+
"Non-array scalar type {datatype} provided for array leaf".to_string(),
175+
));
176+
return None;
177+
};
178+
if scalar_type.as_ref() != array_type {
179+
self.set_error(Error::Schema(format!(
180+
"Mismatched scalar type while creating Expression: expected {:?}, got {:?}",
181+
array_type, scalar_type
182+
)));
183+
return None;
184+
}
185+
186+
self.stack.push(Expression::Literal(scalar.clone()));
169187
None
170188
}
171189

172-
// maps unsupported for now
173-
fn transform_map(&mut self, _map_type: &'a MapType) -> Option<Cow<'a, MapType>> {
190+
// maps treated as leaves
191+
fn transform_map(&mut self, map_type: &'a MapType) -> Option<Cow<'a, MapType>> {
192+
// first always check error to terminate early if possible
174193
self.error.as_ref().ok()?;
175-
self.set_error(Error::Unsupported(
176-
"MapType not yet supported in literal expression transform".to_string(),
177-
));
194+
let Some(scalar) = self.scalars.next() else {
195+
self.set_error(Error::InsufficientScalars);
196+
return None;
197+
};
198+
199+
let DataType::Map(scalar_type) = scalar.data_type() else {
200+
self.set_error(Error::Schema(
201+
"Non-map scalar type {datatype} provided for map leaf".to_string(),
202+
));
203+
return None;
204+
};
205+
if scalar_type.as_ref() != map_type {
206+
self.set_error(Error::Schema(format!(
207+
"Mismatched scalar type while creating Expression: expected {:?}, got {:?}",
208+
map_type, scalar_type
209+
)));
210+
return None;
211+
}
212+
213+
self.stack.push(Expression::Literal(scalar.clone()));
178214
None
179215
}
180216
}

kernel/src/expressions/scalars.rs

+61-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use std::cmp::Ordering;
2+
use std::collections::HashMap;
23
use std::fmt::{Display, Formatter};
34

45
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
56

67
use crate::actions::schemas::ToDataType;
7-
use crate::schema::{ArrayType, DataType, PrimitiveType, StructField};
8+
use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField};
89
use crate::utils::require;
910
use crate::{DeltaResult, Error};
1011

@@ -22,11 +23,11 @@ pub struct ArrayData {
2223
}
2324

2425
impl ArrayData {
25-
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
2626
pub fn new(tpe: ArrayType, elements: impl IntoIterator<Item = impl Into<Scalar>>) -> Self {
2727
let elements = elements.into_iter().map(Into::into).collect();
2828
Self { tpe, elements }
2929
}
30+
3031
pub fn array_type(&self) -> &ArrayType {
3132
&self.tpe
3233
}
@@ -89,6 +90,12 @@ impl StructData {
8990
}
9091
}
9192

93+
#[derive(Debug, Clone)]
94+
pub struct MapData {
95+
data_type: MapType,
96+
elements: Vec<(Scalar, Scalar)>,
97+
}
98+
9299
/// A single value, which can be null. Used for representing literal values
93100
/// in [Expressions][crate::expressions::Expression].
94101
#[derive(Debug, Clone)]
@@ -125,6 +132,8 @@ pub enum Scalar {
125132
Struct(StructData),
126133
/// Array Value
127134
Array(ArrayData),
135+
/// Map Value
136+
Map(MapData),
128137
}
129138

130139
impl Scalar {
@@ -146,6 +155,7 @@ impl Scalar {
146155
Self::Null(data_type) => data_type.clone(),
147156
Self::Struct(data) => DataType::struct_type(data.fields.clone()),
148157
Self::Array(data) => data.tpe.clone().into(),
158+
Self::Map(map_data) => map_data.data_type.clone().into(),
149159
}
150160
}
151161

@@ -222,6 +232,7 @@ impl Display for Scalar {
222232
}
223233
write!(f, ")")
224234
}
235+
Self::Map(_map_data) => todo!(),
225236
}
226237
}
227238
}
@@ -269,6 +280,7 @@ impl PartialOrd for Scalar {
269280
(Null(_), _) => None, // NOTE: NULL values are incomparable by definition
270281
(Struct(_), _) => None, // TODO: Support Struct?
271282
(Array(_), _) => None, // TODO: Support Array?
283+
(Map(_), _) => None, // TODO: Support Map?
272284
}
273285
}
274286
}
@@ -348,6 +360,53 @@ impl<T: Into<Scalar> + ToDataType> From<Option<T>> for Scalar {
348360
}
349361
}
350362

363+
impl<T: Into<Scalar> + ToDataType> From<Vec<T>> for Scalar {
364+
fn from(v: Vec<T>) -> Self {
365+
Scalar::Array(ArrayData::new(ArrayType::new(T::to_data_type(), true), v))
366+
}
367+
}
368+
369+
impl<K, V> From<HashMap<K, V>> for Scalar
370+
where
371+
K: Into<Scalar> + ToDataType,
372+
V: Into<Scalar> + ToDataType,
373+
{
374+
fn from(map: HashMap<K, V>) -> Self {
375+
Scalar::Map(MapData {
376+
data_type: MapType::new(K::to_data_type(), V::to_data_type(), true),
377+
elements: map.into_iter().map(|(k, v)| (k.into(), v.into())).collect(),
378+
})
379+
}
380+
}
381+
382+
use crate::actions::Format;
383+
impl From<Format> for Scalar {
384+
fn from(format: Format) -> Self {
385+
let map_type = MapType::new(DataType::STRING, DataType::STRING, true);
386+
let fields = vec![
387+
StructField::new("provider", DataType::STRING, false),
388+
StructField::new("options", DataType::Map(Box::new(map_type)), true),
389+
];
390+
let values = vec![
391+
Scalar::String(format.provider.to_string()),
392+
format.options.into(),
393+
];
394+
Scalar::Struct(StructData::try_new(fields, values).unwrap())
395+
}
396+
}
397+
398+
use crate::table_features::{ReaderFeature, WriterFeature};
399+
impl From<ReaderFeature> for Scalar {
400+
fn from(feature: ReaderFeature) -> Self {
401+
Scalar::String(feature.to_string())
402+
}
403+
}
404+
impl From<WriterFeature> for Scalar {
405+
fn from(feature: WriterFeature) -> Self {
406+
Scalar::String(feature.to_string())
407+
}
408+
}
409+
351410
// TODO: add more From impls
352411

353412
impl PrimitiveType {

kernel/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ trait EvaluationHandlerExtension: EvaluationHandler {
376376
let null_row = self.null_row(null_row_schema.clone())?;
377377

378378
// Convert schema and leaf values to an expression
379+
println!("{:?}, {:?}", schema, values);
379380
let mut schema_transform = LiteralExpressionTransform::new(values);
380381
schema_transform.transform_struct(schema.as_ref());
381382
let row_expr = schema_transform.try_into_expr()?;

kernel/src/table.rs

+32-2
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ use std::path::PathBuf;
77

88
use url::Url;
99

10+
use crate::actions::{Metadata, Protocol};
11+
use crate::path::ParsedLogPath;
1012
use crate::snapshot::Snapshot;
1113
use crate::table_changes::TableChanges;
1214
use crate::transaction::Transaction;
13-
use crate::{DeltaResult, Engine, Error, Version};
15+
use crate::IntoEngineData;
16+
use crate::{schema::Schema, DeltaResult, Engine, Error, Version};
1417

1518
/// In-memory representation of a Delta table, which acts as an immutable root entity for reading
1619
/// the different versions (see [`Snapshot`]) of the table located in storage.
@@ -28,11 +31,38 @@ impl std::fmt::Debug for Table {
2831
}
2932

3033
impl Table {
31-
/// Create a new Delta table with the given parameters
34+
/// Read a Delta table from the given location
3235
pub fn new(location: Url) -> Self {
3336
Self { location }
3437
}
3538

39+
/// Create a new Delta table with a given schema (with default protocol/metadata) at the
40+
/// location provided
41+
pub fn create(
42+
name: &str,
43+
schema: Schema,
44+
location: Url,
45+
engine: &dyn Engine,
46+
) -> DeltaResult<Self> {
47+
let protocol = Protocol::default();
48+
let metadata = Metadata::new(name.to_string(), schema, vec![]);
49+
50+
let actions = vec![
51+
protocol.into_engine_data(engine),
52+
metadata.into_engine_data(engine),
53+
];
54+
55+
let json_handler = engine.json_handler();
56+
let commit_path = ParsedLogPath::new_commit(&location, 0)?;
57+
json_handler.write_json_file(
58+
&commit_path.location,
59+
Box::new(actions.into_iter()),
60+
false,
61+
)?;
62+
63+
Ok(Self::new(location))
64+
}
65+
3666
/// Try to create a new table from a string uri. This will do it's best to handle things like
3767
/// `/local/paths`, and even `../relative/paths`.
3868
pub fn try_from_uri(uri: impl AsRef<str>) -> DeltaResult<Self> {

kernel/tests/create.rs

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
use delta_kernel::engine::sync::SyncEngine;
2+
use delta_kernel::schema::{DataType, Schema, StructField};
3+
use delta_kernel::Table;
4+
5+
use url::Url;
6+
7+
#[test]
8+
fn test_create_table() {
9+
let engine = SyncEngine::new();
10+
let schema = Schema::new([StructField::nullable("id", DataType::INTEGER)]);
11+
let location = Url::parse("file:////Users/zach.schuermann/Desktop/test_table/").unwrap();
12+
let table = Table::create("test_table", schema, location, &engine).unwrap();
13+
14+
let snapshot = table.snapshot(&engine, None).unwrap();
15+
assert_eq!(snapshot.version(), 0);
16+
}

0 commit comments

Comments
 (0)