Skip to content

Commit

Permalink
WIP: Basic Working LogCacheAsync with fuzzy test
Browse files Browse the repository at this point in the history
  • Loading branch information
banks committed Oct 30, 2023
1 parent d09d941 commit 8d65ae0
Show file tree
Hide file tree
Showing 10 changed files with 761 additions and 22 deletions.
1 change: 1 addition & 0 deletions fuzzy/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
data
50 changes: 50 additions & 0 deletions fuzzy/async_logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package fuzzy

import (
"math/rand"
"testing"
"time"

"github.com/hashicorp/raft"
)

// 5 node cluster where the leader and another node get regularly partitioned off
// eventually all partitions heal, but using async log cache.
func TestRaft_AsyncLogWithPartitions(t *testing.T) {
hooks := NewPartitioner()

cluster := newRaftClusterWithFactory(t, testLogWriter, "lp", 5, hooks, newAsyncRaft)
cluster.Leader(time.Second * 10)
s := newApplySource("LeaderPartitions")
applier := s.apply(t, cluster, 5)
for i := 0; i < 10; i++ {
pg := hooks.PartitionOff(cluster.log, cluster.LeaderPlus(rand.Intn(4)))
time.Sleep(time.Second * 4)
r := rand.Intn(10)
if r < 1 {
cluster.log.Logf("Healing no partitions!")
} else if r < 4 {
hooks.HealAll(cluster.log)
} else {
hooks.Heal(cluster.log, pg)
}
time.Sleep(time.Second * 5)
}
hooks.HealAll(cluster.log)
cluster.Leader(time.Hour)
applier.stop()
cluster.Stop(t, time.Minute*10)
hooks.Report(t)
cluster.VerifyLog(t, applier.applied)
cluster.VerifyFSM(t)
}

func newAsyncRaft(conf *raft.Config, fsm raft.FSM, logs raft.LogStore, stable raft.StableStore, snaps raft.SnapshotStore, trans raft.Transport) (*raft.Raft, error) {
// Wrap the log store in an async cache
asyncLogs, err := raft.NewLogCacheAsync(128, logs)
if err != nil {
return nil, err
}

return raft.NewRaft(conf, fsm, asyncLogs, stable, snaps, trans)
}
8 changes: 6 additions & 2 deletions fuzzy/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (a *LoggerAdapter) Logf(s string, v ...interface{}) {
}

func newRaftCluster(t *testing.T, logWriter io.Writer, namePrefix string, n uint, transportHooks TransportHooks) *cluster {
return newRaftClusterWithFactory(t, logWriter, namePrefix, n, transportHooks, raft.NewRaft)
}

func newRaftClusterWithFactory(t *testing.T, logWriter io.Writer, namePrefix string, n uint, transportHooks TransportHooks, factory factoryFn) *cluster {
res := make([]*raftNode, 0, n)
names := make([]string, 0, n)
for i := uint(0); i < n; i++ {
Expand All @@ -67,11 +71,11 @@ func newRaftCluster(t *testing.T, logWriter io.Writer, namePrefix string, n uint
transports := newTransports(l)
for _, i := range names {

r, err := newRaftNode(hclog.New(&hclog.LoggerOptions{
r, err := newRaftNodeFromFactory(hclog.New(&hclog.LoggerOptions{
Name: i + ":",
Output: logWriter,
Level: hclog.DefaultLevel,
}), transports, transportHooks, names, i)
}), transports, transportHooks, names, i, factory)
if err != nil {
t.Fatalf("Unable to create raftNode:%v : %v", i, err)
}
Expand Down
2 changes: 2 additions & 0 deletions fuzzy/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
12 changes: 9 additions & 3 deletions fuzzy/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ type raftNode struct {
}

func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes []string, name string) (*raftNode, error) {
return newRaftNodeFromFactory(logger, tc, h, nodes, name, raft.NewRaft)
}

// Same type as raft.NewRaft
type factoryFn func(conf *raft.Config, fsm raft.FSM, logs raft.LogStore, stable raft.StableStore, snaps raft.SnapshotStore, trans raft.Transport) (*raft.Raft, error)

func newRaftNodeFromFactory(logger hclog.Logger, tc *transports, h TransportHooks, nodes []string, name string, factory factoryFn) (*raftNode, error) {
var err error
var datadir string
datadir, err = resolveDirectory(fmt.Sprintf("data/%v", name), true)
Expand All @@ -46,8 +53,7 @@ func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes []
config.ShutdownOnRemove = false
config.LocalID = raft.ServerID(name)

var store *rdb.BoltStore
store, err = rdb.NewBoltStore(filepath.Join(datadir, "store.bolt"))
store, err := rdb.NewBoltStore(filepath.Join(datadir, "store.bolt"))
if err != nil {
return nil, fmt.Errorf("unable to initialize log %v", err.Error())
}
Expand All @@ -65,7 +71,7 @@ func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes []
}
fsm := &fuzzyFSM{}
var r *raft.Raft
r, err = raft.NewRaft(config, fsm, store, store, ss, transport)
r, err = factory(config, fsm, store, store, ss, transport)
if err != nil {
return nil, err
}
Expand Down
50 changes: 47 additions & 3 deletions inmem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@ package raft
import (
"errors"
"sync"
"sync/atomic"
)

// InmemStore implements the LogStore and StableStore interface.
// It should NOT EVER be used for production. It is used only for
// unit tests. Use the MDBStore implementation instead.
type InmemStore struct {
storeFail uint32 // accessed atomically as a bool 0/1

// storeSem lets the test control exactly when s StoreLog(s) call takes
// effect.
storeSem chan struct{}

l sync.RWMutex
lowIndex uint64
highIndex uint64
Expand All @@ -24,13 +31,35 @@ type InmemStore struct {
// use for production. Only for testing.
func NewInmemStore() *InmemStore {
i := &InmemStore{
logs: make(map[uint64]*Log),
kv: make(map[string][]byte),
kvInt: make(map[string]uint64),
storeSem: make(chan struct{}, 1),
logs: make(map[uint64]*Log),
kv: make(map[string][]byte),
kvInt: make(map[string]uint64),
}
return i
}

// BlockStore will cause further calls to StoreLog(s) to block indefinitely
// until the returned cancel func is called. Note that if the code or test is
// buggy this could cause a deadlock
func (i *InmemStore) BlockStore() func() {
i.storeSem <- struct{}{}
cancelled := false
return func() {
// Allow multiple calls, subsequent ones are a no op
if !cancelled {
<-i.storeSem
cancelled = true
}
}
}

// FailNext signals that the next call to StoreLog(s) should return an error
// without modifying the log contents. Subsequent calls will succeed again.
func (i *InmemStore) FailNext() {
atomic.StoreUint32(&i.storeFail, 1)
}

// FirstIndex implements the LogStore interface.
func (i *InmemStore) FirstIndex() (uint64, error) {
i.l.RLock()
Expand Down Expand Up @@ -64,8 +93,23 @@ func (i *InmemStore) StoreLog(log *Log) error {

// StoreLogs implements the LogStore interface.
func (i *InmemStore) StoreLogs(logs []*Log) error {
// Block waiting for the semaphore slot if BlockStore has been called. We must
// do this before we take the lock because otherwise we'll block GetLog and
// others too by holding the lock while blocked.
i.storeSem <- struct{}{}
defer func() {
<-i.storeSem
}()

// Switch out fail if it is set so we only fail once
shouldFail := atomic.SwapUint32(&i.storeFail, 0)
if shouldFail == 1 {
return errors.New("IO error")
}

i.l.Lock()
defer i.l.Unlock()

for _, l := range logs {
i.logs[l.Index] = l
if i.lowIndex == 0 {
Expand Down
56 changes: 48 additions & 8 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,58 @@ type LogStore interface {
}

// MonotonicLogStore is an optional interface for LogStore implementations that
// cannot tolerate gaps in between the Index values of consecutive log entries. For example,
// this may allow more efficient indexing because the Index values are densely populated. If true is
// returned, Raft will avoid relying on gaps to trigger re-synching logs on followers after a
// snapshot is restored. The LogStore must have an efficient implementation of
// DeleteLogs for the case where all logs are removed, as this must be called after snapshot restore when gaps are not allowed.
// We avoid deleting all records for LogStores that do not implement MonotonicLogStore
// because although it's always correct to do so, it has a major negative performance impact on the BoltDB store that is currently
// the most widely used.
// cannot tolerate gaps in between the Index values of consecutive log entries.
// For example, this may allow more efficient indexing because the Index values
// are densely populated. If true is returned, Raft will avoid relying on gaps
// to trigger re-synching logs on followers after a snapshot is restored. The
// LogStore must have an efficient implementation of DeleteLogs for the case
// where all logs are removed, as this must be called after snapshot restore
// when gaps are not allowed. We avoid deleting all records for LogStores that
// do not implement MonotonicLogStore because although it's always correct to do
// so, it has a major negative performance impact on the BoltDB store that is
// currently the most widely used.
type MonotonicLogStore interface {
IsMonotonic() bool
}

type LogWriteCompletion struct {
PersistentIndex uint64
Error error
Duration time.Duration
}

type AsyncLogStore interface {
LogStore

// EnableAsync is called on the log store when a node starts the leader loop.
// A Channel is passed to deliver write completion events. The implementation
// chooses how many events to buffer but the chan may block and this should be
// used as a back-pressure mechanism to slow down syncs to disk. Must be
// called serially with StoreLog* and DeleteRange (i.e from the main
// leader/follower thread). After this returns calls to StoreLog(s) will
// return an error and only StoreLogsAsync should be used until DisableAsync
// is called.
EnableAsync(chan<- LogWriteCompletion)

// DisableAsync is called when the leader steps down to return the LogStore to
// Sync mode since followers currently use Sync writes. They may in the future
// use async writes too however explicit switching modes makes it easier to
// reason about the behaviour of Async vs Sync storage calls as well as
// providing the channel to deliver updates explicitly. DisableAsync will
// block until all in-flight writes are persisted (or fail).
DisableAsync()

// StoreLogsAsync may only be called after EnableAsync but before the
// corresponding DisableAsync call. It will return as soon as the logs are
// available to read from GetLog and reflected in LastIndex, though they may
// still be in-memory only. It will trigger background writing of the logs to
// disk. The background process must eventually deliver a LogWriteCompletion
// to the channel provided to the last EnableAsync call. Each
// LowWriteCompletion indicates that all logs up to the PersistentIndex are
// safely stored on durable storage, or an error has occurred.
StoreLogsAsync(logs []*Log) error
}

func oldestLog(s LogStore) (Log, error) {
var l Log

Expand Down
Loading

0 comments on commit 8d65ae0

Please sign in to comment.