Skip to content

Commit

Permalink
mounter(ticdc): fix decoding when upstream doesn't enable new collati…
Browse files Browse the repository at this point in the history
…on (#11370) (#11379)

close #11371
  • Loading branch information
ti-chi-bot authored Jul 3, 2024
1 parent 29aac52 commit 498e3d3
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 2 deletions.
10 changes: 10 additions & 0 deletions cmd/cdc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@ package main

import (
_ "github.com/pingcap/tidb/pkg/types/parser_driver"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tiflow/pkg/cmd"
)

func main() {
// When the upstream doesn't enable new collation and there is a table with cluster index,
// tidb will not encode the pk column in the value part.
// So we will rely on the function `tablecodec.DecodeHandleToDatumMap` to decode pk value from the key.
// But this function only works when the global variable `newCollationEnabled` in tidb package is set to false.
//
// Previouly, this global variable is set to false in tidb package,
// but it was removed as described in https://github.com/pingcap/tidb/pull/52191#issuecomment-2024836481.
// So we need to manully set it to false here.
collate.SetNewCollationEnabledForTest(false)
cmd.Run()
}
2 changes: 1 addition & 1 deletion tests/integration_tests/bdr_mode/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export-fix-sql = true
check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/big_txn/sync_diff/output"
output-dir = "/tmp/tidb_cdc_test/bdr_mode/sync_diff/output"

source-instances = ["tidb"]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/ci_collation_compatibility/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["ci_collation_compatibility.?*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
new_collations_enabled_on_first_bootstrap = false
10 changes: 10 additions & 0 deletions tests/integration_tests/ci_collation_compatibility/data/test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
drop database if exists `ci_collation_compatibility`;
create database `ci_collation_compatibility`;
use `ci_collation_compatibility`;

CREATE TABLE t (
a varchar(20) not null PRIMARY KEY CLUSTERED,
b int default 10
);

insert into t values ('hello', 1);
61 changes: 61 additions & 0 deletions tests/integration_tests/ci_collation_compatibility/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

CDC_COUNT=3
DB_COUNT=4

function run() {
if [ "$SINK_TYPE" != "mysql" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR --tidb-config $CUR/conf/tidb_config.toml

cd $WORK_DIR

start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

TOPIC_NAME="ticdc-ci-collation-compatibility-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:[email protected]:3306/?safe-mode=true" ;;
esac

run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"

case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac

run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE TABLE ci_collation_compatibility.finish_mark_1 (a int primary key);"

sleep 30
check_table_exists "ci_collation_compatibility.t" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "ci_collation_compatibility.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
2 changes: 1 addition & 1 deletion tests/integration_tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ groups=(
# G14
'changefeed_finish force_replicate_table'
# G15
'new_ci_collation batch_add_table multi_rocks'
'new_ci_collation batch_add_table multi_rocks ci_collation_compatibility'
# G16, currently G16 is not running in kafka pipeline
'owner_resign processor_etcd_worker_delay sink_hang'
# G17
Expand Down

0 comments on commit 498e3d3

Please sign in to comment.