From 63372e32948707a65bd92f7c5217b9af00a5354c Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 6 Mar 2023 20:55:11 +0800 Subject: [PATCH] sink(ticdc): convert values of pre-columns properly (#8421) (#8438) close pingcap/tiflow#8420 --- cdc/sinkv2/eventsink/txn/mysql/mysql.go | 9 +- cdc/sinkv2/eventsink/txn/mysql/mysql_test.go | 129 +++++++++++------- .../charset_gbk/data/test.sql | 38 +++++- 3 files changed, 116 insertions(+), 60 deletions(-) diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql.go b/cdc/sinkv2/eventsink/txn/mysql/mysql.go index 5c6db285e8c..72f9f411c6d 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql.go @@ -260,15 +260,15 @@ func convert2RowChanges( return res } -func convertBinaryToString(row *model.RowChangedEvent) { - for i, col := range row.Columns { +func convertBinaryToString(cols []*model.Column) { + for i, col := range cols { if col == nil { continue } if col.Charset != "" && col.Charset != charset.CharsetBin { colValBytes, ok := col.Value.([]byte) if ok { - row.Columns[i].Value = string(colValBytes) + cols[i].Value = string(colValBytes) } } } @@ -289,7 +289,8 @@ func (s *mysqlBackend) groupRowsByType( deleteRow := make([]*sqlmodel.RowChange, 0, preAllocateSize) for _, row := range event.Event.Rows { - convertBinaryToString(row) + convertBinaryToString(row.Columns) + convertBinaryToString(row.PreColumns) if row.IsInsert() { insertRow = append( diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go index 3c6b06a104f..259606e95a5 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/charset" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" @@ -1222,10 +1223,12 @@ func TestPrepareBatchDMLs(t *testing.T) { Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, Value: 1, }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, - Value: 1, + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("你好"), }}, IndexColumns: [][]int{{1, 2}}, }, @@ -1239,10 +1242,12 @@ func TestPrepareBatchDMLs(t *testing.T) { Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, Value: 2, }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, - Value: 2, + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("世界"), }}, IndexColumns: [][]int{{1, 2}}, }, @@ -1250,7 +1255,7 @@ func TestPrepareBatchDMLs(t *testing.T) { expected: &preparedDMLs{ startTs: []model.Ts{418658114257813514}, sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))"}, - values: [][]interface{}{{1, 1, 2, 2}}, + values: [][]interface{}{{1, "你好", 2, "世界"}}, rowCount: 2, }, }, @@ -1266,10 +1271,11 @@ func TestPrepareBatchDMLs(t *testing.T) { Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, Value: 1, }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, - Value: 1, + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: "你好", }}, IndexColumns: [][]int{{1, 1}}, }, @@ -1283,10 +1289,12 @@ func TestPrepareBatchDMLs(t *testing.T) { Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.HandleKeyFlag, Value: 2, }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.HandleKeyFlag, - Value: 2, + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: "世界", }}, IndexColumns: [][]int{{2, 2}}, }, @@ -1294,7 +1302,7 @@ func TestPrepareBatchDMLs(t *testing.T) { expected: &preparedDMLs{ startTs: []model.Ts{418658114257813516}, sqls: []string{"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?),(?,?)"}, - values: [][]interface{}{{1, 1, 2, 2}}, + values: [][]interface{}{{1, "你好", 2, "世界"}}, rowCount: 2, }, }, @@ -1311,10 +1319,12 @@ func TestPrepareBatchDMLs(t *testing.T) { Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, Value: 1, }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, - Value: 1, + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("开发"), }}, Columns: []*model.Column{nil, { Name: "a1", @@ -1322,10 +1332,12 @@ func TestPrepareBatchDMLs(t *testing.T) { Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, Value: 2, }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, - Value: 2, + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("测试"), }}, IndexColumns: [][]int{{1, 2}}, }, @@ -1339,10 +1351,12 @@ func TestPrepareBatchDMLs(t *testing.T) { Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, Value: 3, }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, - Value: 3, + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("纽约"), }}, Columns: []*model.Column{nil, { Name: "a1", @@ -1350,18 +1364,27 @@ func TestPrepareBatchDMLs(t *testing.T) { Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, Value: 4, }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, - Value: 4, + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("北京"), }}, IndexColumns: [][]int{{1, 2}}, }, }, expected: &preparedDMLs{ - startTs: []model.Ts{418658114257813516}, - sqls: []string{"UPDATE `common_1`.`uk_without_pk` SET `a1`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? END, `a3`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? END WHERE ROW(`a1`,`a3`) IN (ROW(?,?),ROW(?,?))"}, - values: [][]interface{}{{1, 1, 2, 3, 3, 4, 1, 1, 2, 3, 3, 4, 1, 1, 3, 3}}, + startTs: []model.Ts{418658114257813516}, + sqls: []string{"UPDATE `common_1`.`uk_without_pk` SET `a1`=CASE " + + "WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) " + + "THEN ? END, `a3`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) " + + "THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? END WHERE " + + "ROW(`a1`,`a3`) IN (ROW(?,?),ROW(?,?))"}, + values: [][]interface{}{{ + 1, "开发", 2, 3, "纽约", 4, 1, "开发", "测试", 3, + "纽约", "北京", 1, "开发", 3, "纽约", + }}, rowCount: 2, }, }, @@ -1378,10 +1401,12 @@ func TestPrepareBatchDMLs(t *testing.T) { Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, Value: 2, }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, - Value: 2, + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("你好"), }}, IndexColumns: [][]int{{1, 2}}, @@ -1396,10 +1421,12 @@ func TestPrepareBatchDMLs(t *testing.T) { Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, Value: 1, }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, - Value: 1, + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("世界"), }}, IndexColumns: [][]int{{1, 2}}, }, @@ -1413,10 +1440,12 @@ func TestPrepareBatchDMLs(t *testing.T) { Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, Value: 2, }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, - Value: 2, + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: "你好", }}, IndexColumns: [][]int{{1, 2}}, }, @@ -1427,7 +1456,7 @@ func TestPrepareBatchDMLs(t *testing.T) { "DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))", "INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)", }, - values: [][]interface{}{{1, 1, 2, 2}, {2, 2}}, + values: [][]interface{}{{1, "世界", 2, "你好"}, {2, "你好"}}, rowCount: 3, }, }, diff --git a/tests/integration_tests/charset_gbk/data/test.sql b/tests/integration_tests/charset_gbk/data/test.sql index 56cd43102b3..68f7d9d8ed5 100644 --- a/tests/integration_tests/charset_gbk/data/test.sql +++ b/tests/integration_tests/charset_gbk/data/test.sql @@ -4,7 +4,7 @@ CREATE DATABASE `charset_gbk_test0` CHARACTER SET utf8mb4; USE `charset_gbk_test0`; -/* this is a test for columns which charset is gbk*/ +/* this is a test for columns which charset is gbk, with pk*/ CREATE TABLE t0 ( id INT, name varchar(128) CHARACTER SET gbk, @@ -30,8 +30,34 @@ WHERE name = '测试'; DELETE FROM t0 WHERE name = '部署'; -/* this is a test for table which charset is gbk*/ +/* this is a test for table which charset is gbk, without pk but with uk */ CREATE TABLE t1 ( + id INT NOT NULL, + name varchar(128) CHARACTER SET gbk NOT NULL, + country char(32) CHARACTER SET gbk, + city varchar(64), + description text CHARACTER SET gbk, + image tinyblob, + UNIQUE KEY (id, name) +) ENGINE = InnoDB CHARSET = utf8mb4; + +INSERT INTO t1 +VALUES (1, '测试', "中国", "上海", "你好,世界" + , 0xC4E3BAC3CAC0BDE7); + +INSERT INTO t1 +VALUES (2, '部署', "美国", "纽约", "世界,你好" + , 0xCAC0BDE7C4E3BAC3); + +UPDATE t1 +SET name = '开发' +WHERE name = '测试'; + +DELETE FROM t1 +WHERE name = '部署'; + +/* this is a test for table which charset is gbk*/ +CREATE TABLE t2 ( id INT, name varchar(128), country char(32), @@ -41,19 +67,19 @@ CREATE TABLE t1 ( PRIMARY KEY (id) ) ENGINE = InnoDB CHARSET = gbk; -INSERT INTO t1 +INSERT INTO t2 VALUES (1, '测试', "中国", "上海", "你好,世界" , 0xC4E3BAC3CAC0BDE7); -INSERT INTO t1 +INSERT INTO t2 VALUES (2, '部署', "美国", "纽约", "世界,你好" , 0xCAC0BDE7C4E3BAC3); -UPDATE t1 +UPDATE t2 SET name = '开发' WHERE name = '测试'; -DELETE FROM t1 +DELETE FROM t2 WHERE name = '部署'; /* this is a test for db which charset is gbk*/