Skip to content

Commit c8ed781

Browse files
Deardropslonngforeyes
authored
executor: push-down LoadDataStmt in distsql (pingcap#11067)
moving function `statementContextToFlags` to `StatementContext.PushDownFlags()` Co-authored-by: Lonng <[email protected]> Co-authored-by: Foreyes <[email protected]>
1 parent bdec341 commit c8ed781

File tree

6 files changed

+91
-57
lines changed

6 files changed

+91
-57
lines changed

executor/admin.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) {
135135
dagReq.StartTs = txn.StartTS()
136136
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(e.ctx.GetSessionVars().Location())
137137
sc := e.ctx.GetSessionVars().StmtCtx
138-
dagReq.Flags = statementContextToFlags(sc)
138+
dagReq.Flags = sc.PushDownFlags()
139139
for i := range e.schema.Columns {
140140
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
141141
}
@@ -233,7 +233,7 @@ func (e *RecoverIndexExec) buildDAGPB(txn kv.Transaction, limitCnt uint64) (*tip
233233
dagReq.StartTs = txn.StartTS()
234234
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(e.ctx.GetSessionVars().Location())
235235
sc := e.ctx.GetSessionVars().StmtCtx
236-
dagReq.Flags = statementContextToFlags(sc)
236+
dagReq.Flags = sc.PushDownFlags()
237237
for i := range e.columns {
238238
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
239239
}
@@ -670,7 +670,7 @@ func (e *CleanupIndexExec) buildIdxDAGPB(txn kv.Transaction) (*tipb.DAGRequest,
670670
dagReq.StartTs = txn.StartTS()
671671
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(e.ctx.GetSessionVars().Location())
672672
sc := e.ctx.GetSessionVars().StmtCtx
673-
dagReq.Flags = statementContextToFlags(sc)
673+
dagReq.Flags = sc.PushDownFlags()
674674
for i := range e.idxCols {
675675
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
676676
}

executor/builder.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,6 +1400,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
14001400

14011401
func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeIndexTask, maxNumBuckets uint64, autoAnalyze string) *analyzeTask {
14021402
_, offset := timeutil.Zone(b.ctx.GetSessionVars().Location())
1403+
sc := b.ctx.GetSessionVars().StmtCtx
14031404
e := &AnalyzeIndexExec{
14041405
ctx: b.ctx,
14051406
physicalTableID: task.PhysicalTableID,
@@ -1408,7 +1409,7 @@ func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeInde
14081409
analyzePB: &tipb.AnalyzeReq{
14091410
Tp: tipb.AnalyzeType_TypeIndex,
14101411
StartTs: math.MaxUint64,
1411-
Flags: statementContextToFlags(b.ctx.GetSessionVars().StmtCtx),
1412+
Flags: sc.PushDownFlags(),
14121413
TimeZoneOffset: offset,
14131414
},
14141415
maxNumBuckets: maxNumBuckets,
@@ -1466,6 +1467,7 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo
14661467
}
14671468

14681469
_, offset := timeutil.Zone(b.ctx.GetSessionVars().Location())
1470+
sc := b.ctx.GetSessionVars().StmtCtx
14691471
e := &AnalyzeColumnsExec{
14701472
ctx: b.ctx,
14711473
physicalTableID: task.PhysicalTableID,
@@ -1475,7 +1477,7 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo
14751477
analyzePB: &tipb.AnalyzeReq{
14761478
Tp: tipb.AnalyzeType_TypeColumn,
14771479
StartTs: math.MaxUint64,
1478-
Flags: statementContextToFlags(b.ctx.GetSessionVars().StmtCtx),
1480+
Flags: sc.PushDownFlags(),
14791481
TimeZoneOffset: offset,
14801482
},
14811483
maxNumBuckets: maxNumBuckets,
@@ -1658,7 +1660,7 @@ func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dag
16581660
}
16591661
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(b.ctx.GetSessionVars().Location())
16601662
sc := b.ctx.GetSessionVars().StmtCtx
1661-
dagReq.Flags = statementContextToFlags(sc)
1663+
dagReq.Flags = sc.PushDownFlags()
16621664
dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans)
16631665
return dagReq, streaming, err
16641666
}

executor/distsql.go

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"github.com/pingcap/tidb/kv"
3434
plannercore "github.com/pingcap/tidb/planner/core"
3535
"github.com/pingcap/tidb/sessionctx"
36-
"github.com/pingcap/tidb/sessionctx/stmtctx"
3736
"github.com/pingcap/tidb/statistics"
3837
"github.com/pingcap/tidb/table"
3938
"github.com/pingcap/tidb/types"
@@ -123,36 +122,6 @@ func closeAll(objs ...Closeable) error {
123122
return nil
124123
}
125124

126-
// statementContextToFlags converts StatementContext to tipb.SelectRequest.Flags.
127-
func statementContextToFlags(sc *stmtctx.StatementContext) uint64 {
128-
var flags uint64
129-
if sc.InInsertStmt {
130-
flags |= model.FlagInInsertStmt
131-
} else if sc.InUpdateStmt || sc.InDeleteStmt {
132-
flags |= model.FlagInUpdateOrDeleteStmt
133-
} else if sc.InSelectStmt {
134-
flags |= model.FlagInSelectStmt
135-
}
136-
if sc.IgnoreTruncate {
137-
flags |= model.FlagIgnoreTruncate
138-
} else if sc.TruncateAsWarning {
139-
flags |= model.FlagTruncateAsWarning
140-
}
141-
if sc.OverflowAsWarning {
142-
flags |= model.FlagOverflowAsWarning
143-
}
144-
if sc.IgnoreZeroInDate {
145-
flags |= model.FlagIgnoreZeroInDate
146-
}
147-
if sc.DividedByZeroAsWarning {
148-
flags |= model.FlagDividedByZeroAsWarning
149-
}
150-
if sc.PadCharToFullLength {
151-
flags |= model.FlagPadCharToFullLength
152-
}
153-
return flags
154-
}
155-
156125
// handleIsExtra checks whether this column is a extra handle column generated during plan building phase.
157126
func handleIsExtra(col *expression.Column) bool {
158127
if col != nil && col.ID == model.ExtraHandleID {

go.sum

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d/go.mo
1212
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
1313
github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
1414
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
15-
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
1615
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
1716
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w=
1817
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
@@ -192,7 +191,6 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFd
192191
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
193192
github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7 h1:FUL3b97ZY2EPqg2NbXKuMHs5pXJB9hjj1fDHnF2vl28=
194193
github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
195-
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc=
196194
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
197195
github.com/shirou/gopsutil v2.18.10+incompatible h1:cy84jW6EVRPa5g9HAHrlbxMSIjBhDSX0OFYyMYminYs=
198196
github.com/shirou/gopsutil v2.18.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
@@ -297,7 +295,6 @@ google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3
297295
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
298296
gopkg.in/alecthomas/gometalinter.v2 v2.0.12/go.mod h1:NDRytsqEZyolNuAgTzJkZMkSQM7FIKyzVzGhjB/qfYo=
299297
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mod h1:3HH7i1SgMqlzxCcBmUHW657sD4Kvv9sC3HpL3YukzwA=
300-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
301298
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
302299
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
303300
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

sessionctx/stmtctx/stmtctx.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"time"
2222

2323
"github.com/pingcap/parser"
24+
"github.com/pingcap/parser/model"
2425
"github.com/pingcap/parser/mysql"
2526
"github.com/pingcap/tidb/util/execdetails"
2627
"github.com/pingcap/tidb/util/memory"
@@ -438,6 +439,39 @@ func (sc *StatementContext) ShouldIgnoreOverflowError() bool {
438439
return false
439440
}
440441

442+
// PushDownFlags converts StatementContext to tipb.SelectRequest.Flags.
443+
func (sc *StatementContext) PushDownFlags() uint64 {
444+
var flags uint64
445+
if sc.InInsertStmt {
446+
flags |= model.FlagInInsertStmt
447+
} else if sc.InUpdateStmt || sc.InDeleteStmt {
448+
flags |= model.FlagInUpdateOrDeleteStmt
449+
} else if sc.InSelectStmt {
450+
flags |= model.FlagInSelectStmt
451+
}
452+
if sc.IgnoreTruncate {
453+
flags |= model.FlagIgnoreTruncate
454+
} else if sc.TruncateAsWarning {
455+
flags |= model.FlagTruncateAsWarning
456+
}
457+
if sc.OverflowAsWarning {
458+
flags |= model.FlagOverflowAsWarning
459+
}
460+
if sc.IgnoreZeroInDate {
461+
flags |= model.FlagIgnoreZeroInDate
462+
}
463+
if sc.DividedByZeroAsWarning {
464+
flags |= model.FlagDividedByZeroAsWarning
465+
}
466+
if sc.PadCharToFullLength {
467+
flags |= model.FlagPadCharToFullLength
468+
}
469+
if sc.InLoadDataStmt {
470+
flags |= model.FlagInLoadDataStmt
471+
}
472+
return flags
473+
}
474+
441475
// CopTasksDetails returns some useful information of cop-tasks during execution.
442476
func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
443477
sc.mu.Lock()

sessionctx/stmtctx/stmtctx_test.go

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,28 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14-
package stmtctx
14+
package stmtctx_test
1515

1616
import (
1717
"fmt"
1818
"testing"
1919
"time"
2020

21+
. "github.com/pingcap/check"
22+
"github.com/pingcap/tidb/sessionctx/stmtctx"
2123
"github.com/pingcap/tidb/util/execdetails"
2224
)
2325

24-
func TestCopTasksDetails(t *testing.T) {
25-
ctx := new(StatementContext)
26+
func TestT(t *testing.T) {
27+
TestingT(t)
28+
}
29+
30+
type stmtctxSuit struct{}
31+
32+
var _ = Suite(&stmtctxSuit{})
33+
34+
func (s *stmtctxSuit) TestCopTasksDetails(c *C) {
35+
ctx := new(stmtctx.StatementContext)
2636
for i := 0; i < 100; i++ {
2737
d := &execdetails.ExecDetails{
2838
CalleeAddress: fmt.Sprintf("%v", i+1),
@@ -31,20 +41,42 @@ func TestCopTasksDetails(t *testing.T) {
3141
}
3242
ctx.MergeExecDetails(d, nil)
3343
}
34-
c := ctx.CopTasksDetails()
35-
if c.NumCopTasks != 100 ||
36-
c.AvgProcessTime != time.Second*101/2 ||
37-
c.P90ProcessTime != time.Second*91 ||
38-
c.MaxProcessTime != time.Second*100 ||
39-
c.MaxProcessAddress != "100" ||
40-
c.AvgWaitTime != time.Millisecond*101/2 ||
41-
c.P90WaitTime != time.Millisecond*91 ||
42-
c.MaxWaitTime != time.Millisecond*100 ||
43-
c.MaxWaitAddress != "100" {
44-
t.Fatal(c)
44+
d := ctx.CopTasksDetails()
45+
c.Assert(d.NumCopTasks, Equals, 100)
46+
c.Assert(d.AvgProcessTime, Equals, time.Second*101/2)
47+
c.Assert(d.P90ProcessTime, Equals, time.Second*91)
48+
c.Assert(d.MaxProcessTime, Equals, time.Second*100)
49+
c.Assert(d.MaxProcessAddress, Equals, "100")
50+
c.Assert(d.AvgWaitTime, Equals, time.Millisecond*101/2)
51+
c.Assert(d.P90WaitTime, Equals, time.Millisecond*91)
52+
c.Assert(d.MaxWaitTime, Equals, time.Millisecond*100)
53+
c.Assert(d.MaxWaitAddress, Equals, "100")
54+
fields := d.ToZapFields()
55+
c.Assert(len(fields), Equals, 9)
56+
}
57+
58+
func (s *stmtctxSuit) TestStatementContextPushDownFLags(c *C) {
59+
testCases := []struct {
60+
in *stmtctx.StatementContext
61+
out uint64
62+
}{
63+
{&stmtctx.StatementContext{InInsertStmt: true}, 8},
64+
{&stmtctx.StatementContext{InUpdateStmt: true}, 16},
65+
{&stmtctx.StatementContext{InDeleteStmt: true}, 16},
66+
{&stmtctx.StatementContext{InSelectStmt: true}, 32},
67+
{&stmtctx.StatementContext{IgnoreTruncate: true}, 1},
68+
{&stmtctx.StatementContext{TruncateAsWarning: true}, 2},
69+
{&stmtctx.StatementContext{OverflowAsWarning: true}, 64},
70+
{&stmtctx.StatementContext{IgnoreZeroInDate: true}, 128},
71+
{&stmtctx.StatementContext{DividedByZeroAsWarning: true}, 256},
72+
{&stmtctx.StatementContext{PadCharToFullLength: true}, 4},
73+
{&stmtctx.StatementContext{InLoadDataStmt: true}, 1024},
74+
{&stmtctx.StatementContext{InSelectStmt: true, TruncateAsWarning: true}, 34},
75+
{&stmtctx.StatementContext{DividedByZeroAsWarning: true, IgnoreTruncate: true}, 257},
76+
{&stmtctx.StatementContext{InUpdateStmt: true, IgnoreZeroInDate: true, InLoadDataStmt: true}, 1168},
4577
}
46-
fields := c.ToZapFields()
47-
if len(fields) != 9 {
48-
t.Fatal(c)
78+
for _, tt := range testCases {
79+
got := tt.in.PushDownFlags()
80+
c.Assert(got, Equals, tt.out, Commentf("get %v, want %v", got, tt.out))
4981
}
5082
}

0 commit comments

Comments
 (0)