Skip to content

Commit

Permalink
checker(dm): fix deadlock when check-task/start-task (#5772) (#5774)
Browse files Browse the repository at this point in the history
close #5759
  • Loading branch information
lance6716 authored Jun 7, 2022
1 parent 1b1b8d7 commit 9f5e3ce
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 3 deletions.
13 changes: 10 additions & 3 deletions dm/pkg/checker/table_structure.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func NewTablesChecker(dbs map[string]*sql.DB, tableMap map[string][]*filter.Tabl
tableMap: tableMap,
dumpThreads: dumpThreads,
}
log.L().Logger.Debug("check table structure", zap.Int("channel pool size", dumpThreads))
c.inCh = make(chan *checkItem, dumpThreads)
c.optCh = make(chan *incompatibilityOption, dumpThreads)
return c
Expand All @@ -120,10 +121,12 @@ func (c *TablesChecker) Check(ctx context.Context) *Result {
return c.checkTable(checkCtx)
})
}

dispatchTableItem(checkCtx, c.tableMap, c.inCh)
// start consuming results before dispatching
// or the dispatching thread could be blocked when
// the output channel is full.
c.wg.Add(1)
go c.handleOpts(ctx, r)
dispatchTableItem(checkCtx, c.tableMap, c.inCh)
if err := eg.Wait(); err != nil {
c.reMu.Lock()
markCheckError(r, err)
Expand Down Expand Up @@ -188,6 +191,7 @@ func (c *TablesChecker) checkTable(ctx context.Context) error {
return nil
}
table := checkItem.table
log.L().Logger.Debug("checking table", zap.String("db", table.Schema), zap.String("table", table.Name))
if len(sourceID) == 0 || sourceID != checkItem.sourceID {
sourceID = checkItem.sourceID
p, err = dbutil.GetParserForDB(ctx, c.dbs[sourceID])
Expand All @@ -214,6 +218,7 @@ func (c *TablesChecker) checkTable(ctx context.Context) error {
opt.tableID = table.String()
c.optCh <- opt
}
log.L().Logger.Debug("finish checking table", zap.String("db", table.Schema), zap.String("table", table.Name))
}
}
}
Expand Down Expand Up @@ -456,7 +461,9 @@ func (c *ShardingTablesChecker) checkShardingTable(ctx context.Context, r *Resul
r.Extra = fmt.Sprintf("error on sharding %s", c.targetTableID)
r.Instruction = "please set same table structure for sharding tables"
c.reMu.Unlock()
return nil
// Don't return here: keep checking the remaining sharding tables
// so that users are informed of as many incompatible tables as
// possible.
}
}
}
Expand Down
74 changes: 74 additions & 0 deletions dm/pkg/checker/table_structure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,45 @@ func (t *testCheckSuite) TestShardingTablesChecker(c *tc.C) {
c.Assert(result.State, tc.Equals, StateFailure)
c.Assert(result.Errors, tc.HasLen, 1)
c.Assert(mock.ExpectationsWereMet(), tc.IsNil)

// 4. test tiflow#5759
checker = NewShardingTablesChecker("test-name",
map[string]*sql.DB{"test-source": db},
map[string][]*filter.Table{"test-source": {
{Schema: "test-db", Name: "test-table-1"},
{Schema: "test-db", Name: "test-table-2"},
{Schema: "test-db", Name: "test-table-3"},
{Schema: "test-db", Name: "test-table-4"},
}},
false,
1)
mock = initShardingMock(mock)
createTableRow2 = sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test-table-2", `CREATE TABLE "test-table-2" (
"c" varchar(20) NOT NULL,
PRIMARY KEY ("c")
) ENGINE=InnoDB DEFAULT CHARSET=latin1`)
mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-2`").WillReturnRows(createTableRow2)
createTableRow3 := sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test-table-3", `CREATE TABLE "test-table-3" (
"c" varchar(20) NOT NULL,
"c2" INT,
PRIMARY KEY ("c")
) ENGINE=InnoDB DEFAULT CHARSET=latin1`)
mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-3`").WillReturnRows(createTableRow3)
createTableRow4 := sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test-table-4", `CREATE TABLE "test-table-4" (
"c" varchar(20) NOT NULL,
"c2" INT,
"c3" INT,
PRIMARY KEY ("c")
) ENGINE=InnoDB DEFAULT CHARSET=latin1`)
mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-4`").WillReturnRows(createTableRow4)

// in tiflow#5759, this function will enter deadlock
result = checker.Check(ctx)
c.Assert(result.State, tc.Equals, StateFailure)
c.Assert(result.Errors, tc.HasLen, 3)
}

func (t *testCheckSuite) TestTablesChecker(c *tc.C) {
Expand Down Expand Up @@ -174,6 +213,41 @@ func (t *testCheckSuite) TestTablesChecker(c *tc.C) {
c.Assert(result.State, tc.Equals, StateFailure)
c.Assert(result.Errors, tc.HasLen, 1)
c.Assert(mock.ExpectationsWereMet(), tc.IsNil)

// test #5759
maxConnectionsRow = sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("max_connections", "2")
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(maxConnectionsRow)
sqlModeRow = sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "ANSI_QUOTES")
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlModeRow)
createTableRow1 := sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test-table-1", `CREATE TABLE "test-table-1" (
"c" int(11) NOT NULL
) ENGINE=InnoDB`)
createTableRow2 := sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test-table-2", `CREATE TABLE "test-table-2" (
"c" int(11) NOT NULL
) ENGINE=InnoDB`)
createTableRow3 := sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test-table-3", `CREATE TABLE "test-table-3" (
"c" int(11) NOT NULL
) ENGINE=InnoDB`)
mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-1`").WillReturnRows(createTableRow1)
mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-2`").WillReturnRows(createTableRow2)
mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-3`").WillReturnRows(createTableRow3)

checker = NewTablesChecker(
map[string]*sql.DB{"test-source": db},
map[string][]*filter.Table{"test-source": {
{Schema: "test-db", Name: "test-table-1"},
{Schema: "test-db", Name: "test-table-2"},
{Schema: "test-db", Name: "test-table-3"},
}},
1)
result = checker.Check(ctx)
c.Assert(result.State, tc.Equals, StateFailure)
c.Assert(result.Errors, tc.HasLen, 3)
}

func (t *testCheckSuite) TestOptimisticShardingTablesChecker(c *tc.C) {
Expand Down
4 changes: 4 additions & 0 deletions dm/tests/check_task/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Master Configuration.
advertise-addr = "127.0.0.1:8261"
auto-compaction-retention = "3s"
master-addr = ":8261"
2 changes: 2 additions & 0 deletions dm/tests/check_task/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
join = "127.0.0.1:8261"
name = "worker1"
11 changes: 11 additions & 0 deletions dm/tests/check_task/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
source-id: mysql-replica-01
flavor: ''
enable-gtid: true
enable-relay: true
relay-binlog-name: ''
relay-binlog-gtid: ''
from:
host: 127.0.0.1
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3306
32 changes: 32 additions & 0 deletions dm/tests/check_task/conf/task-noshard.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
name: test
task-mode: "all"
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true
heartbeat-update-interval: 1
heartbeat-report-interval: 1
clean-dump-file: false

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"


block-allow-list:
instance:
do-dbs: ["checktask"]

mydumpers:
global:
threads: 1
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""
32 changes: 32 additions & 0 deletions dm/tests/check_task/conf/task-sharding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: test # task name; must be globally unique
task-mode: all # task mode; can be set to "full", "incremental" or "all"
shard-mode: "pessimistic"

target-database: # downstream database instance configuration
host: "127.0.0.1"
port: 4000
user: "root"
password: "" # if the password is not empty, ciphertext encrypted by dmctl is recommended

## ******** feature configuration sets **********
block-allow-list: # block-allow-list filter rules for tables of the upstream database instances; use black-white-list if the DM version is <= v2.0.0-beta.2
bw-rule-1: # name of the block/allow list rule
do-dbs: ["checktask"] # which databases to migrate

# ----------- instance configuration -----------
mysql-instances:
- source-id: "mysql-replica-01" # ID of the upstream instance or replication group; see the source configuration (NOTE(review): the reference target was missing in the original comment — presumably the `source-id` of the source config)
block-allow-list: "bw-rule-1" # name of the block/allow list rule; use black-white-list if the DM version is <= v2.0.0-beta.2
route-rules: ["rule1"]
mydumper-config-name: "global"

routes:
rule1:
schema-pattern: "checktask"
table-pattern: "t*"
target-schema: "checktask"
target-table: "t"

mydumpers:
global:
threads: 1
12 changes: 12 additions & 0 deletions dm/tests/check_task/data/db1.prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use checktask;
create table t1(c int primary key);

create table t2(c int primary key, c2 int);

create table t3(c int primary key, c3 int);

create table t4(c int primary key, c4 int);

create table t5(c int primary key, c5 int);

create table t6(c int primary key, c6 int);
55 changes: 55 additions & 0 deletions dm/tests/check_task/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/bin/bash
# Integration test for `dmctl check-task`: the checks are expected to report
# a failure instead of blocking.

# Abort on the first failing command (-e) and on any use of an unset
# variable (-u).
set -eu

# Absolute directory of this script, independent of the caller's cwd.
cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)

# Quoted so that a checkout path containing spaces or glob characters is not
# word-split. Presumably this file defines the helpers and variables used
# below (run_sql_both_source, run_dm_ctl, TEST_DIR, ...) — verify there.
source "$cur/../_utils/test_prepare"
WORK_DIR=$TEST_DIR/$TEST_NAME

# Number of tables created by prepare_incompatible_tables.
TABLE_NUM=20

# Recreate the `checktask` database through run_sql_both_source and fill it
# with tables test1 .. test$TABLE_NUM, none of which has a primary key.
prepare_incompatible_tables() {
	run_sql_both_source "drop database if exists checktask"
	run_sql_both_source "create database if not exists checktask"
	for ((i = 1; i <= TABLE_NUM; i++)); do
		# no primary key
		run_sql_both_source "create table checktask.test${i}(id int, b varchar(10))"
	done
}

# Recreate the `checktask` database through run_sql_both_source, then load
# data/db1.prepare.sql (tables t1 .. t6 with differing column sets) using the
# MYSQL_*1 connection settings.
function prepare_many_tables() {
	run_sql_both_source "drop database if exists checktask"
	run_sql_both_source "create database if not exists checktask"
	# The SQL file path is quoted so a checkout directory containing spaces
	# is passed as a single argument.
	run_sql_file "$cur/data/db1.prepare.sql" $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
}

# Start one DM master and one DM worker, wait until each answers RPC, then
# register the upstream source described by conf/source1.yaml.
# Every path argument is quoted so that WORK_DIR or the script directory may
# contain spaces without being word-split.
function prepare() {
	run_dm_master "$WORK_DIR/master" $MASTER_PORT "$cur/conf/dm-master.toml"
	check_rpc_alive "$cur/../bin/check_master_online" 127.0.0.1:$MASTER_PORT
	run_dm_worker "$WORK_DIR/worker1" $WORKER1_PORT "$cur/conf/dm-worker1.toml"
	check_rpc_alive "$cur/../bin/check_worker_online" 127.0.0.1:$WORKER1_PORT
	dmctl_operate_source create "$cur/conf/source1.yaml" $SOURCE_ID1
}

# Run `check-task` for the non-sharding task against the tables without
# primary keys and look for a `"state": "fail"` result.
# NOTE(review): the trailing 1 is presumably the expected number of matches
# of that pattern in the dmctl output — confirm against run_dm_ctl.
test_check_task_fail_no_block() {
	prepare_incompatible_tables
	local master_addr="127.0.0.1:$MASTER_PORT"
	local task_cmd="check-task $cur/conf/task-noshard.yaml"
	run_dm_ctl $WORK_DIR "$master_addr" \
		"$task_cmd" \
		"\"state\": \"fail\"" 1
}

# Run `check-task` for the sharding task against tables t1 .. t6 (which have
# differing column sets) and look for a `"state": "fail"` result.
# NOTE(review): the trailing 1 is presumably the expected number of matches
# of that pattern in the dmctl output — confirm against run_dm_ctl.
test_check_task_fail_no_block_forsharding() {
	prepare_many_tables
	local master_addr="127.0.0.1:$MASTER_PORT"
	local task_cmd="check-task $cur/conf/task-sharding.yaml"
	run_dm_ctl $WORK_DIR "$master_addr" \
		"$task_cmd" \
		"\"state\": \"fail\"" 1
}

# Entry point: bring up the cluster, then run both check-task scenarios in
# order. The steps are plain commands in a loop (not an `&&` chain) so that
# `set -e` still aborts on the first failure.
run() {
	local step
	for step in \
		prepare \
		test_check_task_fail_no_block \
		test_check_task_fail_no_block_forsharding; do
		"$step"
	done
}

# Clean up leftovers from earlier runs, execute the test, then clean up again.
# "$@" forwards the script arguments unchanged; an unquoted $* would re-split
# them on whitespace and expand glob characters.
cleanup_data checktask
cleanup_process "$@"
run "$@"
cleanup_process "$@"
cleanup_data checktask
1 change: 1 addition & 0 deletions dm/tests/others_integration_1.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
check_task
validator_basic
drop_column_with_index
downstream_diff_index
Expand Down

0 comments on commit 9f5e3ce

Please sign in to comment.