Skip to content

Commit

Permalink
[Fix][Connecotr-V2] Fix clickhouse sink does not support composite pr…
Browse files Browse the repository at this point in the history
…imary key (#8021)
  • Loading branch information
eyys authored Nov 14, 2024
1 parent 8e32228 commit 24d0542
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,17 @@ public TableSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInf
String[] primaryKeys = null;
if (readonlyConfig.getOptional(PRIMARY_KEY).isPresent()) {
String primaryKey = readonlyConfig.get(PRIMARY_KEY);
if (primaryKey == null || primaryKey.trim().isEmpty()) {
throw new ClickhouseConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
"primary_key can not be empty");
}
if (shardKey != null && !Objects.equals(primaryKey, shardKey)) {
throw new ClickhouseConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
"sharding_key and primary_key must be consistent to ensure correct processing of cdc events");
}
primaryKeys = new String[] {primaryKey};
primaryKeys = primaryKey.replaceAll("\\s+", "").split(",");
}
boolean supportUpsert = readonlyConfig.get(SUPPORT_UPSERT);
boolean allowExperimentalLightweightDelete =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ public void testClickhouseReplacingMergeTreeTableWithEnableDelete(TestContainer
dropSinkTable();
}

@TestTemplate
public void testClickhouseCompositePrimary(TestContainer container) throws Exception {
initializeClickhouseCompositePrimary();

Container.ExecResult execResult = container.executeJob("/fake_to_clickhouse.conf");
Assertions.assertEquals(0, execResult.getExitCode());

checkSinkTableRows();
dropSinkTable();
}

private void initConnection() throws Exception {
final Properties info = new Properties();
info.put("user", this.container.getUsername());
Expand All @@ -170,6 +181,23 @@ private void initializeClickhouseMergeTreeTable() {
}
}

private void initializeClickhouseCompositePrimary() {
try {
Statement statement = this.connection.createStatement();
String sql =
String.format(
"create table if not exists %s.%s(\n"
+ " `pk_id` Int64,\n"
+ " `name` String,\n"
+ " `score` Int32\n"
+ ")engine=MergeTree ORDER BY(pk_id, name) PRIMARY KEY(pk_id, name)",
DATABASE, SINK_TABLE);
statement.execute(sql);
} catch (SQLException e) {
throw new RuntimeException("Initializing Clickhouse table failed!", e);
}
}

private void initializeClickhouseReplacingMergeTreeTable() {
try {
Statement statement = this.connection.createStatement();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 10000
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
}
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
fields = [1, "A_1", 100]
},
{
kind = DELETE
fields = [2, "B", 100]
}
]
}
}

sink {
Clickhouse {
host = "clickhouse:8123"
database = "default"
table = "sink_table"
username = "default"
password = ""

primary_key = "pk_id, name"
support_upsert = true
allow_experimental_lightweight_delete = true
}
}

0 comments on commit 24d0542

Please sign in to comment.