forked from Shopify/ghostferry
-
Notifications
You must be signed in to change notification settings - Fork 0
/
binlog_writer.go
114 lines (92 loc) · 2.55 KB
/
binlog_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package ghostferry
import (
"database/sql"
"fmt"
"github.com/siddontang/go-mysql/schema"
"github.com/sirupsen/logrus"
)
// BinlogWriter collects DMLEvents handed to it through BufferBinlogEvents and
// applies them to DB in batches, one BEGIN/COMMIT-wrapped query per batch.
type BinlogWriter struct {
	// DB is the handle on which every batch query is executed.
	DB *sql.DB

	// DatabaseRewrites maps an event's database name to the name to use
	// instead when generating SQL. Names without an entry are used unchanged.
	DatabaseRewrites map[string]string

	// TableRewrites maps an event's table name to the name to use instead
	// when generating SQL. Names without an entry are used unchanged.
	TableRewrites map[string]string

	// Throttler is handed to WaitForThrottle before each batch is written.
	Throttler Throttler

	// BatchSize is the capacity of the internal event buffer and the upper
	// bound on how many events Run collects into one batch (a batch always
	// holds at least one event).
	BatchSize int

	// WriteRetries is passed to WithRetries for every batch write.
	// NOTE(review): presumably the maximum number of attempts; confirm
	// against WithRetries.
	WriteRetries int

	// ErrorHandler receives a Fatal call when WithRetries reports an error
	// for a batch write.
	ErrorHandler ErrorHandler

	// binlogEventBuffer carries events from BufferBinlogEvents to Run. It is
	// created by Initialize and closed by Stop.
	binlogEventBuffer chan DMLEvent

	// logger is the tagged log entry created by Initialize.
	logger *logrus.Entry
}
// Initialize prepares the writer for use: it creates the tagged logger and the
// buffered channel (capacity BatchSize) that BufferBinlogEvents fills and Run
// drains. The other methods rely on that channel, so Initialize has to be
// called before any of them.
//
// It returns an error when BatchSize is negative, because creating a channel
// with a negative capacity would otherwise panic at run time.
func (b *BinlogWriter) Initialize() error {
	if b.BatchSize < 0 {
		return fmt.Errorf("binlog writer batch size must not be negative, got %d", b.BatchSize)
	}

	b.logger = logrus.WithField("tag", "binlog_writer")
	b.binlogEventBuffer = make(chan DMLEvent, b.BatchSize)
	return nil
}
// Run drains the event buffer and writes its contents to the target in
// batches. For every batch it blocks until one event is available, then takes
// whatever else is already buffered without waiting, up to BatchSize events
// in total (a batch always contains at least one event).
//
// Run returns when it receives a nil event, which is what a closed and drained
// buffer yields, or after reporting a failed batch write through
// ErrorHandler.Fatal.
func (b *BinlogWriter) Run() {
	for {
		pending := make([]DMLEvent, 0, b.BatchSize)

		// Block until the event that starts the next batch arrives.
		head := <-b.binlogEventBuffer
		if head == nil {
			// The buffer was closed and is empty: nothing left to write.
			return
		}
		pending = append(pending, head)

	fill:
		for len(pending) < b.BatchSize {
			select {
			case next := <-b.binlogEventBuffer:
				if next == nil {
					// Closed mid-batch: write what has been collected so far.
					break fill
				}
				pending = append(pending, next)
			default:
				// Nothing buffered right now, so write the batch as it stands.
				break fill
			}
		}

		err := WithRetries(b.WriteRetries, 0, b.logger, "write events to target", func() error {
			return b.writeEvents(pending)
		})
		if err != nil {
			b.ErrorHandler.Fatal("binlog_writer", err)
			return
		}
	}
}
// Stop closes the event buffer. Events that are already buffered can still be
// received by Run, which returns once the closed buffer is empty.
//
// Stop must be called at most once and only after Initialize, and
// BufferBinlogEvents must not be called afterwards: closing a nil or
// already-closed channel panics, as does sending on a closed one.
func (b *BinlogWriter) Stop() {
	close(b.binlogEventBuffer)
}
// BufferBinlogEvents queues the given events, in order, on the internal event
// buffer. Each send blocks while the buffer is full, so the call returns only
// once every event has been queued. The returned error is always nil.
func (b *BinlogWriter) BufferBinlogEvents(events []DMLEvent) error {
	for i := range events {
		b.binlogEventBuffer <- events[i]
	}
	return nil
}
// writeEvents applies one batch of events to the target database. After
// waiting on the throttler it renders each event to SQL, substituting the
// database and table names listed in DatabaseRewrites and TableRewrites,
// places the statements between BEGIN and COMMIT, and submits the whole text
// in a single Exec call.
//
// It returns an error if an event cannot be rendered to SQL or if Exec fails.
//
// NOTE(review): sending several statements in one Exec presumably requires
// multi-statement support to be enabled on the connection; confirm against
// the DSN used to open DB.
func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
	WaitForThrottle(b.Throttler)

	statements := []byte("BEGIN;\n")
	for _, event := range events {
		dbName := event.Database()
		if rewritten, ok := b.DatabaseRewrites[dbName]; ok {
			dbName = rewritten
		}

		tableName := event.Table()
		if rewritten, ok := b.TableRewrites[tableName]; ok {
			tableName = rewritten
		}

		target := &schema.Table{Schema: dbName, Name: tableName}
		stmt, err := event.AsSQLString(target)
		if err != nil {
			return fmt.Errorf("generating sql query: %v", err)
		}

		statements = append(statements, stmt...)
		statements = append(statements, ";\n"...)
	}
	statements = append(statements, "COMMIT"...)

	query := string(statements)
	if _, err := b.DB.Exec(query); err != nil {
		return fmt.Errorf("exec query (%d bytes): %v", len(query), err)
	}
	return nil
}