From 57c20c2733dff45ebb5ab529ac9771b2e24f1d5e Mon Sep 17 00:00:00 2001 From: Nikhil Garg Date: Wed, 29 Jul 2015 07:37:46 -0700 Subject: [PATCH] bitset: add binlog generation for replication --- bitset.go | 100 +++++++++++++++++++++++++++++++++++++++++++++++-- bitset_test.go | 13 ++++--- 2 files changed, 103 insertions(+), 10 deletions(-) diff --git a/bitset.go b/bitset.go index 9f0b2de..fdd99ac 100644 --- a/bitset.go +++ b/bitset.go @@ -1,20 +1,109 @@ package bitset +import ( + "bufio" + "encoding/binary" + "fmt" + "os" + "time" +) + const word = uint64(64) const logword = uint(6) +const bufsize = 1 << 16 + +// each channel write is 9 bytes, so if we were to write full disk sector +// of 64K bytes at once, we'd flush ~7112 writes +const channelsize = 7200 + +const ( + new = iota + set + clear + flip +) + +type command struct { + code uint8 + index uint64 +} type BitSet struct { - length uint64 - bits []uint64 + length uint64 + bits []uint64 + writes chan command + binlogfile string } func getSize(length uint64) uint64 { return uint64((length + word - 1) / word) } -func New(length uint64) *BitSet { +func flush(filename string, writes chan command) { + f, err := os.Create(filename) + if err != nil { + panic(err) + } + + defer func() { + if err := f.Close(); err != nil { + panic(err) + } + }() + + // make a buffered writer + w := bufio.NewWriterSize(f, bufsize) + + ticker := time.NewTimer(time.Second) + + buf := make([]byte, bufsize) + long := make([]byte, 8) + + put := func(w *command) { + binary.BigEndian.PutUint64(long, w.index) + buf = append(buf, w.code) + buf = append(buf, long...) + } + + for t := range ticker.C { + _ = t + b := 0 + + drain: + for b+9 < bufsize { + select { + case write := <-writes: + put(&write) + b += 9 + default: + break drain + } + } + + if b > 0 { + // write a chunk + if _, err := w.Write(buf[:b]); err != nil { + panic(err) + } + + if err = w.Flush(); err != nil { + panic(err) + } + } + } +} + +func New(length uint64, binlogfile string) *BitSet { size := getSize(length) - return &BitSet{length, make([]uint64, size)} + ret := &BitSet{ + length, + make([]uint64, size), + make(chan command, channelsize), + binlogfile, + } + ret.writes <- command{new, length} + go flush(binlogfile, ret.writes) + return ret } func getIndex(pos uint64) (q uint64, r uint) { @@ -37,6 +126,7 @@ func (b *BitSet) Set(pos uint64) bool { current := b.Get(pos) q, r := getIndex(pos) b.bits[q] |= (1 << r) + b.writes <- command{set, pos} return current } @@ -44,6 +134,7 @@ func (b *BitSet) Clear(pos uint64) bool { current := b.Get(pos) q, r := getIndex(pos) b.bits[q] &= ^(1 << r) + b.writes <- command{clear, pos} return current } @@ -51,5 +142,6 @@ func (b *BitSet) Flip(pos uint64) bool { current := b.Get(pos) q, r := getIndex(pos) b.bits[q] ^= (1 << r) + b.writes <- command{flip, pos} return current } diff --git a/bitset_test.go b/bitset_test.go index 2a3c75b..fc64d64 100644 --- a/bitset_test.go +++ b/bitset_test.go @@ -2,10 +2,11 @@ package bitset import ( "testing" + "time" ) func TestInit(t *testing.T) { - bs := New(1000) + bs := New(1000, "/tmp/bitset.log") // initially all bits should be zero for i := uint64(0); i < 1000; i++ { if bs.Get(i) { @@ -15,7 +16,7 @@ func TestInit(t *testing.T) { } func TestGetSet(t *testing.T) { - bs := New(1000) + bs := New(1000, "/tmp/bitset.log") if bs.Set(0) == true { t.Error("Set on bit %d returned true when it should return false", 0) } @@ -31,7 +32,7 @@ func TestGetSet(t *testing.T) { func TestLargeSetGet(t *testing.T) { size := uint64(1) << 35 - bs := New(size) + bs := New(size, "/tmp/bitset.log") positions := []uint64{0, 1, 10, 1000, 1 << 32, size - 1} for _, position := range positions { @@ -51,7 +52,7 @@ func TestLength(t *testing.T) { sizes := []uint64{0, 1, 10, 1000, 1 << 32, 1 << 33} for _, size := range sizes { - bs := New(size) + bs := New(size, "/tmp/bitset.log") if bs.Length() != size { t.Errorf("Length should be %d", size) } @@ -59,7 +60,7 @@ func TestLength(t *testing.T) { } func TestClear(t *testing.T) { - bs := New(1000) + bs := New(1000, "/tmp/bitset.log") pos := uint64(1) bs.Set(pos) @@ -77,7 +78,7 @@ func TestClear(t *testing.T) { } func TestFlip(t *testing.T) { - bs := New(1000) + bs := New(1000, "/tmp/bitset.log") pos := uint64(1) if bs.Get(pos) {