diff --git a/dm/pkg/checker/table_structure.go b/dm/pkg/checker/table_structure.go index bf758d1c40d..4d199327722 100644 --- a/dm/pkg/checker/table_structure.go +++ b/dm/pkg/checker/table_structure.go @@ -95,6 +95,7 @@ func NewTablesChecker(dbs map[string]*sql.DB, tableMap map[string][]*filter.Tabl tableMap: tableMap, dumpThreads: dumpThreads, } + log.L().Logger.Debug("check table structure", zap.Int("channel pool size", dumpThreads)) c.inCh = make(chan *checkItem, dumpThreads) c.optCh = make(chan *incompatibilityOption, dumpThreads) return c @@ -120,10 +121,12 @@ func (c *TablesChecker) Check(ctx context.Context) *Result { return c.checkTable(checkCtx) }) } - - dispatchTableItem(checkCtx, c.tableMap, c.inCh) + // start consuming results before dispatching + // or the dispatching thread could be blocked when + // the output channel is full. c.wg.Add(1) go c.handleOpts(ctx, r) + dispatchTableItem(checkCtx, c.tableMap, c.inCh) if err := eg.Wait(); err != nil { c.reMu.Lock() markCheckError(r, err) @@ -188,6 +191,7 @@ func (c *TablesChecker) checkTable(ctx context.Context) error { return nil } table := checkItem.table + log.L().Logger.Debug("checking table", zap.String("db", table.Schema), zap.String("table", table.Name)) if len(sourceID) == 0 || sourceID != checkItem.sourceID { sourceID = checkItem.sourceID p, err = dbutil.GetParserForDB(ctx, c.dbs[sourceID]) @@ -214,6 +218,7 @@ func (c *TablesChecker) checkTable(ctx context.Context) error { opt.tableID = table.String() c.optCh <- opt } + log.L().Logger.Debug("finish checking table", zap.String("db", table.Schema), zap.String("table", table.Name)) } } } @@ -456,7 +461,9 @@ func (c *ShardingTablesChecker) checkShardingTable(ctx context.Context, r *Resul r.Extra = fmt.Sprintf("error on sharding %s", c.targetTableID) r.Instruction = "please set same table structure for sharding tables" c.reMu.Unlock() - return nil + // shouldn't return error + // it's feasible to check more sharding tables and + // able to inform users of as many as possible incompatible tables } } } diff --git a/dm/pkg/checker/table_structure_test.go b/dm/pkg/checker/table_structure_test.go index 8626b6c340b..efa6348501a 100644 --- a/dm/pkg/checker/table_structure_test.go +++ b/dm/pkg/checker/table_structure_test.go @@ -94,6 +94,45 @@ func (t *testCheckSuite) TestShardingTablesChecker(c *tc.C) { c.Assert(result.State, tc.Equals, StateFailure) c.Assert(result.Errors, tc.HasLen, 1) c.Assert(mock.ExpectationsWereMet(), tc.IsNil) + + // 4. test tiflow#5759 + checker = NewShardingTablesChecker("test-name", + map[string]*sql.DB{"test-source": db}, + map[string][]*filter.Table{"test-source": { + {Schema: "test-db", Name: "test-table-1"}, + {Schema: "test-db", Name: "test-table-2"}, + {Schema: "test-db", Name: "test-table-3"}, + {Schema: "test-db", Name: "test-table-4"}, + }}, + false, + 1) + mock = initShardingMock(mock) + createTableRow2 = sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test-table-2", `CREATE TABLE "test-table-2" ( + "c" varchar(20) NOT NULL, + PRIMARY KEY ("c") +) ENGINE=InnoDB DEFAULT CHARSET=latin1`) + mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-2`").WillReturnRows(createTableRow2) + createTableRow3 := sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test-table-3", `CREATE TABLE "test-table-3" ( + "c" varchar(20) NOT NULL, + "c2" INT, + PRIMARY KEY ("c") +) ENGINE=InnoDB DEFAULT CHARSET=latin1`) + mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-3`").WillReturnRows(createTableRow3) + createTableRow4 := sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test-table-4", `CREATE TABLE "test-table-4" ( + "c" varchar(20) NOT NULL, + "c2" INT, + "c3" INT, + PRIMARY KEY ("c") +) ENGINE=InnoDB DEFAULT CHARSET=latin1`) + mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-4`").WillReturnRows(createTableRow4) + + // in tiflow#5759, this function will enter deadlock + result = checker.Check(ctx) + c.Assert(result.State, tc.Equals, StateFailure) + c.Assert(result.Errors, tc.HasLen, 3) } func (t *testCheckSuite) TestTablesChecker(c *tc.C) { @@ -174,6 +213,41 @@ func (t *testCheckSuite) TestTablesChecker(c *tc.C) { c.Assert(result.State, tc.Equals, StateFailure) c.Assert(result.Errors, tc.HasLen, 1) c.Assert(mock.ExpectationsWereMet(), tc.IsNil) + + // test #5759 + maxConnectionsRow = sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("max_connections", "2") + mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(maxConnectionsRow) + sqlModeRow = sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "ANSI_QUOTES") + mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlModeRow) + createTableRow1 := sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test-table-1", `CREATE TABLE "test-table-1" ( + "c" int(11) NOT NULL +) ENGINE=InnoDB`) + createTableRow2 := sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test-table-2", `CREATE TABLE "test-table-2" ( + "c" int(11) NOT NULL +) ENGINE=InnoDB`) + createTableRow3 := sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test-table-3", `CREATE TABLE "test-table-3" ( + "c" int(11) NOT NULL +) ENGINE=InnoDB`) + mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-1`").WillReturnRows(createTableRow1) + mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-2`").WillReturnRows(createTableRow2) + mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-3`").WillReturnRows(createTableRow3) + + checker = NewTablesChecker( + map[string]*sql.DB{"test-source": db}, + map[string][]*filter.Table{"test-source": { + {Schema: "test-db", Name: "test-table-1"}, + {Schema: "test-db", Name: "test-table-2"}, + {Schema: "test-db", Name: "test-table-3"}, + }}, + 1) + result = checker.Check(ctx) + c.Assert(result.State, tc.Equals, StateFailure) + c.Assert(result.Errors, tc.HasLen, 3) } func (t *testCheckSuite) TestOptimisticShardingTablesChecker(c *tc.C) { diff --git a/dm/tests/check_task/conf/dm-master.toml b/dm/tests/check_task/conf/dm-master.toml new file mode 100644 index 00000000000..2145761ed40 --- /dev/null +++ b/dm/tests/check_task/conf/dm-master.toml @@ -0,0 +1,4 @@ +# Master Configuration. +advertise-addr = "127.0.0.1:8261" +auto-compaction-retention = "3s" +master-addr = ":8261" diff --git a/dm/tests/check_task/conf/dm-worker1.toml b/dm/tests/check_task/conf/dm-worker1.toml new file mode 100644 index 00000000000..3e71a5bdb55 --- /dev/null +++ b/dm/tests/check_task/conf/dm-worker1.toml @@ -0,0 +1,2 @@ +join = "127.0.0.1:8261" +name = "worker1" diff --git a/dm/tests/check_task/conf/source1.yaml b/dm/tests/check_task/conf/source1.yaml new file mode 100644 index 00000000000..fae9f5bd7ee --- /dev/null +++ b/dm/tests/check_task/conf/source1.yaml @@ -0,0 +1,11 @@ +source-id: mysql-replica-01 +flavor: '' +enable-gtid: true +enable-relay: true +relay-binlog-name: '' +relay-binlog-gtid: '' +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3306 diff --git a/dm/tests/check_task/conf/task-noshard.yaml b/dm/tests/check_task/conf/task-noshard.yaml new file mode 100644 index 00000000000..e22c8ec54f4 --- /dev/null +++ b/dm/tests/check_task/conf/task-noshard.yaml @@ -0,0 +1,32 @@ +--- +name: test +task-mode: "all" +is-sharding: false +meta-schema: "dm_meta" +# enable-heartbeat: true +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 +clean-dump-file: false + +target-database: + host: "127.0.0.1" + port: 4000 + user: "test" + password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=" + +mysql-instances: + - source-id: "mysql-replica-01" + block-allow-list: "instance" + mydumper-config-name: "global" + + +block-allow-list: + instance: + do-dbs: ["checktask"] + +mydumpers: + global: + threads: 1 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" \ No newline at end of file diff --git a/dm/tests/check_task/conf/task-sharding.yaml b/dm/tests/check_task/conf/task-sharding.yaml new file mode 100644 index 00000000000..06c547057a2 --- /dev/null +++ b/dm/tests/check_task/conf/task-sharding.yaml @@ -0,0 +1,32 @@ +name: test # 任务名称,需要全局唯一 +task-mode: all # 任务模式,可设为 "full"、"incremental"、"all" +shard-mode: "pessimistic" + +target-database: # 下游数据库实例配置 + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" # 如果密码不为空,则推荐使用经过 dmctl 加密的密文 + +## ******** 功能配置集 ********** +block-allow-list: # 上游数据库实例匹配的表的 block-allow-list 过滤规则集,如果 DM 版本 <= v2.0.0-beta.2 则使用 black-white-list + bw-rule-1: # 黑白名单配置的名称 + do-dbs: ["checktask"] # 迁移哪些库 + +# ----------- 实例配置 ----------- +mysql-instances: + - source-id: "mysql-replica-01" # 上游实例或者复制组 ID,参考 的 配置 + block-allow-list: "bw-rule-1" # 黑白名单配置名称,如果 DM 版本 <= v2.0.0-beta.2 则使用 black-white-list + route-rules: ["rule1"] + mydumper-config-name: "global" + +routes: + rule1: + schema-pattern: "checktask" + table-pattern: "t*" + target-schema: "checktask" + target-table: "t" + +mydumpers: + global: + threads: 1 \ No newline at end of file diff --git a/dm/tests/check_task/data/db1.prepare.sql b/dm/tests/check_task/data/db1.prepare.sql new file mode 100644 index 00000000000..530f652b54a --- /dev/null +++ b/dm/tests/check_task/data/db1.prepare.sql @@ -0,0 +1,12 @@ +use checktask; +create table t1(c int primary key); + +create table t2(c int primary key, c2 int); + +create table t3(c int primary key, c3 int); + +create table t4(c int primary key, c4 int); + +create table t5(c int primary key, c5 int); + +create table t6(c int primary key, c6 int); \ No newline at end of file diff --git a/dm/tests/check_task/run.sh b/dm/tests/check_task/run.sh new file mode 100644 index 00000000000..3d8716852f2 --- /dev/null +++ b/dm/tests/check_task/run.sh @@ -0,0 +1,55 @@ +#!/bin/bash +set -eu +cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME + +TABLE_NUM=20 + +function prepare_incompatible_tables() { + run_sql_both_source "drop database if exists checktask" + run_sql_both_source "create database if not exists checktask" + for i in $(seq $TABLE_NUM); do + run_sql_both_source "create table checktask.test${i}(id int, b varchar(10))" # no primary key + done +} + +function prepare_many_tables() { + run_sql_both_source "drop database if exists checktask" + run_sql_both_source "create database if not exists checktask" + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 +} + +function prepare() { + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + dmctl_operate_source create $cur/conf/source1.yaml $SOURCE_ID1 +} + +function test_check_task_fail_no_block() { + prepare_incompatible_tables + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "check-task $cur/conf/task-noshard.yaml" \ + "\"state\": \"fail\"" 1 +} + +function test_check_task_fail_no_block_forsharding() { + prepare_many_tables + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "check-task $cur/conf/task-sharding.yaml" \ + "\"state\": \"fail\"" 1 +} + +function run() { + prepare + test_check_task_fail_no_block + test_check_task_fail_no_block_forsharding +} + +cleanup_data checktask +cleanup_process $* +run $* +cleanup_process $* +cleanup_data checktask diff --git a/dm/tests/others_integration_1.txt b/dm/tests/others_integration_1.txt index 1832c30f48e..3a0adc9033e 100644 --- a/dm/tests/others_integration_1.txt +++ b/dm/tests/others_integration_1.txt @@ -1,3 +1,4 @@ +check_task validator_basic drop_column_with_index downstream_diff_index