Skip to content

Commit 15afd12

Browse files
sanujbasuSanuj Basu
authored andcommitted
test: Add integration tests for CreateTable API
Add comprehensive tests validating the basic create_table() functionality introduced. Test coverage includes: - test_create_simple_table: Verifies basic table creation with a multi-column schema, snapshot version (0), and field preservation - test_create_table_already_exists: Validates error handling when attempting to create a table at an existing path - test_create_table_empty_schema: Ensures empty schema validation fails at build time with appropriate error message - test_create_table_log_actions: Verifies delta log structure with correct action ordering (CommitInfo, Protocol, Metadata) for ICT compliance, and validates action contents including engineInfo, operation type, protocol versions, and kernelVersion
1 parent 2cad3d4 commit 15afd12

File tree

1 file changed

+265
-0
lines changed

1 file changed

+265
-0
lines changed

kernel/tests/create_table.rs

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
//! Integration tests for the CreateTable API
2+
3+
use std::sync::Arc;
4+
5+
use delta_kernel::committer::FileSystemCommitter;
6+
use delta_kernel::schema::{DataType, StructField, StructType};
7+
use delta_kernel::snapshot::Snapshot;
8+
use delta_kernel::table_features::{
9+
TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
10+
};
11+
use delta_kernel::transaction::create_table::create_table;
12+
use delta_kernel::DeltaResult;
13+
use serde_json::Value;
14+
use tempfile::tempdir;
15+
use test_utils::{assert_result_error_with_message, create_default_engine};
16+
17+
#[tokio::test]
18+
async fn test_create_simple_table() -> DeltaResult<()> {
19+
// Setup
20+
let temp_dir = tempdir().expect("Failed to create temp dir");
21+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
22+
23+
let engine =
24+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
25+
26+
// Create schema for an events table
27+
let schema = Arc::new(StructType::try_new(vec![
28+
StructField::new("event_id", DataType::LONG, false),
29+
StructField::new("user_id", DataType::LONG, false),
30+
StructField::new("event_type", DataType::STRING, false),
31+
StructField::new("timestamp", DataType::TIMESTAMP, false),
32+
StructField::new("properties", DataType::STRING, true),
33+
])?);
34+
35+
// Create table using new API
36+
let _result = create_table(&table_path, schema.clone(), "DeltaKernel-RS/0.17.0")
37+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
38+
.commit(engine.as_ref())?;
39+
40+
// Verify table was created
41+
let table_url = delta_kernel::try_parse_uri(&table_path)?;
42+
let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
43+
44+
assert_eq!(snapshot.version(), 0);
45+
assert_eq!(snapshot.schema().fields().len(), 5);
46+
47+
// Verify protocol versions are (3, 7) by reading the log file
48+
let log_file_path = format!("{}/_delta_log/00000000000000000000.json", table_path);
49+
let log_contents = std::fs::read_to_string(&log_file_path).expect("Failed to read log file");
50+
let actions: Vec<Value> = log_contents
51+
.lines()
52+
.map(|line| serde_json::from_str(line).expect("Failed to parse JSON"))
53+
.collect();
54+
55+
let protocol_action = actions
56+
.iter()
57+
.find(|a| a.get("protocol").is_some())
58+
.expect("Protocol action not found");
59+
let protocol = protocol_action.get("protocol").unwrap();
60+
assert_eq!(
61+
protocol["minReaderVersion"],
62+
TABLE_FEATURES_MIN_READER_VERSION
63+
);
64+
assert_eq!(
65+
protocol["minWriterVersion"],
66+
TABLE_FEATURES_MIN_WRITER_VERSION
67+
);
68+
// Verify no reader/writer features are set (empty arrays for table features mode)
69+
assert_eq!(protocol["readerFeatures"], Value::Array(vec![]));
70+
assert_eq!(protocol["writerFeatures"], Value::Array(vec![]));
71+
72+
// Verify no table properties are set via public API
73+
use delta_kernel::table_properties::TableProperties;
74+
assert_eq!(snapshot.table_properties(), &TableProperties::default());
75+
76+
// Verify schema field names
77+
let field_names: Vec<_> = snapshot
78+
.schema()
79+
.fields()
80+
.map(|f| f.name().to_string())
81+
.collect();
82+
assert!(field_names.contains(&"event_id".to_string()));
83+
assert!(field_names.contains(&"user_id".to_string()));
84+
assert!(field_names.contains(&"event_type".to_string()));
85+
assert!(field_names.contains(&"timestamp".to_string()));
86+
assert!(field_names.contains(&"properties".to_string()));
87+
88+
Ok(())
89+
}
90+
91+
#[tokio::test]
92+
async fn test_create_table_already_exists() -> DeltaResult<()> {
93+
// Setup
94+
let temp_dir = tempdir().expect("Failed to create temp dir");
95+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
96+
97+
let engine =
98+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
99+
100+
// Create schema for a user profiles table
101+
let schema = Arc::new(StructType::try_new(vec![
102+
StructField::new("user_id", DataType::LONG, false),
103+
StructField::new("username", DataType::STRING, false),
104+
StructField::new("email", DataType::STRING, false),
105+
StructField::new("created_at", DataType::TIMESTAMP, false),
106+
StructField::new("is_active", DataType::BOOLEAN, false),
107+
])?);
108+
109+
// Create table first time
110+
let _result = create_table(&table_path, schema.clone(), "UserManagementService/1.2.0")
111+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
112+
.commit(engine.as_ref())?;
113+
114+
// Try to create again - should fail at build time (table already exists)
115+
let result = create_table(&table_path, schema.clone(), "UserManagementService/1.2.0")
116+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
117+
118+
assert_result_error_with_message(result, "already exists");
119+
120+
Ok(())
121+
}
122+
123+
#[tokio::test]
124+
async fn test_create_table_empty_schema_not_supported() -> DeltaResult<()> {
125+
// Setup
126+
let temp_dir = tempdir().expect("Failed to create temp dir");
127+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
128+
129+
let engine =
130+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
131+
132+
// Create empty schema
133+
let schema = Arc::new(StructType::try_new(vec![])?);
134+
135+
// Try to create table with empty schema - should fail at build time
136+
let result = create_table(&table_path, schema, "InvalidApp/0.1.0")
137+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
138+
139+
assert_result_error_with_message(result, "cannot be empty");
140+
141+
Ok(())
142+
}
143+
144+
#[tokio::test]
145+
async fn test_create_table_log_actions() -> DeltaResult<()> {
146+
// Setup
147+
let temp_dir = tempdir().expect("Failed to create temp dir");
148+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
149+
150+
let engine =
151+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
152+
153+
// Create schema
154+
let schema = Arc::new(StructType::try_new(vec![
155+
StructField::new("user_id", DataType::LONG, false),
156+
StructField::new("action", DataType::STRING, false),
157+
])?);
158+
159+
let engine_info = "AuditService/2.1.0";
160+
161+
// Create table
162+
let _ = create_table(&table_path, schema, engine_info)
163+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
164+
.commit(engine.as_ref())?;
165+
166+
// Read the actual Delta log file
167+
let log_file_path = format!("{}/_delta_log/00000000000000000000.json", table_path);
168+
let log_contents = std::fs::read_to_string(&log_file_path).expect("Failed to read log file");
169+
170+
// Parse each line (each line is a separate JSON action)
171+
let actions: Vec<Value> = log_contents
172+
.lines()
173+
.map(|line| serde_json::from_str(line).expect("Failed to parse JSON"))
174+
.collect();
175+
176+
// Verify we have exactly 3 actions: CommitInfo, Protocol, Metadata
177+
// CommitInfo is first to comply with ICT (In-Commit Timestamps) protocol requirements
178+
assert_eq!(
179+
actions.len(),
180+
3,
181+
"Expected 3 actions (commitInfo, protocol, metaData), found {}",
182+
actions.len()
183+
);
184+
185+
// Verify CommitInfo action (first for ICT compliance)
186+
let commit_info_action = &actions[0];
187+
assert!(
188+
commit_info_action.get("commitInfo").is_some(),
189+
"First action should be commitInfo"
190+
);
191+
let commit_info = commit_info_action.get("commitInfo").unwrap();
192+
assert!(
193+
commit_info.get("timestamp").is_some(),
194+
"CommitInfo should have timestamp"
195+
);
196+
assert!(
197+
commit_info.get("engineInfo").is_some(),
198+
"CommitInfo should have engineInfo"
199+
);
200+
assert!(
201+
commit_info.get("operation").is_some(),
202+
"CommitInfo should have operation"
203+
);
204+
assert_eq!(
205+
commit_info["operation"], "CREATE TABLE",
206+
"Operation should be CREATE TABLE"
207+
);
208+
209+
// Verify Protocol action
210+
let protocol_action = &actions[1];
211+
assert!(
212+
protocol_action.get("protocol").is_some(),
213+
"Second action should be protocol"
214+
);
215+
let protocol = protocol_action.get("protocol").unwrap();
216+
assert_eq!(
217+
protocol["minReaderVersion"],
218+
TABLE_FEATURES_MIN_READER_VERSION
219+
);
220+
assert_eq!(
221+
protocol["minWriterVersion"],
222+
TABLE_FEATURES_MIN_WRITER_VERSION
223+
);
224+
225+
// Verify Metadata action
226+
let metadata_action = &actions[2];
227+
assert!(
228+
metadata_action.get("metaData").is_some(),
229+
"Third action should be metaData"
230+
);
231+
let metadata = metadata_action.get("metaData").unwrap();
232+
assert!(metadata.get("id").is_some(), "Metadata should have id");
233+
assert!(
234+
metadata.get("schemaString").is_some(),
235+
"Metadata should have schemaString"
236+
);
237+
assert!(
238+
metadata.get("createdTime").is_some(),
239+
"Metadata should have createdTime"
240+
);
241+
242+
// Additional CommitInfo verification (commit_info was already extracted from actions[0] above)
243+
assert_eq!(
244+
commit_info["engineInfo"], engine_info,
245+
"CommitInfo should contain the engine info we provided"
246+
);
247+
248+
assert!(
249+
commit_info.get("txnId").is_some(),
250+
"CommitInfo should have txnId"
251+
);
252+
253+
// Verify kernelVersion is present
254+
let kernel_version = commit_info.get("kernelVersion");
255+
assert!(
256+
kernel_version.is_some(),
257+
"CommitInfo should have kernelVersion"
258+
);
259+
assert!(
260+
kernel_version.unwrap().as_str().unwrap().starts_with("v"),
261+
"Kernel version should start with 'v'"
262+
);
263+
264+
Ok(())
265+
}

0 commit comments

Comments
 (0)