Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,8 @@ public static Map<String, String> validateIncrementalReadParams(Map<String, Stri
if (hasStartSnapshotId) {
try {
long startSId = Long.parseLong(params.get(DORIS_START_SNAPSHOT_ID));
if (startSId <= 0) {
throw new UserException("startSnapshotId must be greater than 0");
if (startSId < 0) {
throw new UserException("startSnapshotId must be greater than or equal to 0");
}
} catch (NumberFormatException e) {
throw new UserException("Invalid startSnapshotId format: " + e.getMessage());
Expand All @@ -565,8 +565,8 @@ public static Map<String, String> validateIncrementalReadParams(Map<String, Stri
if (hasEndSnapshotId) {
try {
long endSId = Long.parseLong(params.get(DORIS_END_SNAPSHOT_ID));
if (endSId <= 0) {
throw new UserException("endSnapshotId must be greater than 0");
if (endSId < 0) {
throw new UserException("endSnapshotId must be greater than or equal to 0");
}
} catch (NumberFormatException e) {
throw new UserException("Invalid endSnapshotId format: " + e.getMessage());
Expand All @@ -578,8 +578,8 @@ public static Map<String, String> validateIncrementalReadParams(Map<String, Stri
try {
long startSId = Long.parseLong(params.get(DORIS_START_SNAPSHOT_ID));
long endSId = Long.parseLong(params.get(DORIS_END_SNAPSHOT_ID));
if (startSId >= endSId) {
throw new UserException("startSnapshotId must be less than endSnapshotId");
if (startSId > endSId) {
throw new UserException("startSnapshotId must be less than or equal to endSnapshotId");
}
} catch (NumberFormatException e) {
throw new UserException("Invalid snapshot ID format: " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ public void testValidateIncrementalReadParams() throws UserException {
// Test valid parameter combinations

// 1. Only startSnapshotId
Map<String, String> params = new HashMap<>();
params.put("startSnapshotId", "5");
Map<String, String> params1 = new HashMap<>();
params1.put("startSnapshotId", "5");
ExceptionChecker.expectThrowsWithMsg(UserException.class,
"endSnapshotId is required when using snapshot-based incremental read",
() -> PaimonScanNode.validateIncrementalReadParams(params));
() -> PaimonScanNode.validateIncrementalReadParams(params1));

// 2. Both startSnapshotId and endSnapshotId
params.clear();
Map<String, String> params = new HashMap<>();
params.put("startSnapshotId", "1");
params.put("endSnapshotId", "5");
Map<String, String> result = PaimonScanNode.validateIncrementalReadParams(params);
Expand Down Expand Up @@ -226,57 +226,47 @@ public void testValidateIncrementalReadParams() throws UserException {
e.getMessage().contains("startSnapshotId is required when using snapshot-based incremental read"));
}

// 11. Test invalid snapshot ID values (≤ 0)
params.clear();
params.put("startSnapshotId", "0");
try {
PaimonScanNode.validateIncrementalReadParams(params);
Assert.fail("Should throw exception for startSnapshotId ≤ 0");
} catch (UserException e) {
Assert.assertTrue(e.getMessage().contains("startSnapshotId must be greater than 0"));
}

// 11. Test invalid snapshot ID values < 0)
params.clear();
params.put("startSnapshotId", "-1");
try {
PaimonScanNode.validateIncrementalReadParams(params);
Assert.fail("Should throw exception for negative startSnapshotId");
Assert.fail("Should throw exception for startSnapshotId < 0");
} catch (UserException e) {
Assert.assertTrue(e.getMessage().contains("startSnapshotId must be greater than 0"));
Assert.assertTrue(e.getMessage().contains("startSnapshotId must be greater than or equal to 0"));
}

params.clear();
params.put("startSnapshotId", "1");
params.put("endSnapshotId", "0");
params.put("endSnapshotId", "-1");
try {
PaimonScanNode.validateIncrementalReadParams(params);
Assert.fail("Should throw exception for endSnapshotId 0");
Assert.fail("Should throw exception for endSnapshotId < 0");
} catch (UserException e) {
Assert.assertTrue(e.getMessage().contains("endSnapshotId must be greater than 0"));
Assert.assertTrue(e.getMessage().contains("endSnapshotId must be greater than or equal to 0"));
}

// 12. Test start end for snapshot IDs
// 12. Test start > end for snapshot IDs
params.clear();
params.put("startSnapshotId", "5");
params.put("startSnapshotId", "6");
params.put("endSnapshotId", "5");
try {
PaimonScanNode.validateIncrementalReadParams(params);
Assert.fail("Should throw exception when startSnapshotId = endSnapshotId");
Assert.fail("Should throw exception when startSnapshotId > endSnapshotId");
} catch (UserException e) {
Assert.assertTrue(e.getMessage().contains("startSnapshotId must be less than endSnapshotId"));
Assert.assertTrue(e.getMessage().contains("startSnapshotId must be less than or equal to endSnapshotId"));
}

// 12.1. Test startSnapshotId == endSnapshotId (should be allowed, consistent with Spark Paimon behavior)
params.clear();
params.put("startSnapshotId", "6");
params.put("startSnapshotId", "5");
params.put("endSnapshotId", "5");
try {
PaimonScanNode.validateIncrementalReadParams(params);
Assert.fail("Should throw exception when startSnapshotId > endSnapshotId");
} catch (UserException e) {
Assert.assertTrue(e.getMessage().contains("startSnapshotId must be less than endSnapshotId"));
}
result = PaimonScanNode.validateIncrementalReadParams(params);
Assert.assertEquals("5,5", result.get("incremental-between"));
Assert.assertTrue(result.containsKey("scan.mode") && result.get("scan.mode") == null);
Assert.assertEquals(3, result.size());

// 13. Test invalid timestamp values ( 0)
// 13. Test invalid timestamp values (< 0)
params.clear();
params.put("startTimestamp", "-1");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@

-- !scan_mode4 --

-- !snapshot_id_0_0_empty --

-- !snapshot_id_0_1 --
1 Alice 30

-- !snapshot_id_1_1_empty --

-- !cte --
Bob 25
Charlie 28
Expand Down Expand Up @@ -73,6 +80,13 @@ Alice 30

-- !scan_mode4 --

-- !snapshot_id_0_0_empty --

-- !snapshot_id_0_1 --
1 Alice 30

-- !snapshot_id_1_1_empty --

-- !cte --
Bob 25
Charlie 28
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ suite("test_paimon_incr_read", "p0,external,doris,external_docker,external_docke
order_qt_scan_mode2 """select * from paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2, 'incrementalBetweenScanMode' = 'diff');"""
order_qt_scan_mode3 """select * from paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2, 'incrementalBetweenScanMode' = 'delta');"""
order_qt_scan_mode4 """select * from paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2, 'incrementalBetweenScanMode' = 'changelog');"""


order_qt_snapshot_id_0_0_empty """select * from paimon_incr@incr('startSnapshotId'=0, 'endSnapshotId'=0)"""
order_qt_snapshot_id_0_1 """select * from paimon_incr@incr('startSnapshotId'=0, 'endSnapshotId'=1)"""
order_qt_snapshot_id_1_1_empty """select * from paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=1)"""

// complex query
qt_cte """with cte1 as (select * from paimon_incr@incr('startTimestamp'=0)) select name, age from cte1 order by age;"""
Expand Down Expand Up @@ -84,10 +87,6 @@ suite("test_paimon_incr_read", "p0,external,doris,external_docker,external_docke
sql """select * from paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2, 'incrementalBetweenScanMode' = 'error');"""
exception "incrementalBetweenScanMode must be one of"
}
test {
sql """select * from paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=1)"""
exception "startSnapshotId must be less than endSnapshotId"
}
test {
sql """select * from paimon_incr@incr('startSnapshotId'=1)"""
exception "endSnapshotId is required when using snapshot-based incremental read"
Expand All @@ -96,6 +95,10 @@ suite("test_paimon_incr_read", "p0,external,doris,external_docker,external_docke
sql """select * from paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2) for version as of 1"""
exception "should not spec both snapshot and scan params"
}
test {
sql """select * from paimon_incr@incr('startSnapshotId'=-1)"""
exception "startSnapshotId must be greater than or equal to 0"
}
}

test_incr_read("false")
Expand Down