Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Commit

Permalink
fix consistency bug (#128)
Browse files Browse the repository at this point in the history
* fix consistency problem

* add consistency integration test

* address comment

* address comments
  • Loading branch information
lichunzhu authored Aug 7, 2020
1 parent 4c8f99c commit 6f74c68
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 59 deletions.
24 changes: 20 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GitHash=$(shell git rev-parse
LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GitBranch=$(shell git rev-parse --abbrev-ref HEAD)"
LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GoVersion=$(shell go version)"

FAILPOINT_ENABLE := $$(find $$PWD/ -type d | grep -vE "(\.git|tools)" | xargs bin/failpoint-ctl enable)
FAILPOINT_DISABLE := $$(find $$PWD/ -type d | grep -vE "(\.git|tools)" | xargs bin/failpoint-ctl disable)

GO = go
GOLDFLAGS = -ldflags '$(LDFLAGS)'
ifeq ("$(WITH_RACE)", "1")
Expand All @@ -17,8 +20,21 @@ build: bin/dumpling
bin/%: cmd/%/main.go $(wildcard v4/**/*.go)
$(GO) build $(GOLDFLAGS) -tags codes -o $@ $<

test:
$(GO) list ./... | xargs $(GO) test $(GOLDFLAGS) -coverprofile=coverage.txt -covermode=atomic
test: failpoint-enable
$(GO) list ./... | xargs $(GO) test $(GOLDFLAGS) -coverprofile=coverage.txt -covermode=atomic ||{ $(FAILPOINT_DISABLE); exit 1; }
@make failpoint-disable

integration_test: failpoint-enable bin/dumpling
@make failpoint-disable
./tests/run.sh ||{ $(FAILPOINT_DISABLE); exit 1; }

bin/failpoint-ctl: go.mod
$(GO) build -o $@ github.com/pingcap/failpoint/failpoint-ctl

failpoint-enable: bin/failpoint-ctl
# Converting gofail failpoints...
@$(FAILPOINT_ENABLE)

integration_test: bin/dumpling
./tests/run.sh
failpoint-disable: bin/failpoint-ctl
# Restoring gofail failpoints...
@$(FAILPOINT_DISABLE)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/pd/v4 v4.0.0
github.com/pingcap/tidb-tools v4.0.0-rc.2.0.20200521050818-6dd445d83fe0+incompatible
github.com/pkg/errors v0.9.1
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.3
github.com/stretchr/testify v1.5.1 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/zap v1.14.0
golang.org/x/mod v0.3.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,12 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JH
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 h1:KH4f4Si9XK6/IW50HtoaiLIFHGkapOM6w83za47UYik=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 h1:58naV4XMEqm0hl9LcYo6cZoGBGiLtefMQMF/vo3XLgQ=
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4=
Expand Down Expand Up @@ -270,6 +273,7 @@ github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
Expand Down
60 changes: 60 additions & 0 deletions tests/consistency/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/bin/sh

set -eu
cur=$(cd `dirname $0`; pwd)

DB_NAME="mysql_consistency"
TABLE_NAME="t"

# drop database on mysql
run_sql "drop database if exists \`$DB_NAME\`;"

# build data on mysql
run_sql "create database $DB_NAME;"
run_sql "create table $DB_NAME.$TABLE_NAME (a int(255));"

# insert 100 records
run_sql "insert into $DB_NAME.$TABLE_NAME values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('1')/g");"

# dumping with consistency flush
export DUMPLING_TEST_DATABASE=$DB_NAME
export GO_FAILPOINTS="github.com/pingcap/dumpling/v4/export/ConsistencyCheck=1*sleep(5000)"
run_dumpling &
# wait dumpling process to start to sleep
sleep 2

# record metadata info
metadata=`run_sql "show master status;"`
metaLog=`echo $metadata | awk -F 'File:' '{print $2}' | awk '{print $1}'`
metaPos=`echo $metadata | awk -F 'Position:' '{print $2}' | awk '{print $1}'`
metaGTID=`echo $metadata | awk -F 'Executed_Gtid_Set:' '{print $2}' | awk '{print $1}'`
# insert 100 more records, test whether dumpling will dump these data out
run_sql "insert into $DB_NAME.$TABLE_NAME values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('1')/g");"

wait

# check data record count
cnt=`grep -o "(1)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.0.sql|wc -l`
echo "1st records count is ${cnt}"
[ $cnt = 100 ]

# check metadata
echo "metaLog: $metaLog"
echo "metaPos: $metaPos"
echo "metaGTID: $metaGTID"
if [ $metaLog != "" ]; then
[ `grep -o "Log: $metaLog" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ]
fi
if [ $metaPos != "" ]; then
[ `grep -o "Pos: $metaPos" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ]
fi
if [ $metaGTID != "" ]; then
[ `grep -o "GTID: $metaGTID" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ]
fi

# test dumpling normally
export GO_FAILPOINTS=""
run_dumpling
cnt=`grep -o "(1)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.0.sql|wc -l`
echo "2nd records count is ${cnt}"
[ $cnt = 200 ]
49 changes: 24 additions & 25 deletions v4/export/consistency.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
package export

import (
"context"
"database/sql"
"errors"
"fmt"
)

func NewConsistencyController(conf *Config, session *sql.DB) (ConsistencyController, error) {
func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB) (ConsistencyController, error) {
resolveAutoConsistency(conf)
conn, err := session.Conn(ctx)
if err != nil {
return nil, err
}
switch conf.Consistency {
case "flush":
return &ConsistencyFlushTableWithReadLock{
serverType: conf.ServerInfo.ServerType,
db: session,
conn: conn,
}, nil
case "lock":
return &ConsistencyLockDumpingTables{
db: session,
conn: conn,
allTables: conf.Tables,
}, nil
case "snapshot":
Expand All @@ -32,49 +37,46 @@ func NewConsistencyController(conf *Config, session *sql.DB) (ConsistencyControl
}

type ConsistencyController interface {
Setup() error
TearDown() error
Setup(context.Context) error
TearDown(context.Context) error
}

type ConsistencyNone struct{}

func (c *ConsistencyNone) Setup() error {
func (c *ConsistencyNone) Setup(_ context.Context) error {
return nil
}

func (c *ConsistencyNone) TearDown() error {
func (c *ConsistencyNone) TearDown(_ context.Context) error {
return nil
}

type ConsistencyFlushTableWithReadLock struct {
serverType ServerType
db *sql.DB
conn *sql.Conn
}

func (c *ConsistencyFlushTableWithReadLock) Setup() error {
func (c *ConsistencyFlushTableWithReadLock) Setup(ctx context.Context) error {
if c.serverType == ServerTypeTiDB {
return withStack(errors.New("'flush table with read lock' cannot be used to ensure the consistency in TiDB"))
}
return FlushTableWithReadLock(c.db)
return FlushTableWithReadLock(ctx, c.conn)
}

func (c *ConsistencyFlushTableWithReadLock) TearDown() error {
err := c.db.Ping()
if err != nil {
return withStack(errors.New("ConsistencyFlushTableWithReadLock lost database connection"))
}
return UnlockTables(c.db)
func (c *ConsistencyFlushTableWithReadLock) TearDown(ctx context.Context) error {
defer c.conn.Close()
return UnlockTables(ctx, c.conn)
}

type ConsistencyLockDumpingTables struct {
db *sql.DB
conn *sql.Conn
allTables DatabaseTables
}

func (c *ConsistencyLockDumpingTables) Setup() error {
func (c *ConsistencyLockDumpingTables) Setup(ctx context.Context) error {
for dbName, tables := range c.allTables {
for _, table := range tables {
err := LockTables(c.db, dbName, table.Name)
err := LockTables(ctx, c.conn, dbName, table.Name)
if err != nil {
return err
}
Expand All @@ -83,12 +85,9 @@ func (c *ConsistencyLockDumpingTables) Setup() error {
return nil
}

func (c *ConsistencyLockDumpingTables) TearDown() error {
err := c.db.Ping()
if err != nil {
return withStack(errors.New("ConsistencyLockDumpingTables lost database connection"))
}
return UnlockTables(c.db)
func (c *ConsistencyLockDumpingTables) TearDown(ctx context.Context) error {
defer c.conn.Close()
return UnlockTables(ctx, c.conn)
}

const showMasterStatusFieldNum = 5
Expand Down
39 changes: 22 additions & 17 deletions v4/export/consistency_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package export

import (
"context"
"errors"
"strings"

Expand All @@ -18,41 +19,43 @@ func (s *testConsistencySuite) assertNil(err error, c *C) {
}
}

func (s *testConsistencySuite) assertLifetimeErrNil(ctrl ConsistencyController, c *C) {
s.assertNil(ctrl.Setup(), c)
s.assertNil(ctrl.TearDown(), c)
func (s *testConsistencySuite) assertLifetimeErrNil(ctx context.Context, ctrl ConsistencyController, c *C) {
s.assertNil(ctrl.Setup(ctx), c)
s.assertNil(ctrl.TearDown(ctx), c)
}

func (s *testConsistencySuite) TestConsistencyController(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conf := DefaultConfig()
resultOk := sqlmock.NewResult(0, 1)

conf.Consistency = "none"
ctrl, _ := NewConsistencyController(conf, db)
ctrl, _ := NewConsistencyController(ctx, conf, db)
_, ok := ctrl.(*ConsistencyNone)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
s.assertLifetimeErrNil(ctx, ctrl, c)

conf.Consistency = "flush"
mock.ExpectExec("FLUSH TABLES WITH READ LOCK").WillReturnResult(resultOk)
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)
ctrl, _ = NewConsistencyController(conf, db)
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyFlushTableWithReadLock)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
s.assertLifetimeErrNil(ctx, ctrl, c)
if err = mock.ExpectationsWereMet(); err != nil {
c.Fatalf(err.Error())
}

conf.Consistency = "snapshot"
conf.ServerInfo.ServerType = ServerTypeTiDB
ctrl, _ = NewConsistencyController(conf, db)
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyNone)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
s.assertLifetimeErrNil(ctx, ctrl, c)

conf.Consistency = "lock"
conf.Tables = NewDatabaseTables().
Expand All @@ -62,10 +65,10 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {
mock.ExpectExec("LOCK TABLES").WillReturnResult(resultOk)
}
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)
ctrl, _ = NewConsistencyController(conf, db)
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyLockDumpingTables)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
s.assertLifetimeErrNil(ctx, ctrl, c)
if err = mock.ExpectationsWereMet(); err != nil {
c.Fatalf(err.Error())
}
Expand Down Expand Up @@ -96,31 +99,33 @@ func (s *testConsistencySuite) TestConsistencyControllerError(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conf := DefaultConfig()

conf.Consistency = "invalid_str"
_, err = NewConsistencyController(conf, db)
_, err = NewConsistencyController(ctx, conf, db)
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "invalid consistency option"), IsTrue)

// snapshot consistency is only available in TiDB
conf.Consistency = "snapshot"
conf.ServerInfo.ServerType = ServerTypeUnknown
_, err = NewConsistencyController(conf, db)
_, err = NewConsistencyController(ctx, conf, db)
c.Assert(err, NotNil)

// flush consistency is unavailable in TiDB
conf.Consistency = "flush"
conf.ServerInfo.ServerType = ServerTypeTiDB
ctrl, _ := NewConsistencyController(conf, db)
err = ctrl.Setup()
ctrl, _ := NewConsistencyController(ctx, conf, db)
err = ctrl.Setup(ctx)
c.Assert(err, NotNil)

// lock table fail
conf.Consistency = "lock"
conf.Tables = NewDatabaseTables().AppendTables("db", "t")
mock.ExpectExec("LOCK TABLE").WillReturnError(errors.New(""))
ctrl, _ = NewConsistencyController(conf, db)
err = ctrl.Setup()
ctrl, _ = NewConsistencyController(ctx, conf, db)
err = ctrl.Setup(ctx)
c.Assert(err, NotNil)
}
Loading

0 comments on commit 6f74c68

Please sign in to comment.