Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Commit

Permalink
[release-5.0] dump: add split big tidb query to several small queries
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored Apr 1, 2021
1 parent c40db64 commit 463e4cc
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 16 deletions.
110 changes: 96 additions & 14 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (d *Dumper) dumpDatabases(tctx *tcontext.Context, metaConn *sql.Conn, taskC
}

for _, table := range tables {
d.L().Debug("start dumping table...", zap.String("database", dbName),
tctx.L().Debug("start dumping table...", zap.String("database", dbName),
zap.String("table", table.Name))
meta, err := dumpTableMeta(conf, metaConn, dbName, table)
if err != nil {
Expand Down Expand Up @@ -331,10 +331,71 @@ func (d *Dumper) dumpTableData(tctx *tcontext.Context, conn *sql.Conn, meta Tabl
return d.concurrentDumpTable(tctx, conn, meta, taskChan)
}

func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task) error {
// buildConcatTask splits the table into several chunks through
// concurrentDumpTable and, when more than one chunk is produced, merges
// their queries into a single multi-queries task so the table can be
// dumped into one file by several small queries.
// It returns (nil, nil) when concatenation is not worthwhile: zero or
// one chunk, or chunks whose column lengths don't match.
func (d *Dumper) buildConcatTask(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta) (*TaskTableData, error) {
	tableChan := make(chan Task, 128)
	errCh := make(chan error, 1)
	go func() {
		// adjust rows to suitable rows for this table
		d.conf.Rows = GetSuitableRows(tctx, conn, meta.DatabaseName(), meta.TableName())
		err := d.concurrentDumpTable(tctx, conn, meta, tableChan)
		d.conf.Rows = UnspecifiedSize
		if err != nil {
			errCh <- err
		} else {
			// closing errCh signals successful completion to the loop below
			close(errCh)
		}
	}()
	tableDataArr := make([]*tableData, 0)
	handleSubTask := func(task Task) {
		tableTask, ok := task.(*TaskTableData)
		if !ok {
			// log via task, not tableTask: tableTask is nil when the type
			// assertion fails, so tableTask.Brief() would panic here
			tctx.L().Warn("unexpected task when splitting table chunks", zap.String("task", task.Brief()))
			return
		}
		tableDataInst, ok := tableTask.Data.(*tableData)
		if !ok {
			tctx.L().Warn("unexpected task.Data when splitting table chunks", zap.String("task", tableTask.Brief()))
			return
		}
		tableDataArr = append(tableDataArr, tableDataInst)
	}
	for {
		select {
		case err, ok := <-errCh:
			if !ok {
				// make sure all the subtasks in tableChan are handled
				for len(tableChan) > 0 {
					task := <-tableChan
					handleSubTask(task)
				}
				if len(tableDataArr) <= 1 {
					// nothing to concatenate
					return nil, nil
				}
				queries := make([]string, 0, len(tableDataArr))
				colLen := tableDataArr[0].colLen
				for _, tableDataInst := range tableDataArr {
					queries = append(queries, tableDataInst.query)
					if colLen != tableDataInst.colLen {
						tctx.L().Warn("colLen varies for same table",
							zap.Int("oldColLen", colLen),
							zap.String("oldQuery", queries[0]),
							zap.Int("newColLen", tableDataInst.colLen),
							zap.String("newQuery", tableDataInst.query))
						return nil, nil
					}
				}
				return NewTaskTableData(meta, newMultiQueriesChunk(queries, colLen), 0, 1), nil
			}
			return nil, err
		case task := <-tableChan:
			handleSubTask(task)
		}
	}
}

func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task) error {
conf := d.conf
db, tbl := meta.DatabaseName(), meta.TableName()
tableIR, err := SelectAllFromTable(conf, conn, db, tbl)
tableIR, err := SelectAllFromTable(conf, conn, meta)
if err != nil {
return err
}
Expand All @@ -343,52 +404,73 @@ func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
if ctxDone {
return tctx.Err()
}

return nil
}

// sequentialDumpTable dumps the table as one sequential task. On TiDB it
// first tries to build a single concatenated multi-queries task; when that
// is not possible (or on other servers) it falls back to a plain
// whole-table dump.
func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task) error {
	if d.conf.ServerInfo.ServerType != ServerTypeTiDB {
		return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan)
	}
	task, err := d.buildConcatTask(tctx, conn, meta)
	if err != nil {
		return errors.Trace(err)
	}
	if task == nil {
		tctx.L().Info("didn't build tidb concat sqls, will select all from table now",
			zap.String("database", meta.DatabaseName()),
			zap.String("table", meta.TableName()))
		return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan)
	}
	if d.sendTaskToChan(tctx, task, taskChan) {
		return tctx.Err()
	}
	return nil
}

// concurrentDumpTable tries to split table into several chunks to dump
func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task) error {
conf := d.conf
db, tbl := meta.DatabaseName(), meta.TableName()
if conf.ServerInfo.ServerType == ServerTypeTiDB &&
conf.ServerInfo.ServerVersion != nil &&
conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 {
d.L().Debug("dumping TiDB tables with TABLESAMPLE",
tctx.L().Debug("dumping TiDB tables with TABLESAMPLE",
zap.String("database", db), zap.String("table", tbl))
return d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan)
}
field, err := pickupPossibleField(db, tbl, conn, conf)
if err != nil {
return nil
return err
}
if field == "" {
// skip split chunk logic if not found proper field
d.L().Warn("fallback to sequential dump due to no proper field",
tctx.L().Warn("fallback to sequential dump due to no proper field",
zap.String("database", db), zap.String("table", tbl))
return d.sequentialDumpTable(tctx, conn, meta, taskChan)
return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan)
}

min, max, err := d.selectMinAndMaxIntValue(conn, db, tbl, field)
if err != nil {
return err
}
d.L().Debug("get int bounding values",
tctx.L().Debug("get int bounding values",
zap.String("lower", min.String()),
zap.String("upper", max.String()))

count := estimateCount(d.tctx, db, tbl, conn, field, conf)
d.L().Info("get estimated rows count",
tctx.L().Info("get estimated rows count",
zap.String("database", db),
zap.String("table", tbl),
zap.Uint64("estimateCount", count))
if count < conf.Rows {
// skip chunk logic if estimates are low
d.L().Warn("skip concurrent dump due to estimate count < rows",
tctx.L().Warn("skip concurrent dump due to estimate count < rows",
zap.Uint64("estimate count", count),
zap.Uint64("conf.rows", conf.Rows),
zap.String("database", db),
zap.String("table", tbl))
return d.sequentialDumpTable(tctx, conn, meta, taskChan)
return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan)
}

// every chunk would have eventual adjustments
Expand Down Expand Up @@ -491,7 +573,7 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn
return err
}
if len(handleVals) == 0 {
return nil
return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan)
}
selectField, selectLen, err := buildSelectField(conn, db, tbl, conf.CompleteInsert)
if err != nil {
Expand Down
149 changes: 148 additions & 1 deletion v4/export/ir_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,117 @@ func (iter *rowIter) HasNext() bool {
return iter.hasNext
}

// multiQueriesChunkIter implements the SQLRowIter interface.
// It iterates the rows of several queries one after another, running the
// next query once the current result set is exhausted.
// Note: To create a multiQueriesChunkIter, please use `newMultiQueryChunkIter()` instead of struct literal.
type multiQueriesChunkIter struct {
	tctx *tcontext.Context
	conn *sql.Conn
	// rows is the result set of the query currently being iterated; nil until the first query runs
	rows *sql.Rows
	// hasNext reports whether a row is ready to be decoded
	hasNext bool
	// id is the index of the next query in queries to execute
	id int
	queries []string
	// args is a scratch buffer for decoding, one slot per column
	args []interface{}
	// err is sticky: once set, the iterator is permanently finished
	err error
}

// newMultiQueryChunkIter creates a multiQueriesChunkIter over the given
// queries, with argLen scratch slots per decoded row, and immediately
// starts the first query that returns data.
func newMultiQueryChunkIter(tctx *tcontext.Context, conn *sql.Conn, queries []string, argLen int) *multiQueriesChunkIter {
	iter := &multiQueriesChunkIter{
		tctx:    tctx,
		conn:    conn,
		queries: queries,
		id:      0,
		args:    make([]interface{}, argLen),
	}
	// position the iterator on the first non-empty result set
	iter.nextRows()
	return iter
}

// nextRows advances the iterator to the next query that yields at least
// one row, closing the previous result set first. On any error it sets
// iter.err and clears hasNext, permanently finishing the iterator.
func (iter *multiQueriesChunkIter) nextRows() {
	if iter.id >= len(iter.queries) {
		iter.hasNext = false
		return
	}
	var err error
	defer func() {
		// any failure below marks the iterator as dead
		if err != nil {
			iter.hasNext = false
			iter.err = errors.Trace(err)
		}
	}()
	tctx, conn := iter.tctx, iter.conn
	// avoid the empty chunk
	for iter.id < len(iter.queries) {
		rows := iter.rows
		if rows != nil {
			// release the previous result set before starting a new query
			err = rows.Close()
			if err != nil {
				return
			}
			err = rows.Err()
			if err != nil {
				return
			}
		}
		tctx.L().Debug("try to start nextRows", zap.String("query", iter.queries[iter.id]))
		rows, err = conn.QueryContext(tctx, iter.queries[iter.id])
		if err != nil {
			return
		}
		if err = rows.Err(); err != nil {
			return
		}
		iter.id++
		iter.rows = rows
		iter.hasNext = iter.rows.Next()
		if iter.hasNext {
			return
		}
		// this query returned no rows; loop and try the next one
	}
}

// Close releases the current result set. The sticky iterator error, if
// any, takes precedence over the error returned by closing the rows.
func (iter *multiQueriesChunkIter) Close() error {
	// close the rows even when a previous error was recorded, so the
	// underlying result set is never leaked
	var closeErr error
	if iter.rows != nil {
		closeErr = iter.rows.Close()
	}
	if iter.err != nil {
		return iter.err
	}
	return closeErr
}

// Decode reads the current row into the given receiver.
func (iter *multiQueriesChunkIter) Decode(row RowReceiver) error {
	if err := iter.err; err != nil {
		return err
	}
	if iter.rows == nil {
		return errors.Errorf("no valid rows found, id: %d", iter.id)
	}
	return decodeFromRows(iter.rows, iter.args, row)
}

// Error reports the first sticky error seen by the iterator, or any
// error raised on the current result set.
func (iter *multiQueriesChunkIter) Error() error {
	switch {
	case iter.err != nil:
		return iter.err
	case iter.rows != nil:
		return errors.Trace(iter.rows.Err())
	default:
		return nil
	}
}

// Next advances to the following row, switching to the next query's
// result set when the current one is exhausted.
func (iter *multiQueriesChunkIter) Next() {
	// also guard against a nil result set (e.g. an empty queries slice
	// where nextRows never opened rows) to avoid a nil pointer dereference
	if iter.err != nil || iter.rows == nil {
		return
	}
	iter.hasNext = iter.rows.Next()
	if !iter.hasNext {
		iter.nextRows()
	}
}

// HasNext reports whether a row is available for Decode.
func (iter *multiQueriesChunkIter) HasNext() bool {
	return iter.hasNext
}

type stringIter struct {
idx int
ss []string
Expand Down Expand Up @@ -131,7 +242,7 @@ func (td *tableData) Rows() SQLRowIter {
}

func (td *tableData) Close() error {
return td.rows.Close()
return td.SQLRowIter.Close()
}

func (td *tableData) RawRows() *sql.Rows {
Expand Down Expand Up @@ -215,3 +326,39 @@ func (m *metaData) MetaSQL() string {
}
return m.metaSQL
}

// multiQueriesChunk is a table-data source backed by several small
// queries instead of one big query; its rows are served by a
// multiQueriesChunkIter embedded as SQLRowIter.
type multiQueriesChunk struct {
	tctx *tcontext.Context
	conn *sql.Conn
	queries []string
	// colLen is the number of columns each query selects
	colLen int
	SQLRowIter
}

// newMultiQueriesChunk creates a multiQueriesChunk from the given
// queries and per-row column length. Start must be called before Rows.
func newMultiQueriesChunk(queries []string, colLength int) *multiQueriesChunk {
	chunk := &multiQueriesChunk{
		colLen:  colLength,
		queries: queries,
	}
	return chunk
}

// Start records the context and connection later used by Rows to run
// the queries; it performs no I/O itself and always returns nil.
func (td *multiQueriesChunk) Start(tctx *tcontext.Context, conn *sql.Conn) error {
	td.tctx = tctx
	td.conn = conn
	return nil
}

// Rows lazily creates and returns the iterator over all queries;
// repeated calls return the same iterator.
func (td *multiQueriesChunk) Rows() SQLRowIter {
	if td.SQLRowIter != nil {
		return td.SQLRowIter
	}
	td.SQLRowIter = newMultiQueryChunkIter(td.tctx, td.conn, td.queries, td.colLen)
	return td.SQLRowIter
}

// Close releases the underlying row iterator, if one was ever created.
func (td *multiQueriesChunk) Close() error {
	// Rows() may never have been called; guard against calling a method
	// on a nil SQLRowIter interface
	if td.SQLRowIter == nil {
		return nil
	}
	return td.SQLRowIter.Close()
}

// RawRows always returns nil: a multiQueriesChunk spans several result
// sets, so no single *sql.Rows can represent it.
func (td *multiQueriesChunk) RawRows() *sql.Rows {
	return nil
}
34 changes: 33 additions & 1 deletion v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ func SelectVersion(db *sql.DB) (string, error) {
}

// SelectAllFromTable dumps data serialized from a specified table
func SelectAllFromTable(conf *Config, db *sql.Conn, database, table string) (TableDataIR, error) {
func SelectAllFromTable(conf *Config, db *sql.Conn, meta TableMeta) (TableDataIR, error) {
database, table := meta.DatabaseName(), meta.TableName()
selectedField, selectLen, err := buildSelectField(db, database, table, conf.CompleteInsert)
if err != nil {
return nil, err
Expand Down Expand Up @@ -267,6 +268,37 @@ func SelectTiDBRowID(db *sql.Conn, database, table string) (bool, error) {
return true, nil
}

// GetSuitableRows gets suitable rows for each table
func GetSuitableRows(tctx *tcontext.Context, db *sql.Conn, database, table string) uint64 {
const (
defaultRows = 200000
maxRows = 1000000
bytesPerFile = 128 * 1024 * 1024 // 128MB per file by default
)
avgRowLength, err := GetAVGRowLength(tctx, db, database, table)
if err != nil || avgRowLength == 0 {
tctx.L().Debug("fail to get average row length", zap.Uint64("averageRowLength", avgRowLength), zap.Error(err))
return defaultRows
}
estimateRows := bytesPerFile / avgRowLength
if estimateRows > maxRows {
return maxRows
}
return estimateRows
}

// GetAVGRowLength gets whether this table's average row length
func GetAVGRowLength(tctx *tcontext.Context, db *sql.Conn, database, table string) (uint64, error) {
const query = "select AVG_ROW_LENGTH from INFORMATION_SCHEMA.TABLES where table_schema=? and table_name=?;"
var avgRowLength uint64
row := db.QueryRowContext(tctx, query, database, table)
err := row.Scan(&avgRowLength)
if err != nil {
return 0, errors.Annotatef(err, "sql: %s", query)
}
return avgRowLength, nil
}

// GetColumnTypes gets *sql.ColumnTypes from a specified table
func GetColumnTypes(db *sql.Conn, fields, database, table string) ([]*sql.ColumnType, error) {
query := fmt.Sprintf("SELECT %s FROM `%s`.`%s` LIMIT 1", fields, escapeString(database), escapeString(table))
Expand Down
Loading

0 comments on commit 463e4cc

Please sign in to comment.