Skip to content

Commit 6d99bb2

Browse files
authored
Merge pull request #295 from brokercap/v2.3.11
V2.3.11
2 parents e8dabf2 + 62ba2d9 commit 6d99bb2

File tree

8 files changed

+257
-205
lines changed

8 files changed

+257
-205
lines changed

README.MD

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,11 @@ make install prefix=./target
124124
##### 二进制文件安装
125125
`````sh
126126

127-
wget https://github.com/brokercap/Bifrost/releases/download/v2.3.6-beta/bifrost_v2.3.6-beta_Linux-amd64-bin.tar.gz
127+
wget https://github.com/brokercap/Bifrost/releases/download/v2.3.11-beta/bifrost_v2.3.11-beta_Linux-amd64-bin.tar.gz
128128

129-
tar -zxvf bifrost_v2.3.6-beta_Linux-amd64-bin.tar.gz
129+
tar -zxvf bifrost_v2.3.11-beta_Linux-amd64-bin.tar.gz
130130

131-
cd bifrost_v2.3.6-beta_Linux-amd64-bin/bin && chmod a+x ./Bifrost*
131+
cd bifrost_v2.3.11-beta_Linux-amd64-bin/bin && chmod a+x ./Bifrost*
132132

133133
`````
134134

changelog.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
v2.3.11-beta 2025-03-07
2+
1. plugin mysql 修复获取表元数据的时候连接异常导致的连接错误没被识别的bug
3+
2. kafka plugin 消息统计不再使用计数器,而是直接使用切片的长度
4+
3. 修复开启了FilterUpdate,但是源表中存在复杂的json类型字段匹配错误,导致的异常退出bug
5+
6+
v2.3.10-beta 2025-01-19
7+
1. fix mongo batch _id contains ' " ' bug
8+
修改mongo源全量同步到目标库的时候,_id 字段包含双引号的bug
9+
2. Update the logic for determining whether it is Starrocks, supporting specifying it as Starrocks in the URI and forcing it to be set as Starrocks
10+
支持uri连配配置中设置starrocks字样,会被识别为starrocks数据库
11+
112
v2.3.8-beta 2024-11-05
213
1. fix mysql select tinyint(1) , value > 1 , transfer to bool ,result false bug
314

config/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ limitations under the License.
1616

1717
package config
1818

19-
const VERSION = "v2.3.8-beta"
19+
const VERSION = "v2.3.11-beta"

input/mongo/batch.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
outputDriver "github.com/brokercap/Bifrost/plugin/driver"
66
"go.mongodb.org/mongo-driver/bson"
7+
"go.mongodb.org/mongo-driver/bson/primitive"
78
"go.mongodb.org/mongo-driver/mongo"
89
"go.mongodb.org/mongo-driver/mongo/options"
910
"log"
@@ -86,14 +87,17 @@ func (c *MongoInput) TableBatchStart(collection *mongo.Collection, perBatchLimit
8687
if len(batchResult) == 0 {
8788
break
8889
}
90+
nextMinId = batchResult[len(batchResult)-1]["_id"]
8991
for _, batchInfo := range batchResult {
92+
if docId, ok := batchInfo["_id"].(primitive.ObjectID); ok {
93+
batchInfo["_id"] = docId.Hex()
94+
}
9095
eventData := c.BatchResult2RowEvent(schemaName, tableName, batchInfo)
9196
c.callback(eventData)
9297
}
9398
if len(batchResult) < perBatchLimit {
9499
break
95100
}
96-
nextMinId = batchResult[len(batchResult)-1]["_id"]
97101
}
98102
return nil
99103
}

plugin/kafka/src/kafka.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ type PluginParam struct {
5757

5858
dataList []*sarama.ProducerMessage
5959
commitBinlogList []*pluginDriver.PluginDataType
60-
dataCurrentCount int
6160
}
6261

6362
func NewConn() pluginDriver.Driver {
@@ -151,7 +150,6 @@ func (This *Conn) GetParam(p interface{}) (interface{}, error) {
151150
if len(param.dataList) == 0 {
152151
param.dataList = make([]*sarama.ProducerMessage, 0)
153152
param.commitBinlogList = make([]*pluginDriver.PluginDataType, 0)
154-
param.dataCurrentCount = 0
155153
}
156154
This.p = param
157155
return param, nil
@@ -267,9 +265,8 @@ func (This *Conn) sendToList(data *pluginDriver.PluginDataType, retry bool, isCo
267265
This.p.commitBinlogList[n0] = data
268266
}
269267
}
270-
This.p.dataCurrentCount++
271268
}
272-
if This.p.dataCurrentCount >= This.p.BatchSize {
269+
if len(This.p.dataList) >= This.p.BatchSize {
273270
LastSuccessCommitData, err = This.sendToKafkaByBatch()
274271
}
275272
} else {
@@ -314,17 +311,16 @@ func (This *Conn) sendToKafkaByBatch() (*pluginDriver.PluginDataType, error) {
314311
return nil, This.err
315312
}
316313
}
317-
if This.p.dataCurrentCount == 0 {
314+
if len(This.p.dataList) == 0 {
318315
return nil, nil
319316
}
320317
var err error
321318
var binlogEvent *pluginDriver.PluginDataType
322-
if This.p.dataCurrentCount > This.p.BatchSize {
319+
if len(This.p.dataList) > This.p.BatchSize {
323320
list := This.p.dataList[:This.p.BatchSize]
324321
err = This.producer.SendMessages(list)
325322
if err == nil {
326323
This.p.dataList = This.p.dataList[This.p.BatchSize:]
327-
This.p.dataCurrentCount -= This.p.BatchSize
328324
if len(This.p.commitBinlogList) > 0 {
329325
binlogEvent = This.p.commitBinlogList[0]
330326
This.p.commitBinlogList = This.p.commitBinlogList[1:]
@@ -334,7 +330,6 @@ func (This *Conn) sendToKafkaByBatch() (*pluginDriver.PluginDataType, error) {
334330
err = This.producer.SendMessages(This.p.dataList)
335331
if err == nil {
336332
This.p.dataList = make([]*sarama.ProducerMessage, 0)
337-
This.p.dataCurrentCount = 0
338333
if len(This.p.commitBinlogList) > 0 {
339334
binlogEvent = This.p.commitBinlogList[0]
340335
This.p.commitBinlogList = This.p.commitBinlogList[1:]

plugin/mysql/src/schema.go

Lines changed: 52 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ func (This *mysqlDB) Close() bool {
4747
}
4848

4949
func (This *mysqlDB) GetSchemaList() (data []string) {
50-
rows, err := This.conn.Query("SHOW DATABASES", []driver.Value{})
51-
if err != nil {
52-
log.Printf("[ERROR] output[%s] GetSchemaList err:%+v \n", OutputName, err)
53-
This.err = err
50+
var rows driver.Rows
51+
rows, This.err = This.conn.Query("SHOW DATABASES", []driver.Value{})
52+
if This.err != nil {
53+
log.Printf("[ERROR] output[%s] GetSchemaList err:%+v \n", OutputName, This.err)
5454
return
5555
}
5656
defer rows.Close()
@@ -120,13 +120,14 @@ func (This *mysqlDB) GetTableFields(schema, table string) (data []TableStruct) {
120120
p := make([]driver.Value, 0)
121121
p = append(p, schema)
122122
p = append(p, table)
123-
rows, err := This.conn.Query(sql, p)
124-
if err != nil {
123+
var rows driver.Rows
124+
rows, This.err = This.conn.Query(sql, p)
125+
if This.err != nil {
125126
return
126127
}
127128
defer rows.Close()
128-
if err != nil {
129-
log.Printf("[ERROR] output[%s] GetTableFields schema:%s table:%s err:%+v \n", OutputName, schema, table, err)
129+
if This.err != nil {
130+
log.Printf("[ERROR] output[%s] GetTableFields schema:%s table:%s err:%+v \n", OutputName, schema, table, This.err)
130131
return FieldList
131132
}
132133
for {
@@ -191,26 +192,27 @@ func (This *mysqlDB) GetTableFields(schema, table string) (data []TableStruct) {
191192
}
192193

193194
func (This *mysqlDB) Begin() error {
194-
_, err := This.conn.Exec("BEGIN", make([]driver.Value, 0))
195-
return err
195+
_, This.err = This.conn.Exec("BEGIN", make([]driver.Value, 0))
196+
return This.err
196197
}
197198

198199
func (This *mysqlDB) Commit() error {
199-
_, err := This.conn.Exec("COMMIT", make([]driver.Value, 0))
200-
return err
200+
_, This.err = This.conn.Exec("COMMIT", make([]driver.Value, 0))
201+
return This.err
201202
}
202203

203204
func (This *mysqlDB) Rollback() error {
204-
_, err := This.conn.Exec("ROLLBACK", make([]driver.Value, 0))
205-
return err
205+
_, This.err = This.conn.Exec("ROLLBACK", make([]driver.Value, 0))
206+
return This.err
206207
}
207208

208209
func (This *mysqlDB) ShowTableCreate(schema, table string) string {
209210
sql := "SHOW CREATE TABLE `" + schema + "`.`" + table + "`"
210211
p := make([]driver.Value, 0)
211-
rows, err := This.conn.Query(sql, p)
212-
if err != nil {
213-
log.Printf("[ERROR] output[%s] ShowTableCreate schema:%s table:%s err:%+v \n", OutputName, schema, table, err)
212+
var rows driver.Rows
213+
rows, This.err = This.conn.Query(sql, p)
214+
if This.err != nil {
215+
log.Printf("[ERROR] output[%s] ShowTableCreate schema:%s table:%s err:%+v \n", OutputName, schema, table, This.err)
214216
return ""
215217
}
216218
defer rows.Close()
@@ -231,9 +233,10 @@ func (This *mysqlDB) ShowTableCreate(schema, table string) string {
231233
func (This *mysqlDB) SelectVersion() string {
232234
sql := "SELECT version()"
233235
p := make([]driver.Value, 0)
234-
rows, err := This.conn.Query(sql, p)
235-
if err != nil {
236-
log.Printf("[ERROR] output[%s] SelectVersion err:%+v \n", OutputName, err)
236+
var rows driver.Rows
237+
rows, This.err = This.conn.Query(sql, p)
238+
if This.err != nil {
239+
log.Printf("[ERROR] output[%s] SelectVersion err:%+v \n", OutputName, This.err)
237240
return ""
238241
}
239242
defer rows.Close()
@@ -253,22 +256,23 @@ func (This *mysqlDB) SelectVersion() string {
253256

254257
func (This *mysqlDB) CreateDatabase(database string) (err error) {
255258
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", database)
256-
_, err = This.conn.Exec(sql, []driver.Value{})
257-
return
259+
_, This.err = This.conn.Exec(sql, []driver.Value{})
260+
return This.err
258261
}
259262

260263
func (This *mysqlDB) Exec(sql string) (err error) {
261-
_, err = This.conn.Exec(sql, []driver.Value{})
262-
return
264+
_, This.err = This.conn.Exec(sql, []driver.Value{})
265+
return This.err
263266
}
264267

265268
func (This *mysqlDB) ShowBackends() (backendsList []map[string]driver.Value, err error) {
266269
sql := "SHOW backends"
267270
p := make([]driver.Value, 0)
268-
rows, err := This.conn.Query(sql, p)
269-
if err != nil {
270-
log.Printf("[WARN] output[%s] ShowBackbends err:%+v \n", OutputName, err)
271-
return make([]map[string]driver.Value, 0), err
271+
var rows driver.Rows
272+
rows, This.err = This.conn.Query(sql, p)
273+
if This.err != nil {
274+
log.Printf("[WARN] output[%s] ShowBackbends err:%+v \n", OutputName, This.err)
275+
return make([]map[string]driver.Value, 0), This.err
272276
}
273277
defer rows.Close()
274278
for {
@@ -285,3 +289,23 @@ func (This *mysqlDB) ShowBackends() (backendsList []map[string]driver.Value, err
285289
}
286290
return backendsList, nil
287291
}
292+
293+
func (This *mysqlDB) ShowVersionComment() (versionComment string, err error) {
294+
sql := "SHOW VARIABLES LIKE 'version_comment'"
295+
p := make([]driver.Value, 0)
296+
rows, err := This.conn.Query(sql, p)
297+
if err != nil {
298+
log.Printf("[WARN] output[%s] ShowVersionComment err:%+v \n", OutputName, err)
299+
return "", err
300+
}
301+
defer rows.Close()
302+
for {
303+
dest := make([]driver.Value, len(rows.Columns()), len(rows.Columns()))
304+
err := rows.Next(dest)
305+
if err != nil {
306+
break
307+
}
308+
versionComment = fmt.Sprint(dest[1])
309+
}
310+
return
311+
}

plugin/mysql/src/starrocks.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
pluginDriver "github.com/brokercap/Bifrost/plugin/driver"
77
"log"
88
"runtime/debug"
9+
"strings"
910
)
1011

1112
func (This *Conn) IsStarRocks() bool {
@@ -17,6 +18,11 @@ func (This *Conn) GetStarRocksBeCount() int {
1718
}
1819

1920
func (This *Conn) initIsStarrock() {
21+
tmpUri := strings.ToLower(*This.uri)
22+
if strings.Contains(tmpUri, "starrocks") || strings.Contains(tmpUri, "doris") {
23+
This.isStarRocks = true
24+
This.starRocksBeCount = 1
25+
}
2026
defer func() {
2127
if err := recover(); err != nil {
2228
log.Printf("[ERROR] output[%s] initIsStarrock recover:%+v \n", OutputName, string(debug.Stack()))
@@ -26,16 +32,27 @@ func (This *Conn) initIsStarrock() {
2632
if This.conn == nil {
2733
return
2834
}
29-
backendsList, _ := This.conn.ShowBackends()
30-
if len(backendsList) == 0 {
31-
return
35+
if !This.isStarRocks {
36+
versionComment, err := This.conn.ShowVersionComment()
37+
if err != nil {
38+
return
39+
}
40+
if !strings.Contains(strings.ToLower(versionComment), "mysql") {
41+
This.isStarRocks = true
42+
This.starRocksBeCount = 1
43+
}
3244
}
33-
// starrocks show backends 列表中,存在 BePort 这个字段,代表 Be 节点的端口
34-
if _, ok := backendsList[0]["BePort"]; !ok {
35-
return
45+
if This.isStarRocks {
46+
backendsList, _ := This.conn.ShowBackends()
47+
if len(backendsList) == 0 {
48+
return
49+
}
50+
// starrocks show backends 列表中,存在 BePort 这个字段,代表 Be 节点的端口
51+
if _, ok := backendsList[0]["BePort"]; !ok {
52+
return
53+
}
54+
This.starRocksBeCount = len(backendsList)
3655
}
37-
This.isStarRocks = true
38-
This.starRocksBeCount = len(backendsList)
3956
}
4057

4158
func (This *Conn) StarRocksDelete(SchemaName, TableName string, pks []string, pksWhereList [][]string) error {

0 commit comments

Comments
 (0)