Skip to content

Commit 1861faa

Browse files
sanujbasuSanuj Basu
authored and committed
test: Add integration tests for CreateTable API
Add comprehensive tests validating the basic create_table() functionality.

Test coverage includes:
- test_create_simple_table: Verifies basic table creation with a multi-column schema, snapshot version (0), and field preservation
- test_create_table_already_exists: Validates error handling when attempting to create a table at an existing path
- test_create_table_empty_schema: Ensures empty schema validation fails at build time with an appropriate error message
- test_create_table_log_actions: Verifies delta log structure with correct action ordering (CommitInfo, Protocol, Metadata) for ICT compliance, and validates action contents including engineInfo, operation type, protocol versions, and kernelVersion
1 parent a6f4c49 commit 1861faa

File tree

1 file changed

+231
-0
lines changed

1 file changed

+231
-0
lines changed

kernel/tests/create_table.rs

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
//! Integration tests for the CreateTable API
2+
3+
use std::sync::Arc;
4+
5+
use delta_kernel::committer::FileSystemCommitter;
6+
use delta_kernel::schema::{DataType, StructField, StructType};
7+
use delta_kernel::snapshot::Snapshot;
8+
use delta_kernel::transaction::create_table::create_table;
9+
use delta_kernel::DeltaResult;
10+
use serde_json::Value;
11+
use tempfile::tempdir;
12+
use test_utils::create_default_engine;
13+
14+
#[tokio::test]
15+
async fn test_create_simple_table() -> DeltaResult<()> {
16+
// Setup
17+
let temp_dir = tempdir().expect("Failed to create temp dir");
18+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
19+
20+
let engine =
21+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
22+
23+
// Create schema for an events table
24+
let schema = Arc::new(StructType::try_new(vec![
25+
StructField::new("event_id", DataType::LONG, false),
26+
StructField::new("user_id", DataType::LONG, false),
27+
StructField::new("event_type", DataType::STRING, false),
28+
StructField::new("timestamp", DataType::TIMESTAMP, false),
29+
StructField::new("properties", DataType::STRING, true),
30+
])?);
31+
32+
// Create table using new API
33+
let _result = create_table(&table_path, schema.clone(), "DeltaKernel-RS/0.17.0")
34+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
35+
.commit(engine.as_ref())?;
36+
37+
// Verify table was created
38+
let table_url = delta_kernel::try_parse_uri(&table_path)?;
39+
let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
40+
41+
assert_eq!(snapshot.version(), 0);
42+
assert_eq!(snapshot.schema().fields().len(), 5);
43+
44+
// Verify schema field names
45+
let field_names: Vec<_> = snapshot
46+
.schema()
47+
.fields()
48+
.map(|f| f.name().to_string())
49+
.collect();
50+
assert!(field_names.contains(&"event_id".to_string()));
51+
assert!(field_names.contains(&"user_id".to_string()));
52+
assert!(field_names.contains(&"event_type".to_string()));
53+
assert!(field_names.contains(&"timestamp".to_string()));
54+
assert!(field_names.contains(&"properties".to_string()));
55+
56+
Ok(())
57+
}
58+
59+
#[tokio::test]
60+
async fn test_create_table_already_exists() -> DeltaResult<()> {
61+
// Setup
62+
let temp_dir = tempdir().expect("Failed to create temp dir");
63+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
64+
65+
let engine =
66+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
67+
68+
// Create schema for a user profiles table
69+
let schema = Arc::new(StructType::try_new(vec![
70+
StructField::new("user_id", DataType::LONG, false),
71+
StructField::new("username", DataType::STRING, false),
72+
StructField::new("email", DataType::STRING, false),
73+
StructField::new("created_at", DataType::TIMESTAMP, false),
74+
StructField::new("is_active", DataType::BOOLEAN, false),
75+
])?);
76+
77+
// Create table first time
78+
let _result = create_table(&table_path, schema.clone(), "UserManagementService/1.2.0")
79+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
80+
.commit(engine.as_ref())?;
81+
82+
// Try to create again - should fail at build time (table already exists)
83+
let result = create_table(&table_path, schema.clone(), "UserManagementService/1.2.0")
84+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
85+
86+
assert!(result.is_err());
87+
let err = result.unwrap_err();
88+
assert!(err.to_string().contains("already exists"));
89+
90+
Ok(())
91+
}
92+
93+
#[tokio::test]
94+
async fn test_create_table_empty_schema() -> DeltaResult<()> {
95+
// Setup
96+
let temp_dir = tempdir().expect("Failed to create temp dir");
97+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
98+
99+
let engine =
100+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
101+
102+
// Create empty schema
103+
let schema = Arc::new(StructType::try_new(vec![])?);
104+
105+
// Try to create table with empty schema - should fail at build time
106+
let result = create_table(&table_path, schema, "InvalidApp/0.1.0")
107+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
108+
109+
assert!(result.is_err());
110+
let err = result.unwrap_err();
111+
assert!(err.to_string().contains("cannot be empty"));
112+
113+
Ok(())
114+
}
115+
116+
#[tokio::test]
117+
async fn test_create_table_log_actions() -> DeltaResult<()> {
118+
// Setup
119+
let temp_dir = tempdir().expect("Failed to create temp dir");
120+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
121+
122+
let engine =
123+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
124+
125+
// Create schema
126+
let schema = Arc::new(StructType::try_new(vec![
127+
StructField::new("user_id", DataType::LONG, false),
128+
StructField::new("action", DataType::STRING, false),
129+
])?);
130+
131+
let engine_info = "AuditService/2.1.0";
132+
133+
// Create table
134+
let _ = create_table(&table_path, schema, engine_info)
135+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
136+
.commit(engine.as_ref())?;
137+
138+
// Read the actual Delta log file
139+
let log_file_path = format!("{}/_delta_log/00000000000000000000.json", table_path);
140+
let log_contents = std::fs::read_to_string(&log_file_path).expect("Failed to read log file");
141+
142+
// Parse each line (each line is a separate JSON action)
143+
let actions: Vec<Value> = log_contents
144+
.lines()
145+
.map(|line| serde_json::from_str(line).expect("Failed to parse JSON"))
146+
.collect();
147+
148+
// Verify we have exactly 3 actions: CommitInfo, Protocol, Metadata
149+
// CommitInfo is first to comply with ICT (In-Commit Timestamps) protocol requirements
150+
assert_eq!(
151+
actions.len(),
152+
3,
153+
"Expected 3 actions (commitInfo, protocol, metaData), found {}",
154+
actions.len()
155+
);
156+
157+
// Verify CommitInfo action (first for ICT compliance)
158+
let commit_info_action = &actions[0];
159+
assert!(
160+
commit_info_action.get("commitInfo").is_some(),
161+
"First action should be commitInfo"
162+
);
163+
let commit_info = commit_info_action.get("commitInfo").unwrap();
164+
assert!(
165+
commit_info.get("timestamp").is_some(),
166+
"CommitInfo should have timestamp"
167+
);
168+
assert!(
169+
commit_info.get("engineInfo").is_some(),
170+
"CommitInfo should have engineInfo"
171+
);
172+
assert!(
173+
commit_info.get("operation").is_some(),
174+
"CommitInfo should have operation"
175+
);
176+
assert_eq!(
177+
commit_info["operation"], "CREATE TABLE",
178+
"Operation should be CREATE TABLE"
179+
);
180+
181+
// Verify Protocol action
182+
let protocol_action = &actions[1];
183+
assert!(
184+
protocol_action.get("protocol").is_some(),
185+
"Second action should be protocol"
186+
);
187+
let protocol = protocol_action.get("protocol").unwrap();
188+
assert_eq!(protocol["minReaderVersion"], 3);
189+
assert_eq!(protocol["minWriterVersion"], 7);
190+
191+
// Verify Metadata action
192+
let metadata_action = &actions[2];
193+
assert!(
194+
metadata_action.get("metaData").is_some(),
195+
"Third action should be metaData"
196+
);
197+
let metadata = metadata_action.get("metaData").unwrap();
198+
assert!(metadata.get("id").is_some(), "Metadata should have id");
199+
assert!(
200+
metadata.get("schemaString").is_some(),
201+
"Metadata should have schemaString"
202+
);
203+
assert!(
204+
metadata.get("createdTime").is_some(),
205+
"Metadata should have createdTime"
206+
);
207+
208+
// Additional CommitInfo verification (commit_info was already extracted from actions[0] above)
209+
assert_eq!(
210+
commit_info["engineInfo"], engine_info,
211+
"CommitInfo should contain the engine info we provided"
212+
);
213+
214+
assert!(
215+
commit_info.get("txnId").is_some(),
216+
"CommitInfo should have txnId"
217+
);
218+
219+
// Verify kernelVersion is present
220+
let kernel_version = commit_info.get("kernelVersion");
221+
assert!(
222+
kernel_version.is_some(),
223+
"CommitInfo should have kernelVersion"
224+
);
225+
assert!(
226+
kernel_version.unwrap().as_str().unwrap().starts_with("v"),
227+
"Kernel version should start with 'v'"
228+
);
229+
230+
Ok(())
231+
}

0 commit comments

Comments
 (0)