diff --git a/.github/ISSUE_TEMPLATE/bug-report.yml b/.github/ISSUE_TEMPLATE/bug-report.yml
index 39f5b87900e..5892a2677fc 100644
--- a/.github/ISSUE_TEMPLATE/bug-report.yml
+++ b/.github/ISSUE_TEMPLATE/bug-report.yml
@@ -90,10 +90,10 @@ body:
- type: textarea
attributes:
- label: Flink or Spark Version
- description: Provide Flink or Spark Version.
+ label: Zeta or Flink or Spark Version
+ description: Provide Zeta or Flink or Spark Version.
placeholder: >
- Please provide the version of Flink or Spark.
+ Please provide the version of Zeta or Flink or Spark.
validations:
required: false
diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md
index f586ac3bcca..506cb7f2485 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -2,11 +2,24 @@
> Doris sink connector
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
## Description
Used to send data to Doris. Both streaming and batch mode are supported.
Internally, the Doris sink connector caches data and imports it into Doris in batches via stream load.
+## Supported DataSource Info
+
:::tip
Version Supported
@@ -17,67 +30,186 @@ Version Supported
:::
-## Key features
-
-- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [cdc](../../concept/connector-v2-features.md)
-
-## Options
-
-| name | type | required | default value |
-|--------------------|--------|----------|---------------|
-| fenodes | string | yes | - |
-| username | string | yes | - |
-| password | string | yes | - |
-| table.identifier | string | yes | - |
-| sink.label-prefix | string | yes | - |
-| sink.enable-2pc | bool | no | true |
-| sink.enable-delete | bool | no | false |
-| doris.config | map | yes | - |
-
-### fenodes [string]
-
-`Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."`
-
-### username [string]
-
-`Doris` user username
-
-### password [string]
-
-`Doris` user password
+## Sink Options
+
+| Name | Type | Required | Default | Description |
+|---------------------|--------|----------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` |
+| username | String | Yes | - | `Doris` user username |
+| password | String | Yes | - | `Doris` user password |
+| table.identifier | String | Yes | - | The name of `Doris` table |
+| sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel. |
+| sink.enable-2pc     | bool   | No       | true       | Whether to enable two-phase commit (2pc). The default is true, which ensures Exactly-Once semantics. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).                                   |
+| sink.enable-delete  | bool   | No       | false      | Whether to enable deletion. This option requires the Doris table to enable the batch delete function (enabled by default since version 0.15), and it only supports the Unique model. You can get more details at this [link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual) |
+| sink.check-interval | int    | No       | 10000      | The interval at which load exceptions are checked during loading.                                                                                                                                                                                                                             |
+| sink.max-retries    | int    | No       | 3          | The maximum number of retries when writing records to the database fails.                                                                                                                                                                                                                     |
+| sink.buffer-size    | int    | No       | 256 * 1024 | The buffer size used to cache data for stream load.                                                                                                                                                                                                                                           |
+| sink.buffer-count   | int    | No       | 3          | The number of buffers used to cache data for stream load (see the sketch below this table).                                                                                                                                                                                                   |
+| doris.config        | map    | Yes      | -          | This option is used to support operations such as `insert`, `delete`, and `update` when SQL is generated automatically, and to set the import format (see the supported formats below).                                                                                                       |
+
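+A minimal sketch showing how the buffering and retry options above combine with the required connection settings (the host, credentials, table name, and tuning values are placeholders, not recommendations):
+
+```hocon
+sink {
+  Doris {
+    fenodes = "fe_host:8030"
+    username = root
+    password = ""
+    table.identifier = "db.table"
+    sink.label-prefix = "seatunnel-doris"
+    # illustrative values, taken from the defaults in the table above
+    sink.check-interval = 10000
+    sink.max-retries = 3
+    sink.buffer-size = 262144
+    sink.buffer-count = 3
+    doris.config {
+      format = "json"
+      read_json_by_line = "true"
+    }
+  }
+}
+```
+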
+## Data Type Mapping
+
+| Doris Data type | SeaTunnel Data type |
+|-----------------|-----------------------------------------|
+| BOOLEAN | BOOLEAN |
+| TINYINT | TINYINT |
+| SMALLINT | SMALLINT<br/>TINYINT |
+| INT | INT<br/>SMALLINT<br/>TINYINT |
+| BIGINT | BIGINT<br/>INT<br/>SMALLINT<br/>TINYINT |
+| LARGEINT | BIGINT<br/>INT<br/>SMALLINT<br/>TINYINT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE<br/>FLOAT |
+| DECIMAL | DECIMAL<br/>DOUBLE<br/>FLOAT |
+| DATE | DATE |
+| DATETIME | TIMESTAMP |
+| CHAR | STRING |
+| VARCHAR | STRING |
+| STRING | STRING |
+| ARRAY | ARRAY |
+| MAP | MAP |
+| JSON | STRING |
+| HLL | Not supported yet |
+| BITMAP | Not supported yet |
+| QUANTILE_STATE | Not supported yet |
+| STRUCT | Not supported yet |
-### table.identifier [string]
-
-The name of `Doris` table
+#### Supported import data formats
-### sink.label-prefix [string]
+The supported formats include CSV and JSON
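+
+As a sketch of how the format is selected inside the `Doris` sink block (the properties inside `doris.config` are passed through to stream load, so the exact property set depends on your Doris version; `column_separator` is assumed here from the stream load documentation):
+
+```hocon
+# JSON import, as used in the examples below
+doris.config {
+  format = "json"
+  read_json_by_line = "true"
+}
+
+# CSV import
+# doris.config {
+#   format = "csv"
+#   column_separator = ","
+# }
+```
+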
-The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel.
+## Task Example
-### sink.enable-2pc [bool]
+### Simple:
-Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).
+> The following example describes writing multiple data types to Doris; users need to create the corresponding table downstream in advance
-### sink.enable-delete [bool]
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 10000
+}
-Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this link:
+source {
+ FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields {
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(16, 1)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
-https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual
+sink {
+ Doris {
+ fenodes = "doris_cdc_e2e:8030"
+ username = root
+ password = ""
+ table.identifier = "test.e2e_table_sink"
+ sink.label-prefix = "test-cdc"
+ sink.enable-2pc = "true"
+ sink.enable-delete = "true"
+ doris.config {
+ format = "json"
+ read_json_by_line = "true"
+ }
+ }
+}
+```
-### doris.config [map]
+### CDC(Change Data Capture) Event:
-The parameter of the stream load `data_desc`, you can get more detail at this link:
+> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to the Doris sink. FakeSource simulates CDC data with a schema that includes a score field (int type). A table named test.e2e_table_sink needs to be created in Doris in advance.
-https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 10000
+}
-#### Supported import data formats
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ sex = boolean
+ number = tinyint
+ height = float
+ sight = double
+ create_time = date
+ update_time = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
-The supported formats include CSV and JSON. Default value: CSV
+sink {
+ Doris {
+ fenodes = "doris_cdc_e2e:8030"
+ username = root
+ password = ""
+ table.identifier = "test.e2e_table_sink"
+ sink.label-prefix = "test-cdc"
+ sink.enable-2pc = "true"
+ sink.enable-delete = "true"
+ doris.config {
+ format = "json"
+ read_json_by_line = "true"
+ }
+ }
+}
-## Example
+```
-Use JSON format to import data
+### Use JSON format to import data
```
sink {
@@ -97,7 +229,7 @@ sink {
```
-Use CSV format to import data
+### Use CSV format to import data
```
sink {
diff --git a/docs/en/connector-v2/source/Hudi.md b/docs/en/connector-v2/source/Hudi.md
index cb3b154d58b..b70d34608ea 100644
--- a/docs/en/connector-v2/source/Hudi.md
+++ b/docs/en/connector-v2/source/Hudi.md
@@ -2,69 +2,67 @@
> Hudi source connector
-## Description
+## Support Those Engines
-Used to read data from Hudi. Currently, only supports hudi cow table and Snapshot Query with Batch Mode.
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
-In order to use this connector, You must ensure your spark/flink cluster already integrated hive. The tested hive version is 2.3.9.
-
-## Key features
+## Key Features
- [x] [batch](../../concept/connector-v2-features.md)
-
-Currently, only supports hudi cow table and Snapshot Query with Batch Mode
-
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-## Options
-
-| name | type | required | default value |
-|-------------------------|---------|------------------------------|---------------|
-| table.path | string | yes | - |
-| table.type | string | yes | - |
-| conf.files | string | yes | - |
-| use.kerberos | boolean | no | false |
-| kerberos.principal | string | yes when use.kerberos = true | - |
-| kerberos.principal.file | string | yes when use.kerberos = true | - |
-| common-options | config | no | - |
-
-### table.path [string]
-
-`table.path` The hdfs root path of hudi table,such as 'hdfs://nameserivce/data/hudi/hudi_table/'.
+## Description
-### table.type [string]
+Used to read data from Hudi. Currently, only Hudi COW tables and Snapshot Query with Batch Mode are supported.
-`table.type` The type of hudi table. Now we only support 'cow', 'mor' is not support yet.
+In order to use this connector, you must ensure that your Spark/Flink cluster has already integrated Hive. The tested Hive version is 2.3.9.
-### conf.files [string]
+## Supported DataSource Info
-`conf.files` The environment conf file path list(local path), which used to init hdfs client to read hudi table file. The example is '/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml'.
+:::tip
-### use.kerberos [boolean]
+* Currently, only Hudi COW tables and Snapshot Query with Batch Mode are supported
-`use.kerberos` Whether to enable Kerberos, default is false.
+:::
-### kerberos.principal [string]
+## Data Type Mapping
-`kerberos.principal` When use kerberos, we should set kerberos princal such as 'test_user@xxx'.
+| Hudi Data type | SeaTunnel Data type |
+|----------------|---------------------|
+| ALL TYPE | STRING |
-### kerberos.principal.file [string]
+## Source Options
-`kerberos.principal.file` When use kerberos, we should set kerberos princal file such as '/home/test/test_user.keytab'.
+| Name | Type | Required | Default | Description |
+|-------------------------|--------|------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| table.path              | String | Yes                          | -       | The HDFS root path of the Hudi table, such as 'hdfs://nameserivce/data/hudi/hudi_table/'.                                                                                                                      |
+| table.type              | String | Yes                          | -       | The type of the Hudi table. Currently only 'cow' is supported; 'mor' is not supported yet.                                                                                                                     |
+| conf.files              | String | Yes                          | -       | The environment conf file path list (local paths), used to initialize the HDFS client for reading Hudi table files. For example, '/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml'. |
+| use.kerberos            | bool   | No                           | false   | Whether to enable Kerberos, default is false.                                                                                                                                                                  |
+| kerberos.principal      | String | Yes when use.kerberos = true | -       | When using Kerberos, we should set the Kerberos principal, such as 'test_user@xxx'.                                                                                                                            |
+| kerberos.principal.file | String | Yes when use.kerberos = true | -       | When using Kerberos, we should set the Kerberos principal file, such as '/home/test/test_user.keytab'.                                                                                                         |
+| common-options | config | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
-### common options
+## Task Example
-Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+### Simple:
-## Examples
+> This example reads from a Hudi COW table and configures Kerberos for the environment, printing to the console.
```hocon
-source {
-
+# Defining the runtime environment
+env {
+ # You can set flink configuration here
+ execution.parallelism = 2
+ job.mode = "BATCH"
+}
+source {
Hudi {
table.path = "hdfs://nameserivce/data/hudi/hudi_table/"
table.type = "cow"
@@ -73,7 +71,15 @@ source {
kerberos.principal = "test_user@xxx"
kerberos.principal.file = "/home/test/test_user.keytab"
}
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform-v2/sql/
+}
+sink {
+ Console {}
}
```
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 7a09ac6bc4e..97c0c523e63 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -223,14 +223,11 @@ public void close() {
private boolean isChangeRecordInChunkRange(SourceRecord record) {
if (taskContext.isDataChangeRecord(record)) {
+ // pass the split start/end boundaries directly instead of wrapping them in another Object[]
return taskContext.isRecordBetween(
record,
- null == currentSnapshotSplit.getSplitStart()
- ? null
- : new Object[] {currentSnapshotSplit.getSplitStart()},
- null == currentSnapshotSplit.getSplitEnd()
- ? null
- : new Object[] {currentSnapshotSplit.getSplitEnd()});
+ currentSnapshotSplit.getSplitStart(),
+ currentSnapshotSplit.getSplitEnd());
}
return false;
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 5257064dc1f..2b8e9f7725f 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -19,6 +19,7 @@
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -32,8 +33,12 @@
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -49,6 +54,8 @@
public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
private final FetchTask.Context taskContext;
private final ExecutorService executorService;
+ // tables that have entered the pure binlog read phase
+ private final Set<TableId> pureBinlogPhaseTables;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile Throwable readException;
@@ -58,6 +65,11 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
+ // the max split high watermark of each table
+ private Map<TableId, Offset> maxSplitHighWatermarkMap;
+ // finished split info
+ private Map<TableId, List<CompletedSnapshotSplitInfo>> finishedSplitsInfo;
+
private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;
public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTaskId) {
@@ -65,6 +77,7 @@ public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTask
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
+ this.pureBinlogPhaseTables = new HashSet<>();
}
@Override
@@ -157,14 +170,72 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
tableId);
return position.isAfter(splitStartWatermark);
}
- // TODO only the table who captured snapshot splits need to filter( Used to support
- // Exactly-Once )
- return position.isAfter(splitStartWatermark);
+ // check whether the pure binlog mode has been entered
+ if (hasEnterPureBinlogPhase(tableId, position)) {
+ return true;
+ }
+ // not yet in pure binlog mode; check whether the current record lies within one of the
+ // finished snapshot splits and is positioned after that split's high watermark.
+ if (finishedSplitsInfo.containsKey(tableId)) {
+ for (CompletedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
+ if (taskContext.isRecordBetween(
+ sourceRecord,
+ splitInfo.getSplitStart(),
+ splitInfo.getSplitEnd())
+ && position.isAfter(splitInfo.getWatermark().getHighWatermark())) {
+ return true;
+ }
+ }
+ }
+ return false;
}
return true;
}
+ private boolean hasEnterPureBinlogPhase(TableId tableId, Offset position) {
+ // only tables that captured snapshot splits need to be filtered
+ if (pureBinlogPhaseTables.contains(tableId)) {
+ return true;
+ }
+ // a table enters the pure binlog phase once the position reaches its max split high watermark
+ if (maxSplitHighWatermarkMap.containsKey(tableId)
+ && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
+ pureBinlogPhaseTables.add(tableId);
+ return true;
+ }
+ return false;
+ }
+
private void configureFilter() {
splitStartWatermark = currentIncrementalSplit.getStartupOffset();
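+ // Group the finished snapshot splits by table and track each table's max high watermark,
+ // so that shouldEmit() can decide when a table has fully entered the pure binlog phase.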
+ Map<TableId, List<CompletedSnapshotSplitInfo>> splitsInfoMap = new HashMap<>();
+ Map<TableId, Offset> tableIdBinlogPositionMap = new HashMap<>();
+ List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos =
+ currentIncrementalSplit.getCompletedSnapshotSplitInfos();
+
+ // latest-offset mode
+ if (completedSnapshotSplitInfos.isEmpty()) {
+ for (TableId tableId : currentIncrementalSplit.getTableIds()) {
+ tableIdBinlogPositionMap.put(tableId, currentIncrementalSplit.getStartupOffset());
+ }
+ }
+
+ // calculate the max high watermark of every table
+ for (CompletedSnapshotSplitInfo finishedSplitInfo : completedSnapshotSplitInfos) {
+ TableId tableId = finishedSplitInfo.getTableId();
+ List<CompletedSnapshotSplitInfo> list =
+ splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
+ list.add(finishedSplitInfo);
+ splitsInfoMap.put(tableId, list);
+
+ Offset highWatermark = finishedSplitInfo.getWatermark().getHighWatermark();
+ Offset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
+ if (maxHighWatermark == null || highWatermark.isAfter(maxHighWatermark)) {
+ tableIdBinlogPositionMap.put(tableId, highWatermark);
+ }
+ }
+ this.finishedSplitsInfo = splitsInfoMap;
+ this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
+ this.pureBinlogPhaseTables.clear();
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 7e64ee81ef7..396fd7bae9d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -65,7 +65,10 @@ public OptionRule optionRule() {
JdbcSourceOptions.SERVER_TIME_ZONE,
JdbcSourceOptions.CONNECT_TIMEOUT_MS,
JdbcSourceOptions.CONNECT_MAX_RETRIES,
- JdbcSourceOptions.CONNECTION_POOL_SIZE)
+ JdbcSourceOptions.CONNECTION_POOL_SIZE,
+ JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
+ JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
+ JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
.optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE)
.conditional(
MySqlSourceOptions.STARTUP_MODE,
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
index 195b1a5a7c6..0d91c02fee7 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
@@ -176,7 +176,13 @@ public int compareTo(Offset offset) {
// compared ...
long timestamp = this.getTimestamp();
long targetTimestamp = that.getTimestamp();
- return Long.compare(timestamp, targetTimestamp);
+ // Only compare timestamps when both offsets actually carry one, because the
+ // low watermark and high watermark offsets have no timestamp (it is 0).
+ // Without this check, comparing a real binlog offset against a watermark would
+ // always be decided here instead of falling through to the filename/position comparison.
+ if (timestamp != 0 && targetTimestamp != 0) {
+ return Long.compare(timestamp, targetTimestamp);
+ }
}
// First compare the MySQL binlog filenames
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 85810b825f3..285d4b79232 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -65,7 +65,10 @@ public OptionRule optionRule() {
JdbcSourceOptions.SERVER_TIME_ZONE,
JdbcSourceOptions.CONNECT_TIMEOUT_MS,
JdbcSourceOptions.CONNECT_MAX_RETRIES,
- JdbcSourceOptions.CONNECTION_POOL_SIZE)
+ JdbcSourceOptions.CONNECTION_POOL_SIZE,
+ JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
+ JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
+ JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
.optional(SqlServerSourceOptions.STARTUP_MODE, SqlServerSourceOptions.STOP_MODE)
.conditional(
SqlServerSourceOptions.STARTUP_MODE,