Skip to content

Commit 57c20c2

Browse files
committed
bitset: add binlog generation for replication
1 parent b57da1e commit 57c20c2

File tree

2 files changed

+103
-10
lines changed

2 files changed

+103
-10
lines changed

bitset.go

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,109 @@
11
package bitset
22

3+
import (
4+
"bufio"
5+
"encoding/binary"
6+
"fmt"
7+
"os"
8+
"time"
9+
)
10+
311
const word = uint64(64)
412
const logword = uint(6)
13+
const bufsize = 1 << 16
14+
15+
// each channel write is 9 bytes, so if we were to write full disk sector
16+
// of 64K bytes at once, we'd flush ~7112 writes
17+
const channelsize = 7200
18+
19+
const (
20+
new = iota
21+
set
22+
clear
23+
flip
24+
)
25+
26+
type command struct {
27+
code uint8
28+
index uint64
29+
}
530

631
type BitSet struct {
7-
length uint64
8-
bits []uint64
32+
length uint64
33+
bits []uint64
34+
writes chan command
35+
binlogfile string
936
}
1037

1138
func getSize(length uint64) uint64 {
1239
return uint64((length + word - 1) / word)
1340
}
1441

15-
func New(length uint64) *BitSet {
42+
func flush(filename string, writes chan command) {
43+
f, err := os.Create(filename)
44+
if err != nil {
45+
panic(err)
46+
}
47+
48+
defer func() {
49+
if err := f.Close(); err != nil {
50+
panic(err)
51+
}
52+
}()
53+
54+
// make a buffered writer
55+
w := bufio.NewWriterSize(f, bufsize)
56+
57+
ticker := time.NewTimer(time.Second)
58+
59+
buf := make([]byte, bufsize)
60+
long := make([]byte, 8)
61+
62+
put := func(w *command) {
63+
binary.BigEndian.PutUint64(long, w.index)
64+
buf = append(buf, w.code)
65+
buf = append(buf, long...)
66+
}
67+
68+
for t := range ticker.C {
69+
_ = t
70+
b := 0
71+
72+
drain:
73+
for b+9 < bufsize {
74+
select {
75+
case write := <-writes:
76+
put(&write)
77+
b += 9
78+
default:
79+
break drain
80+
}
81+
}
82+
83+
if b > 0 {
84+
// write a chunk
85+
if _, err := w.Write(buf[:b]); err != nil {
86+
panic(err)
87+
}
88+
89+
if err = w.Flush(); err != nil {
90+
panic(err)
91+
}
92+
}
93+
}
94+
}
95+
96+
func New(length uint64, binlogfile string) *BitSet {
1697
size := getSize(length)
17-
return &BitSet{length, make([]uint64, size)}
98+
ret := &BitSet{
99+
length,
100+
make([]uint64, size),
101+
make(chan command, channelsize),
102+
binlogfile,
103+
}
104+
ret.writes <- command{new, length}
105+
go flush(binlogfile, ret.writes)
106+
return ret
18107
}
19108

20109
func getIndex(pos uint64) (q uint64, r uint) {
@@ -37,19 +126,22 @@ func (b *BitSet) Set(pos uint64) bool {
37126
current := b.Get(pos)
38127
q, r := getIndex(pos)
39128
b.bits[q] |= (1 << r)
129+
b.writes <- command{set, pos}
40130
return current
41131
}
42132

43133
func (b *BitSet) Clear(pos uint64) bool {
44134
current := b.Get(pos)
45135
q, r := getIndex(pos)
46136
b.bits[q] &= ^(1 << r)
137+
b.writes <- command{clear, pos}
47138
return current
48139
}
49140

50141
func (b *BitSet) Flip(pos uint64) bool {
51142
current := b.Get(pos)
52143
q, r := getIndex(pos)
53144
b.bits[q] ^= (1 << r)
145+
b.writes <- command{flip, pos}
54146
return current
55147
}

bitset_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package bitset
22

33
import (
44
"testing"
5+
"time"
56
)
67

78
func TestInit(t *testing.T) {
8-
bs := New(1000)
9+
bs := New(1000, "/tmp/bitset.log")
910
// initially all bits should be zero
1011
for i := uint64(0); i < 1000; i++ {
1112
if bs.Get(i) {
@@ -15,7 +16,7 @@ func TestInit(t *testing.T) {
1516
}
1617

1718
func TestGetSet(t *testing.T) {
18-
bs := New(1000)
19+
bs := New(1000, "/tmp/bitset.log")
1920
if bs.Set(0) == true {
2021
t.Error("Set on bit %d returned true when it should return false", 0)
2122
}
@@ -31,7 +32,7 @@ func TestGetSet(t *testing.T) {
3132

3233
func TestLargeSetGet(t *testing.T) {
3334
size := uint64(1) << 35
34-
bs := New(size)
35+
bs := New(size, "/tmp/bitset.log")
3536

3637
positions := []uint64{0, 1, 10, 1000, 1 << 32, size - 1}
3738
for _, position := range positions {
@@ -51,15 +52,15 @@ func TestLength(t *testing.T) {
5152

5253
sizes := []uint64{0, 1, 10, 1000, 1 << 32, 1 << 33}
5354
for _, size := range sizes {
54-
bs := New(size)
55+
bs := New(size, "/tmp/bitset.log")
5556
if bs.Length() != size {
5657
t.Errorf("Length should be %d", size)
5758
}
5859
}
5960
}
6061

6162
func TestClear(t *testing.T) {
62-
bs := New(1000)
63+
bs := New(1000, "/tmp/bitset.log")
6364
pos := uint64(1)
6465
bs.Set(pos)
6566

@@ -77,7 +78,7 @@ func TestClear(t *testing.T) {
7778
}
7879

7980
func TestFlip(t *testing.T) {
80-
bs := New(1000)
81+
bs := New(1000, "/tmp/bitset.log")
8182
pos := uint64(1)
8283

8384
if bs.Get(pos) {

0 commit comments

Comments
 (0)