Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prototype] Fast primary key imports #7889

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/cmd/dolt/commands/tblcmds/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ func moveRows(
return err
}
} else {
// TODO this could just create val.Tuple pairs
sqlRow, err = NameAndTypeTransform(sqlRow, wr.RowOperationSchema(), rdSqlSch, options.nameMapper)
if err != nil {
return err
Expand Down
256 changes: 190 additions & 66 deletions go/libraries/doltcore/mvdata/engine_table_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,23 @@ package mvdata
import (
"context"
"fmt"
"io"
"sync/atomic"

"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/val"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/analyzer"
"github.com/dolthub/go-mysql-server/sql/analyzer/analyzererrors"
"github.com/dolthub/go-mysql-server/sql/expression"
"github.com/dolthub/go-mysql-server/sql/plan"
"github.com/dolthub/go-mysql-server/sql/planbuilder"
"github.com/dolthub/go-mysql-server/sql/rowexec"
"github.com/dolthub/go-mysql-server/sql/transform"
types2 "github.com/dolthub/go-mysql-server/sql/types"
ast "github.com/dolthub/vitess/go/vt/sqlparser"

"github.com/dolthub/dolt/go/cmd/dolt/commands/engine"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
Expand Down Expand Up @@ -132,11 +139,6 @@ func (s *SqlEngineTableWriter) WriteRows(ctx context.Context, inputChannel chan
return err
}

_, _, err = s.se.Query(s.sqlCtx, "START TRANSACTION")
if err != nil {
return err
}

if s.disableFks {
_, _, err = s.se.Query(s.sqlCtx, "SET FOREIGN_KEY_CHECKS = 0")
if err != nil {
Expand All @@ -149,83 +151,205 @@ func (s *SqlEngineTableWriter) WriteRows(ctx context.Context, inputChannel chan
return err
}

updateStats := func(row sql.Row) {
if row == nil {
return
}
// TODO: need to count updates in apply mutations
// could return ins,upd,del from apply mutations?
// or add a callback where we calculate it?

// If the length of the row does not match the schema then we have an update operation.
if len(row) != len(s.tableSchema.Schema) {
oldRow := row[:len(row)/2]
newRow := row[len(row)/2:]

if ok, err := oldRow.Equals(newRow, s.tableSchema.Schema); err == nil {
if ok {
s.stats.SameVal++
} else {
s.stats.Modifications++
}
}
} else {
s.stats.Additions++
}
// TODO: MutateMapWithTupleIter
// use MutateMapWithTupleIter to directly write sorted rows from the CSV to the prolly map
// (only primary index, requires sorted input)

dSess := dsess.DSessFromSess(s.sqlCtx.Session)

tx, err := dSess.StartTransaction(s.sqlCtx, sql.ReadWrite)
if err != nil {
return err
}

sqlDb, err := dSess.Provider().Database(s.sqlCtx, s.database)
sqlTable, _, err := sqlDb.GetTableInsensitive(s.sqlCtx, s.tableName)

var dTab *doltdb.Table
switch t := sqlTable.(type) {
case *sqle.AlterableDoltTable:
dTab, err = t.DoltTable.DoltTable(s.sqlCtx)
case *sqle.WritableDoltTable:
dTab, err = t.DoltTable.DoltTable(s.sqlCtx)
case *sqle.DoltTable:
dTab, err = t.DoltTable(s.sqlCtx)
default:
err = fmt.Errorf("failed to unwrap dolt table from type: %T", sqlTable)
}

priIdx, err := dTab.GetRowData(s.sqlCtx)
priMap := durable.ProllyMapFromIndex(priIdx)

// row strings, need schema sql.Types to convert to normal types, then tuple desc put field
kd, vd := priMap.Descriptors()

dSchema, err := dTab.GetSchema(ctx)
keyMap, valMap := writer.OrdinalMappingsFromSchema(s.rowOperationSchema.Schema, dSchema)

iter := &tupleIter{
ctx: s.sqlCtx,
inp: inputChannel,
stats: s.stats,
statsCB: s.statsCB,
badRowCb: badRowCb,
tableName: s.tableName,
sch: s.tableSchema,
ns: priMap.NodeStore(),
valBld: val.NewTupleBuilder(vd),
keyBld: val.NewTupleBuilder(kd),
keyMap: keyMap,
valMap: valMap,
b: planbuilder.New(s.sqlCtx, nil, nil),
}
// tuple iter: converts sql.Row->(Tup,Tup), does counter
newMap, err := prolly.MutateMapWithTupleIter(s.sqlCtx, priMap, iter)
if err != nil {
return err
}

if iter.err != nil {
return iter.err
}

// final stats check
s.statsCB(s.stats)

// save map
newTab, err := dTab.UpdateRows(ctx, durable.IndexFromProllyMap(newMap))
if err != nil {
return err
}

insertOrUpdateOperation, err := s.getInsertNode(inputChannel, false)
ws, err := dSess.WorkingSet(s.sqlCtx, s.database)
if err != nil {
return err
}

iter, err := rowexec.DefaultBuilder.Build(s.sqlCtx, insertOrUpdateOperation, nil)
wr, err := ws.WorkingRoot().PutTable(ctx, doltdb.TableName{Name: s.tableName}, newTab)
if err != nil {
return err
}

defer func() {
rerr := iter.Close(s.sqlCtx)
if err == nil {
err = rerr
ws = ws.WithWorkingRoot(wr)
if err := dSess.SetWorkingSet(s.sqlCtx, s.database, ws); err != nil {
return err
}

return dSess.CommitTransaction(s.sqlCtx, tx)
}

// tupleIter adapts a channel of incoming sql.Row values into a
// prolly.TupleIter, converting each row into a (key, value) val.Tuple pair
// for direct insertion into the table's primary-index prolly map.
// It is consumed by prolly.MutateMapWithTupleIter; assumes rows arrive in
// key order — TODO confirm sorted-input requirement with the caller.
type tupleIter struct {
	ctx context.Context
	// err records the first conversion or encoding failure; Next stops
	// producing tuples and the caller inspects err after iteration.
	err error

	// inp delivers parsed rows from the import reader; a closed channel
	// signals end of input.
	inp chan sql.Row
	// statsCB, if non-nil, is invoked periodically with running stats.
	statsCB noms.StatsCB
	stats   types.AppliedEditStats
	// badRowCb decides whether to continue after a bad row; returning true
	// aborts the import.
	badRowCb  func(row sql.Row, rowSchema sql.PrimaryKeySchema, tableName string, lineNumber int, err error) bool
	sch       sql.PrimaryKeySchema
	tableName string

	// b parses raw string fields into SQL literal expressions.
	b *planbuilder.Builder
	// line counts rows consumed, used for stats cadence and error reporting.
	line int

	// Tuple-encoding state: builders and ordinal mappings translating row
	// positions into primary-key and value field positions.
	ns     tree.NodeStore
	keyBld *val.TupleBuilder
	valBld *val.TupleBuilder
	keyMap val.OrdinalMapping
	valMap val.OrdinalMapping
}

var _ prolly.TupleIter = (*tupleIter)(nil)

func (t *tupleIter) Next(ctx context.Context) (val.Tuple, val.Tuple) {
select {
case <-ctx.Done():
case r, ok := <-t.inp:
if !ok {
return nil, nil
}
t.statsCheck()
t.line++
sqlRow := t.convert(r)
if t.err != nil {
break
}
}()
k, v := t.tuples(sqlRow)
if t.err != nil {
break
}
// todo bad row callback
// todo error handling
return k, v
}

line := 1
if t.err != nil {
var offendingRow sql.Row
switch n := t.err.(type) {
case sql.WrappedInsertError:
offendingRow = n.OffendingRow
case sql.IgnorableError:
offendingRow = n.OffendingRow
}

for {
if s.statsCB != nil && atomic.LoadInt32(&s.statOps) >= tableWriterStatUpdateRate {
atomic.StoreInt32(&s.statOps, 0)
s.statsCB(s.stats)
quit := t.badRowCb(offendingRow, t.sch, t.tableName, t.line, t.err)
if quit {
return nil, nil
}
t.err = nil
}
return nil, nil
}

// convert parses the raw string fields of |r| into the Go values declared by
// the target schema, returning the converted row. On failure it sets t.err
// and returns nil; the caller surfaces the error via the bad-row callback.
func (t *tupleIter) convert(r sql.Row) sql.Row {
	ret := make(sql.Row, len(r))
	for i, v := range r {
		sqlTyp := t.sch.Schema[i].Type

		// Rows from the import reader carry every field as a string; fail
		// gracefully instead of panicking if that invariant is violated.
		s, ok := v.(string)
		if !ok {
			t.err = fmt.Errorf("expected string value for column %d, found %T", i, v)
			return nil
		}

		// Classify the literal so the planbuilder parses it the way the SQL
		// layer would. Types that are neither numeric nor textual fall
		// through with the zero ValType, matching prior behavior.
		var valTyp ast.ValType
		if types2.IsNumber(sqlTyp) {
			valTyp = ast.IntVal
		} else if types2.IsText(sqlTyp) {
			valTyp = ast.StrVal
		}

		// NOTE: named sqlVal to avoid shadowing the imported |val| package.
		sqlVal := &ast.SQLVal{Type: valTyp, Val: []byte(s)}
		e := t.b.ConvertVal(sqlVal)
		ret[i], _, t.err = sqlTyp.Convert(e.(*expression.Literal).Value())
		if t.err != nil {
			return nil
		}
	}
	return ret
}

row, err := iter.Next(s.sqlCtx)
line += 1
// statsCheck reports running import statistics to the registered callback
// once every tableWriterStatUpdateRate rows.
func (t *tupleIter) statsCheck() {
	if t.statsCB == nil {
		return
	}
	if t.line%tableWriterStatUpdateRate != 0 {
		return
	}
	t.statsCB(t.stats)
}

// All other errors are handled by the errorHandler
if err == nil {
_ = atomic.AddInt32(&s.statOps, 1)
updateStats(row)
} else if err == io.EOF {
atomic.LoadInt32(&s.statOps)
atomic.StoreInt32(&s.statOps, 0)
if s.statsCB != nil {
s.statsCB(s.stats)
}
func (t *tupleIter) tuples(sqlRow sql.Row) (val.Tuple, val.Tuple) {
for to := range t.keyMap {
from := t.keyMap.MapOrdinal(to)
if err := tree.PutField(t.ctx, t.ns, t.keyBld, to, sqlRow[from]); err != nil {
t.err = err
return nil, nil
}
}
k := t.keyBld.BuildPermissive(t.ns.Pool())

return err
} else {
var offendingRow sql.Row
switch n := err.(type) {
case sql.WrappedInsertError:
offendingRow = n.OffendingRow
case sql.IgnorableError:
offendingRow = n.OffendingRow
}

quit := badRowCb(offendingRow, s.tableSchema, s.tableName, line, err)
if quit {
return err
}
for to := range t.valMap {
from := t.valMap.MapOrdinal(to)
if err := tree.PutField(t.ctx, t.ns, t.valBld, to, sqlRow[from]); err != nil {
t.err = err
return nil, nil
}
}
v := t.valBld.Build(t.ns.Pool())
return k, v
}

func (s *SqlEngineTableWriter) Commit(ctx context.Context) error {
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/sqle/writer/prolly_index_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func getPrimaryProllyWriter(ctx context.Context, t *doltdb.Table, sqlSch sql.Sch
m := durable.ProllyMapFromIndex(idx)

keyDesc, valDesc := m.Descriptors()
keyMap, valMap := ordinalMappingsFromSchema(sqlSch, sch)
keyMap, valMap := OrdinalMappingsFromSchema(sqlSch, sch)

return prollyIndexWriter{
mut: m.Mutate(),
Expand All @@ -58,7 +58,7 @@ func getPrimaryKeylessProllyWriter(ctx context.Context, t *doltdb.Table, sqlSch
m := durable.ProllyMapFromIndex(idx)

keyDesc, valDesc := m.Descriptors()
_, valMap := ordinalMappingsFromSchema(sqlSch, sch)
_, valMap := OrdinalMappingsFromSchema(sqlSch, sch)

return prollyKeylessWriter{
mut: m.Mutate(),
Expand Down
6 changes: 3 additions & 3 deletions go/libraries/doltcore/sqle/writer/prolly_table_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func getSecondaryProllyIndexWriters(ctx context.Context, t *doltdb.Table, sqlSch
}
idxMap := durable.ProllyMapFromIndex(idxRows)

keyMap, _ := ordinalMappingsFromSchema(sqlSch, def.Schema())
keyMap, _ := OrdinalMappingsFromSchema(sqlSch, def.Schema())
keyDesc, _ := idxMap.Descriptors()

// mapping from secondary index key to primary key
Expand Down Expand Up @@ -121,7 +121,7 @@ func getSecondaryKeylessProllyWriters(ctx context.Context, t *doltdb.Table, sqlS
m := durable.ProllyMapFromIndex(idxRows)
m = prolly.ConvertToSecondaryKeylessIndex(m)

keyMap, _ := ordinalMappingsFromSchema(sqlSch, def.Schema())
keyMap, _ := OrdinalMappingsFromSchema(sqlSch, def.Schema())
keyDesc, _ := m.Descriptors()

writers[defName] = prollyKeylessSecondaryWriter{
Expand Down Expand Up @@ -381,7 +381,7 @@ func (w *prollyTableWriter) flush(ctx *sql.Context) error {
return w.setter(ctx, w.dbName, ws.WorkingRoot())
}

func ordinalMappingsFromSchema(from sql.Schema, to schema.Schema) (km, vm val.OrdinalMapping) {
func OrdinalMappingsFromSchema(from sql.Schema, to schema.Schema) (km, vm val.OrdinalMapping) {
km = makeOrdinalMapping(from, to.GetPKCols())
vm = makeOrdinalMapping(from, to.GetNonPKCols())
return
Expand Down
10 changes: 5 additions & 5 deletions go/store/nbs/table_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,11 @@ func (ts tableSet) Size() int {
// append adds a memTable to an existing tableSet, compacting |mt| and
// returning a new tableSet with newly compacted table added.
func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, hasCache *lru.TwoQueueCache[hash.Hash, struct{}], stats *Stats) (tableSet, error) {
addrs := hash.NewHashSet()
for _, getAddrs := range mt.getChildAddrs {
getAddrs(ctx, addrs, func(h hash.Hash) bool { return hasCache.Contains(h) })
}
mt.addChildRefs(addrs)
//addrs := hash.NewHashSet()
//for _, getAddrs := range mt.getChildAddrs {
// getAddrs(ctx, addrs, func(h hash.Hash) bool { return hasCache.Contains(h) })
//}
//mt.addChildRefs(addrs)

for i := range mt.pendingRefs {
if !mt.pendingRefs[i].has && hasCache.Contains(*mt.pendingRefs[i].a) {
Expand Down