Skip to content

Commit

Permalink
fix: reading single latest version in cdf
Browse files Browse the repository at this point in the history
Signed-off-by: Ion Koutsouris <[email protected]>
  • Loading branch information
ion-elgreco committed Feb 14, 2025
1 parent 6630851 commit bb42225
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 36 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub enum DeltaTableError {
#[error("Reading a table version: {version} that does not have change data enabled")]
ChangeDataNotEnabled { version: i64 },

#[error("Invalid version start version {start} is greater than version {end}")]
#[error("Invalid version. Start version {start} is greater than end version {end}")]
ChangeDataInvalidVersionRange { start: i64, end: i64 },

#[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ pub async fn get_latest_version(
// No files were fetched during list_offset, so either the starting version is already the latest
// or it is invalid. Check whether the commit file for max_version exists; if it does not, retry once from version -1.
if empty_stream {
let obj_meta = object_store.head(&commit_uri_from_version(max_version)).await;
let obj_meta = object_store
.head(&commit_uri_from_version(max_version))
.await;
if obj_meta.is_err() {
return Box::pin(get_latest_version(log_store, -1)).await;
}
Expand All @@ -480,7 +482,6 @@ pub async fn get_latest_version(
Ok(version)
}


/// Default implementation for retrieving the earliest version
pub async fn get_earliest_version(
log_store: &dyn LogStore,
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl CdfLoadBuilder {
Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end })
};
}
if start >= latest_version {
if start > latest_version {
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Expand Down Expand Up @@ -671,10 +671,10 @@ pub(crate) mod tests {
.await;

assert!(table.is_err());
assert!(matches!(
table.unwrap_err(),
DeltaTableError::InvalidVersion { .. }
));
assert!(table
.unwrap_err()
.to_string()
.contains("Invalid version. Start version 5 is greater than end version 4"));

Ok(())
}
Expand Down
37 changes: 29 additions & 8 deletions crates/core/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,9 +503,12 @@ mod tests {

#[tokio::test]
async fn test_buffer_len_includes_unflushed_row_group() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

writer.write(batch).await.unwrap();
Expand All @@ -515,9 +518,12 @@ mod tests {

#[tokio::test]
async fn test_divide_record_batch_no_partition() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

let partitions = writer.divide_by_partition_values(&batch).unwrap();
Expand All @@ -528,9 +534,12 @@ mod tests {

#[tokio::test]
async fn test_divide_record_batch_single_partition() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec!["modified".to_string()];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

let partitions = writer.divide_by_partition_values(&batch).unwrap();
Expand Down Expand Up @@ -613,9 +622,11 @@ mod tests {

#[tokio::test]
async fn test_divide_record_batch_multiple_partitions() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();
let batch = get_record_batch(None, false);
let partition_cols = vec!["modified".to_string(), "id".to_string()];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

let partitions = writer.divide_by_partition_values(&batch).unwrap();
Expand All @@ -631,9 +642,11 @@ mod tests {

#[tokio::test]
async fn test_write_no_partitions() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();
let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

writer.write(batch).await.unwrap();
Expand All @@ -643,9 +656,11 @@ mod tests {

#[tokio::test]
async fn test_write_multiple_partitions() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();
let batch = get_record_batch(None, false);
let partition_cols = vec!["modified".to_string(), "id".to_string()];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

writer.write(batch).await.unwrap();
Expand Down Expand Up @@ -714,9 +729,12 @@ mod tests {

#[tokio::test]
async fn test_write_mismatched_schema() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let table = create_initialized_table(&partition_cols).await;
let table = create_initialized_table(table_path, &partition_cols).await;
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

// Write the first batch with the first schema to the table
Expand Down Expand Up @@ -895,9 +913,12 @@ mod tests {

#[tokio::test]
async fn test_schema_evolution_column_type_mismatch() {
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path().to_str().unwrap();

let batch = get_record_batch(None, false);
let partition_cols = vec![];
let mut table = create_initialized_table(&partition_cols).await;
let mut table = create_initialized_table(table_path, &partition_cols).await;

let mut writer = RecordBatchWriter::for_table(&table).unwrap();

Expand Down
7 changes: 2 additions & 5 deletions crates/core/src/writer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,10 @@ pub fn create_bare_table() -> DeltaTable {
.unwrap()
}

pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable {
pub async fn create_initialized_table(table_path: &str, partition_cols: &[String]) -> DeltaTable {
let table_schema: StructType = get_delta_schema();
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path();

CreateBuilder::new()
.with_location(table_path.to_str().unwrap())
.with_location(table_path)
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(table_schema.fields().cloned())
Expand Down
30 changes: 20 additions & 10 deletions crates/core/tests/command_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct Context {
pub table: DeltaTable,
}

async fn setup_test() -> Result<Context, Box<dyn Error>> {
async fn setup_test(table_uri: &str) -> Result<Context, Box<dyn Error>> {
let columns = vec![
StructField::new(
"id".to_string(),
Expand All @@ -34,9 +34,6 @@ async fn setup_test() -> Result<Context, Box<dyn Error>> {
true,
),
];

let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let table = DeltaOps::try_from_uri(table_uri)
.await?
.create()
Expand Down Expand Up @@ -95,7 +92,10 @@ fn get_record_batch() -> RecordBatch {

#[tokio::test]
async fn test_restore_by_version() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();

let context = setup_test(table_uri).await?;
let table = context.table;
let result = DeltaOps(table).restore().with_version_to_restore(1).await?;
assert_eq!(result.1.num_restored_file, 1);
Expand All @@ -118,7 +118,9 @@ async fn test_restore_by_version() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let table = context.table;
let version = 1;

Expand All @@ -142,7 +144,9 @@ async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let table = context.table;
let history = table.history(Some(10)).await?;
let timestamp = history.get(1).unwrap().timestamp.unwrap();
Expand All @@ -166,7 +170,9 @@ async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;

for file in context.table.snapshot()?.log_data() {
let p = context.tmp_dir.path().join(file.path().as_ref());
Expand All @@ -193,7 +199,9 @@ async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;

for file in context.table.snapshot()?.log_data() {
let p = context.tmp_dir.path().join(file.path().as_ref());
Expand Down Expand Up @@ -221,7 +229,9 @@ async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {

#[tokio::test]
async fn test_restore_transaction_conflict() -> Result<(), Box<dyn Error>> {
let context = setup_test().await?;
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let mut table = context.table;
table.load_version(2).await?;

Expand Down
10 changes: 6 additions & 4 deletions crates/core/tests/integration_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1184,9 +1184,11 @@ async fn simple_query(context: &IntegrationContext) -> TestResult {
}

mod date_partitions {
use tempfile::TempDir;

use super::*;

async fn setup_test() -> Result<DeltaTable, Box<dyn Error>> {
async fn setup_test(table_uri: &str) -> Result<DeltaTable, Box<dyn Error>> {
let columns = vec![
StructField::new(
"id".to_owned(),
Expand All @@ -1200,8 +1202,6 @@ mod date_partitions {
),
];

let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let dt = DeltaOps::try_from_uri(table_uri)
.await?
.create()
Expand Down Expand Up @@ -1238,7 +1238,9 @@ mod date_partitions {
#[tokio::test]
async fn test_issue_1445_date_partition() -> Result<()> {
let ctx = SessionContext::new();
let mut dt = setup_test().await.unwrap();
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let mut dt = setup_test(table_uri).await.unwrap();
let mut writer = RecordBatchWriter::for_table(&dt)?;
write(
&mut writer,
Expand Down
36 changes: 35 additions & 1 deletion python/tests/test_cdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,10 @@ def test_read_cdf_version_out_of_range():
with pytest.raises(DeltaError) as e:
dt.load_cdf(4).read_all().to_pydict()

assert "invalid table version" in str(e).lower()
assert (
"invalid version. start version 4 is greater than end version 3"
in str(e).lower()
)


def test_read_cdf_version_out_of_range_with_flag():
Expand All @@ -714,3 +717,34 @@ def test_read_timestamp_cdf_out_of_range_with_flag():
b = dt.load_cdf(starting_timestamp=start, allow_out_of_range=True).read_all()

assert len(b) == 0


def test_read_cdf_last_version(tmp_path):
    """Load the change data feed with start and end both pinned to version 0.

    The table is written exactly once with change data feed enabled, so
    version 0 is presumably also its latest version. With
    ``allow_out_of_range=False`` the read is expected to succeed and return
    every written row as an ``insert`` at commit version 0.
    """
    source = pa.Table.from_pydict({"foo": [1, 2, 3]})

    write_deltalake(
        tmp_path,
        data=source,
        configuration={"delta.enableChangeDataFeed": "true"},
    )

    # Restrict the projection so the comparison does not depend on other
    # CDF metadata columns.
    reader = DeltaTable(tmp_path).load_cdf(
        starting_version=0,
        ending_version=0,
        allow_out_of_range=False,
        columns=["foo", "_change_type", "_commit_version"],
    )
    actual = reader.read_all()

    expected = pa.Table.from_pydict(
        {
            "foo": [1, 2, 3],
            "_change_type": ["insert", "insert", "insert"],
            "_commit_version": [0, 0, 0],
        }
    )

    assert expected == actual

0 comments on commit bb42225

Please sign in to comment.