Skip to content

Commit

Permalink
sink (ticdc): disable batch update dml when downstream is mysql (#8452)…
Browse files Browse the repository at this point in the history
… (#8460)

close #8420
  • Loading branch information
ti-chi-bot authored Mar 7, 2023
1 parent 63372e3 commit 7f56161
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 5 deletions.
21 changes: 17 additions & 4 deletions cdc/sinkv2/eventsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,23 @@ func (s *mysqlBackend) batchSingleTxnDmls(

// handle update
if len(updateRows) > 0 {
for _, rows := range updateRows {
s, v := s.genUpdateSQL(rows...)
sqls = append(sqls, s...)
values = append(values, v...)
if s.cfg.IsTiDB {
for _, rows := range updateRows {
s, v := s.genUpdateSQL(rows...)
sqls = append(sqls, s...)
values = append(values, v...)
}
// The behavior of update statement differs between TiDB and MySQL.
// So we don't use batch update statement when downstream is MySQL.
// Ref:https://docs.pingcap.com/tidb/stable/sql-statement-update#mysql-compatibility
} else {
for _, rows := range updateRows {
for _, row := range rows {
sql, value := row.GenSQL(sqlmodel.DMLUpdate)
sqls = append(sqls, sql)
values = append(values, value)
}
}
}
}

Expand Down
101 changes: 100 additions & 1 deletion cdc/sinkv2/eventsink/txn/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,19 +1199,22 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
func TestPrepareBatchDMLs(t *testing.T) {
t.Parallel()
testCases := []struct {
isTiDB bool
input []*model.RowChangedEvent
expected *preparedDMLs
}{
// empty event
{
input: []*model.RowChangedEvent{},
isTiDB: true,
input: []*model.RowChangedEvent{},
expected: &preparedDMLs{
startTs: []model.Ts{},
sqls: []string{},
values: [][]interface{}{},
},
},
{ // delete event
isTiDB: false,
input: []*model.RowChangedEvent{
{
StartTs: 418658114257813514,
Expand Down Expand Up @@ -1260,6 +1263,7 @@ func TestPrepareBatchDMLs(t *testing.T) {
},
},
{ // insert event
isTiDB: true,
input: []*model.RowChangedEvent{
{
StartTs: 418658114257813516,
Expand Down Expand Up @@ -1308,6 +1312,7 @@ func TestPrepareBatchDMLs(t *testing.T) {
},
// update event
{
isTiDB: true,
input: []*model.RowChangedEvent{
{
StartTs: 418658114257813516,
Expand Down Expand Up @@ -1390,6 +1395,7 @@ func TestPrepareBatchDMLs(t *testing.T) {
},
// mixed event
{
isTiDB: true,
input: []*model.RowChangedEvent{
{
StartTs: 418658114257813514,
Expand Down Expand Up @@ -1460,6 +1466,98 @@ func TestPrepareBatchDMLs(t *testing.T) {
rowCount: 3,
},
},
// update event and downstream is mysql and without pk
{
isTiDB: false,
input: []*model.RowChangedEvent{
{
StartTs: 418658114257813516,
CommitTs: 418658114257813517,
Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"},
PreColumns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag |
model.MultipleKeyFlag |
model.HandleKeyFlag |
model.UniqueKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("开发"),
}},
Columns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag |
model.MultipleKeyFlag |
model.HandleKeyFlag |
model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("测试"),
}},
IndexColumns: [][]int{{1, 2}},
},
{
StartTs: 418658114257813516,
CommitTs: 418658114257813517,
Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"},
PreColumns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag |
model.MultipleKeyFlag |
model.HandleKeyFlag |
model.UniqueKeyFlag,
Value: 3,
}, {
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("纽约"),
}},
Columns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag |
model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 4,
}, {
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("北京"),
}},
IndexColumns: [][]int{{1, 2}},
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{
"UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, " +
"`a3` = ? WHERE `a1` = ? AND `a3` = ? LIMIT 1",
"UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, " +
"`a3` = ? WHERE `a1` = ? AND `a3` = ? LIMIT 1",
},
values: [][]interface{}{{2, "测试", 1, "开发"}, {4, "北京", 3, "纽约"}},
rowCount: 2,
},
},
}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -1469,6 +1567,7 @@ func TestPrepareBatchDMLs(t *testing.T) {
ms.cfg.SafeMode = false
ms.cfg.EnableOldValue = true
for _, tc := range testCases {
ms.cfg.IsTiDB = tc.isTiDB
ms.events = make([]*eventsink.TxnCallbackableEvent, 1)
ms.events[0] = &eventsink.TxnCallbackableEvent{
Event: &model.SingleTableTxn{Rows: tc.input},
Expand Down
2 changes: 2 additions & 0 deletions tests/integration_tests/charset_gbk/data/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ INSERT INTO t0
VALUES (2, '部署', "美国", "纽约", "世界,你好"
, 0xCAC0BDE7C4E3BAC3);

SELECT sleep(5);

UPDATE t0
SET name = '开发'
WHERE name = '测试'
Expand Down

0 comments on commit 7f56161

Please sign in to comment.