From 8d65ae0ae99a63236682fd9d025c44f1a9cc9f2d Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Mon, 30 Oct 2023 21:55:38 +0000 Subject: [PATCH] WIP: Basic Working LogCacheAsync with fuzzy test --- fuzzy/.gitignore | 1 + fuzzy/async_logs_test.go | 50 +++++ fuzzy/cluster.go | 8 +- fuzzy/go.sum | 2 + fuzzy/node.go | 12 +- inmem_store.go | 50 ++++- log.go | 56 +++++- log_cache_async.go | 405 +++++++++++++++++++++++++++++++++++++++ log_cache_async_test.go | 128 +++++++++++++ raft.go | 71 ++++++- 10 files changed, 761 insertions(+), 22 deletions(-) create mode 100644 fuzzy/.gitignore create mode 100644 fuzzy/async_logs_test.go create mode 100644 log_cache_async.go create mode 100644 log_cache_async_test.go diff --git a/fuzzy/.gitignore b/fuzzy/.gitignore new file mode 100644 index 00000000..6320cd24 --- /dev/null +++ b/fuzzy/.gitignore @@ -0,0 +1 @@ +data \ No newline at end of file diff --git a/fuzzy/async_logs_test.go b/fuzzy/async_logs_test.go new file mode 100644 index 00000000..f2d5f7e4 --- /dev/null +++ b/fuzzy/async_logs_test.go @@ -0,0 +1,50 @@ +package fuzzy + +import ( + "math/rand" + "testing" + "time" + + "github.com/hashicorp/raft" +) + +// 5 node cluster where the leader and another node get regularly partitioned off +// eventually all partitions heal, but using async log cache. +func TestRaft_AsyncLogWithPartitions(t *testing.T) { + hooks := NewPartitioner() + + cluster := newRaftClusterWithFactory(t, testLogWriter, "lp", 5, hooks, newAsyncRaft) + cluster.Leader(time.Second * 10) + s := newApplySource("LeaderPartitions") + applier := s.apply(t, cluster, 5) + for i := 0; i < 10; i++ { + pg := hooks.PartitionOff(cluster.log, cluster.LeaderPlus(rand.Intn(4))) + time.Sleep(time.Second * 4) + r := rand.Intn(10) + if r < 1 { + cluster.log.Logf("Healing no partitions!") + } else if r < 4 { + hooks.HealAll(cluster.log) + } else { + hooks.Heal(cluster.log, pg) + } + time.Sleep(time.Second * 5) + } + hooks.HealAll(cluster.log) + cluster.Leader(time.Hour) + applier.stop() + cluster.Stop(t, time.Minute*10) + hooks.Report(t) + cluster.VerifyLog(t, applier.applied) + cluster.VerifyFSM(t) +} + +func newAsyncRaft(conf *raft.Config, fsm raft.FSM, logs raft.LogStore, stable raft.StableStore, snaps raft.SnapshotStore, trans raft.Transport) (*raft.Raft, error) { + // Wrap the log store in an async cache + asyncLogs, err := raft.NewLogCacheAsync(128, logs) + if err != nil { + return nil, err + } + + return raft.NewRaft(conf, fsm, asyncLogs, stable, snaps, trans) +} diff --git a/fuzzy/cluster.go b/fuzzy/cluster.go index 93025aef..d9f2a131 100644 --- a/fuzzy/cluster.go +++ b/fuzzy/cluster.go @@ -55,6 +55,10 @@ func (a *LoggerAdapter) Logf(s string, v ...interface{}) { } func newRaftCluster(t *testing.T, logWriter io.Writer, namePrefix string, n uint, transportHooks TransportHooks) *cluster { + return newRaftClusterWithFactory(t, logWriter, namePrefix, n, transportHooks, raft.NewRaft) +} + +func newRaftClusterWithFactory(t *testing.T, logWriter io.Writer, namePrefix string, n uint, transportHooks TransportHooks, factory factoryFn) *cluster { res := make([]*raftNode, 0, n) names := make([]string, 0, n) for i := uint(0); i < n; i++ { @@ -67,11 +71,11 @@ func newRaftCluster(t *testing.T, logWriter io.Writer, namePrefix string, n uint transports := newTransports(l) for _, i := range names { - r, err := newRaftNode(hclog.New(&hclog.LoggerOptions{ + r, err := newRaftNodeFromFactory(hclog.New(&hclog.LoggerOptions{ Name: i + ":", Output: logWriter, Level: hclog.DefaultLevel, - }), transports, transportHooks, 
names, i) + }), transports, transportHooks, names, i, factory) if err != nil { t.Fatalf("Unable to create raftNode:%v : %v", i, err) } diff --git a/fuzzy/go.sum b/fuzzy/go.sum index e013c3a5..e0ce3c31 100644 --- a/fuzzy/go.sum +++ b/fuzzy/go.sum @@ -95,6 +95,8 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/fuzzy/node.go b/fuzzy/node.go index 0b382b7f..12eadb8b 100644 --- a/fuzzy/node.go +++ b/fuzzy/node.go @@ -24,6 +24,13 @@ type raftNode struct { } func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes []string, name string) (*raftNode, error) { + return newRaftNodeFromFactory(logger, tc, h, nodes, name, raft.NewRaft) +} + +// Same type as raft.NewRaft +type factoryFn func(conf *raft.Config, fsm raft.FSM, logs raft.LogStore, stable raft.StableStore, snaps raft.SnapshotStore, trans raft.Transport) (*raft.Raft, error) + +func newRaftNodeFromFactory(logger hclog.Logger, tc *transports, h TransportHooks, nodes []string, name string, factory factoryFn) (*raftNode, error) { var err error var datadir string datadir, err = resolveDirectory(fmt.Sprintf("data/%v", name), true) @@ -46,8 +53,7 @@ func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes [] config.ShutdownOnRemove = false config.LocalID = raft.ServerID(name) - var store *rdb.BoltStore - store, err = rdb.NewBoltStore(filepath.Join(datadir, "store.bolt")) + store, err := rdb.NewBoltStore(filepath.Join(datadir, "store.bolt")) if err != nil { return nil, fmt.Errorf("unable to initialize log %v", err.Error()) } @@ -65,7 +71,7 @@ func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes [] } fsm := &fuzzyFSM{} var r *raft.Raft - r, err = raft.NewRaft(config, fsm, store, store, ss, transport) + r, err = factory(config, fsm, store, store, ss, transport) if err != nil { return nil, err } diff --git a/inmem_store.go b/inmem_store.go index 730d03f2..fbeb3e85 100644 --- a/inmem_store.go +++ b/inmem_store.go @@ -6,12 +6,19 @@ package raft import ( "errors" "sync" + "sync/atomic" ) // InmemStore implements the LogStore and StableStore interface. // It should NOT EVER be used for production. It is used only for // unit tests. Use the MDBStore implementation instead. type InmemStore struct { + storeFail uint32 // accessed atomically as a bool 0/1 + + // storeSem lets the test control exactly when s StoreLog(s) call takes + // effect. + storeSem chan struct{} + l sync.RWMutex lowIndex uint64 highIndex uint64 @@ -24,13 +31,35 @@ type InmemStore struct { // use for production. Only for testing. 
func NewInmemStore() *InmemStore { i := &InmemStore{ - logs: make(map[uint64]*Log), - kv: make(map[string][]byte), - kvInt: make(map[string]uint64), + storeSem: make(chan struct{}, 1), + logs: make(map[uint64]*Log), + kv: make(map[string][]byte), + kvInt: make(map[string]uint64), } return i } +// BlockStore will cause further calls to StoreLog(s) to block indefinitely +// until the returned cancel func is called. Note that if the code or test is +// buggy this could cause a deadlock +func (i *InmemStore) BlockStore() func() { + i.storeSem <- struct{}{} + cancelled := false + return func() { + // Allow multiple calls, subsequent ones are a no op + if !cancelled { + <-i.storeSem + cancelled = true + } + } +} + +// FailNext signals that the next call to StoreLog(s) should return an error +// without modifying the log contents. Subsequent calls will succeed again. +func (i *InmemStore) FailNext() { + atomic.StoreUint32(&i.storeFail, 1) +} + // FirstIndex implements the LogStore interface. func (i *InmemStore) FirstIndex() (uint64, error) { i.l.RLock() @@ -64,8 +93,23 @@ func (i *InmemStore) StoreLog(log *Log) error { // StoreLogs implements the LogStore interface. func (i *InmemStore) StoreLogs(logs []*Log) error { + // Block waiting for the semaphore slot if BlockStore has been called. We must + // do this before we take the lock because otherwise we'll block GetLog and + // others too by holding the lock while blocked. + i.storeSem <- struct{}{} + defer func() { + <-i.storeSem + }() + + // Switch out fail if it is set so we only fail once + shouldFail := atomic.SwapUint32(&i.storeFail, 0) + if shouldFail == 1 { + return errors.New("IO error") + } + i.l.Lock() defer i.l.Unlock() + for _, l := range logs { i.logs[l.Index] = l if i.lowIndex == 0 { diff --git a/log.go b/log.go index 4ae21932..848ad029 100644 --- a/log.go +++ b/log.go @@ -130,18 +130,58 @@ type LogStore interface { } // MonotonicLogStore is an optional interface for LogStore implementations that -// cannot tolerate gaps in between the Index values of consecutive log entries. For example, -// this may allow more efficient indexing because the Index values are densely populated. If true is -// returned, Raft will avoid relying on gaps to trigger re-synching logs on followers after a -// snapshot is restored. The LogStore must have an efficient implementation of -// DeleteLogs for the case where all logs are removed, as this must be called after snapshot restore when gaps are not allowed. -// We avoid deleting all records for LogStores that do not implement MonotonicLogStore -// because although it's always correct to do so, it has a major negative performance impact on the BoltDB store that is currently -// the most widely used. +// cannot tolerate gaps in between the Index values of consecutive log entries. +// For example, this may allow more efficient indexing because the Index values +// are densely populated. If true is returned, Raft will avoid relying on gaps +// to trigger re-synching logs on followers after a snapshot is restored. The +// LogStore must have an efficient implementation of DeleteLogs for the case +// where all logs are removed, as this must be called after snapshot restore +// when gaps are not allowed. We avoid deleting all records for LogStores that +// do not implement MonotonicLogStore because although it's always correct to do +// so, it has a major negative performance impact on the BoltDB store that is +// currently the most widely used. 
 type MonotonicLogStore interface {
 	IsMonotonic() bool
 }
 
+type LogWriteCompletion struct {
+	PersistentIndex uint64
+	Error           error
+	Duration        time.Duration
+}
+
+type AsyncLogStore interface {
+	LogStore
+
+	// EnableAsync is called on the log store when a node starts the leader loop.
+	// A channel is passed to deliver write completion events. The implementation
+	// chooses how many events to buffer, but the chan may block and this should
+	// be used as a back-pressure mechanism to slow down syncs to disk. Must be
+	// called serially with StoreLog* and DeleteRange (i.e. from the main
+	// leader/follower thread). After this returns, calls to StoreLog(s) will
+	// return an error and only StoreLogsAsync should be used until DisableAsync
+	// is called.
+	EnableAsync(chan<- LogWriteCompletion)
+
+	// DisableAsync is called when the leader steps down to return the LogStore
+	// to sync mode, since followers currently use sync writes. They may use
+	// async writes too in the future; however, explicitly switching modes makes
+	// it easier to reason about the behaviour of async vs sync storage calls, as
+	// well as providing the channel to deliver updates explicitly. DisableAsync
+	// will block until all in-flight writes are persisted (or fail).
+	DisableAsync()
+
+	// StoreLogsAsync may only be called after EnableAsync but before the
+	// corresponding DisableAsync call. It will return as soon as the logs are
+	// available to read from GetLog and reflected in LastIndex, though they may
+	// still be in-memory only. It will trigger background writing of the logs to
+	// disk. The background process must eventually deliver a LogWriteCompletion
+	// to the channel provided to the last EnableAsync call. Each
+	// LogWriteCompletion indicates that all logs up to PersistentIndex are
+	// safely stored on durable storage, or that an error has occurred.
+	StoreLogsAsync(logs []*Log) error
+}
+
 func oldestLog(s LogStore) (Log, error) {
 	var l Log
diff --git a/log_cache_async.go b/log_cache_async.go
new file mode 100644
index 00000000..cc2371ee
--- /dev/null
+++ b/log_cache_async.go
@@ -0,0 +1,405 @@
+package raft
+
+import (
+	"errors"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// LogCacheAsync is like LogCache but also implements AsyncLogStore, allowing
+// writes to be flushed to disk in the background and in larger batches than
+// group commit alone produces.
+type LogCacheAsync struct {
+	// These first fields are accessed atomically and are first to ensure they
+	// stay 64 bit aligned. They are only updated with state locked but are read
+	// without holding the mutex by concurrent readers.
+
+	// lastIndex is the highest readable Log index.
+	lastIndex uint64
+
+	// persistentIndex is the highest index known to be safely synced to the
+	// underlying store.
+	persistentIndex uint64
+
+	// Static params
+	store     LogStore
+	monotonic bool
+	size      uint64
+	sizeMask  uint64
+
+	// state holds the mutable state behind its embedded mutex.
+	state struct {
+		sync.Mutex
+
+		// cache is a ring buffer with static size which must be a power of two.
+		cache []*Log
+
+		// completionCh is non-nil after EnableAsync is called and canonically
+		// defines whether we are in async or sync mode.
+		completionCh chan<- LogWriteCompletion
+
+		// triggerChan is 1-buffered to be used as an edge trigger for the
+		// background sync.
+		triggerChan chan syncRequest
+	}
+}
+
+type syncRequest struct {
+	startTime time.Time
+	stop      bool
+	doneCh    chan struct{}
+}
+
+// NewLogCacheAsync creates an async cache on top of store.
+// The capacity, if not a power of two, will be rounded up to the nearest
+// power of two. The capacity MUST be at least 2x the Config.MaxAppendEntries.
+// The maximum allowed for that config is 1024, so a size of 2048 or greater
+// will always be safe with the current Config validation code. If capacity is
+// lower than MaxAppendEntries in the raft config then writes could eventually
+// stall. Since this is unrecoverable, the server will panic. Take care if you
+// allow user-configurable MaxAppendEntries!
+func NewLogCacheAsync(capacity uint64, store LogStore) (*LogCacheAsync, error) {
+	last, err := store.LastIndex()
+	if err != nil {
+		return nil, err
+	}
+
+	size := nextPowerOf2(capacity)
+
+	c := &LogCacheAsync{
+		lastIndex:       last,
+		persistentIndex: last,
+		store:           store,
+		size:            size,
+		sizeMask:        size - 1, // 0b10000 -> 0b01111
+	}
+	c.state.cache = make([]*Log, size)
+	c.state.triggerChan = make(chan syncRequest, 1)
+
+	if ms, ok := store.(MonotonicLogStore); ok {
+		c.monotonic = ms.IsMonotonic()
+	}
+
+	return c, nil
+}
+
+func nextPowerOf2(cap uint64) uint64 {
+	res := uint64(1)
+	for res < cap {
+		res = res << 1
+	}
+	return res
+}
+
+// FirstIndex returns the first index written. 0 for no entries.
+func (c *LogCacheAsync) FirstIndex() (uint64, error) {
+	return c.store.FirstIndex()
+}
+
+// LastIndex returns the last index written. 0 for no entries.
+func (c *LogCacheAsync) LastIndex() (uint64, error) {
+	return atomic.LoadUint64(&c.lastIndex), nil
+}
+
+// GetLog gets a log entry at a given index.
+func (c *LogCacheAsync) GetLog(index uint64, log *Log) error {
+	// Quick check to see if it's even possibly in the cache so we avoid locking
+	// at all in the case of scanning through old records.
+	lastIdx := atomic.LoadUint64(&c.lastIndex)
+
+	// minPossibleIdx is the lowest log index we could possibly have cached. We
+	// might not have anything that old because we just started, or because we
+	// are currently writing a batch over the top, but below this index we know
+	// it won't be in the cache so there is no need to lock at all.
+	minPossibleIdx := uint64(1)
+	if lastIdx > c.size {
+		minPossibleIdx = lastIdx - c.size
+	}
+	if index < minPossibleIdx {
+		return c.store.GetLog(index, log)
+	}
+
+	// Check cache
+	c.state.Lock()
+	cached := c.state.cache[index&c.sizeMask] // equivalent to index % size
+	c.state.Unlock()
+
+	if cached != nil && cached.Index == index {
+		*log = *cached
+		return nil
+	}
+	return c.store.GetLog(index, log)
+}
+
+// StoreLog stores a log entry.
+func (c *LogCacheAsync) StoreLog(log *Log) error {
+	return c.StoreLogs([]*Log{log})
+}
+
+// StoreLogs stores multiple log entries.
+func (c *LogCacheAsync) StoreLogs(logs []*Log) error {
+	// Shortcut in this unlikely case to avoid panics below.
+	if len(logs) < 1 {
+		return nil
+	}
+
+	c.state.Lock()
+	isAsync := c.state.completionCh != nil
+	c.state.Unlock()
+	if isAsync {
+		return errors.New("call to sync StoreLog(s) when in async mode")
+	}
+
+	// Pass through sync writes to the underlying store. Don't hold the lock
+	// while doing IO since there can only be a single writer anyway.
+	err := c.store.StoreLogs(logs)
+	if err != nil {
+		return err
+	}
+
+	// On success, populate the cache
+	c.state.Lock()
+	for _, l := range logs {
+		c.state.cache[l.Index&c.sizeMask] = l
+	}
+	lastIdx := logs[len(logs)-1].Index
+	atomic.StoreUint64(&c.lastIndex, lastIdx)
+	atomic.StoreUint64(&c.persistentIndex, lastIdx)
+	c.state.Unlock()
+	return nil
+}
+
+// DeleteRange deletes a range of log entries. The range is inclusive.
We only +// support DeleteRange calls in Sync mode which makes reasoning about the cache +// simple because the cache can't contain any un-flushed writes in sync mode. +func (c *LogCacheAsync) DeleteRange(min uint64, max uint64) error { + c.state.Lock() + + if c.state.completionCh != nil { + c.state.Unlock() + return errors.New("call to sync DeleteRange when in async mode") + } + // Invalidate the cache + c.state.cache = make([]*Log, c.size) + c.state.Unlock() + + return c.store.DeleteRange(min, max) +} + +// IsMonotonic implement MonotonicLogStore +func (c *LogCacheAsync) IsMonotonic() bool { + return c.monotonic +} + +// EnableAsync implements AsyncLogStore +func (c *LogCacheAsync) EnableAsync(cc chan<- LogWriteCompletion) { + c.state.Lock() + defer c.state.Unlock() + + // Check we are in sync mode to enforce correct state machine usage. + if c.state.completionCh != nil { + panic("already in async mode") + } + + c.state.completionCh = cc + + go c.runFlusher() +} + +func (c *LogCacheAsync) runFlusher() { + var batch []*Log + + for { + syncReq := <-c.state.triggerChan + + // Load the state under lock + c.state.Lock() + persistedIdx := atomic.LoadUint64(&c.persistentIndex) + lastIdx := atomic.LoadUint64(&c.lastIndex) + + for idx := persistedIdx + 1; idx <= lastIdx; idx++ { + batch = append(batch, c.state.cache[idx&c.sizeMask]) + } + + // If we are stopping that means the serial writer is blocked in + // DisableAsync waiting for us to be done syncing. Keep hold of the lock + // while we sync everything that is unflushed and then stop. + if syncReq.stop { + lwc := c.doFlush(batch, syncReq.startTime) + c.deliverCompletionLocked(lwc) + + // Teardown async state here while we hold the lock to make it simpler to + // reason about possible races. + c.state.completionCh = nil + close(syncReq.doneCh) + + // Release the lock or we'll deadlock this node next time it's elected! + c.state.Unlock() + return + } + + // Not stopping, unlock to allow concurrent writes while we flush this batch + // to disk. + c.state.Unlock() + + // Might be a no-op if batch is empty + lwc := c.doFlush(batch, syncReq.startTime) + + // Note: if the flush failed we might retry it on the next loop. This is + // safe assuming that the LogStore is atomic and not left in an invalid + // state (which Raft assumes in general already). It might loop and retry + // the write of the same logs next time which may fail again or may even + // succeed before the leaderloop notices the error and steps down. But + // either way it's fine because we don't advance the persistedIndex if it + // fails so we'll keep trying to write the same logs at least not leave a + // gap. Actually if we do error, even if there is no immediate sync trigger + // waiting, the leader will step down and disable async which will mean we + // attempt to flush again anyway. If that fails though (in the stop case + // above) we won't keep retrying and will just re-report the error. + + // Need a lock to deliver the completion and update persistent index + c.state.Lock() + c.deliverCompletionLocked(lwc) + c.state.Unlock() + + close(syncReq.doneCh) + + // Loop around, if more writes have happened, triggerChan will already have + // a syncReq buffered and we'll immediately start flushing again. 
+	}
+}
+
+func (c *LogCacheAsync) deliverCompletionLocked(lwc *LogWriteCompletion) {
+	if lwc == nil {
+		return
+	}
+	if lwc.Error == nil {
+		atomic.StoreUint64(&c.persistentIndex, lwc.PersistentIndex)
+	}
+	c.state.completionCh <- *lwc
+}
+
+func (c *LogCacheAsync) doFlush(logs []*Log, start time.Time) *LogWriteCompletion {
+	if len(logs) < 1 {
+		return nil
+	}
+
+	// Write to the underlying store
+	err := c.store.StoreLogs(logs)
+
+	lwc := LogWriteCompletion{
+		PersistentIndex: logs[0].Index - 1,
+		Error:           err,
+		Duration:        time.Since(start),
+	}
+	if err == nil {
+		lwc.PersistentIndex = logs[len(logs)-1].Index
+	}
+	return &lwc
+}
+
+// DisableAsync implements AsyncLogStore
+func (c *LogCacheAsync) DisableAsync() {
+	c.state.Lock()
+	// Check we are currently in async mode to enforce correct state machine
+	// usage.
+	if c.state.completionCh == nil {
+		panic("already in sync mode")
+	}
+	// Unlock again since the flusher will need the lock to stop and clear state.
+	// This method can only be called from the serial leader loop so we don't
+	// have to worry about races from other callers of Append, Enable* or
+	// Disable*.
+	c.state.Unlock()
+
+	// First, wait for any pending writes to flush. In this case we do wait even
+	// if the trigger buffer is full since we need to see the ack.
+	doneCh := make(chan struct{})
+	c.state.triggerChan <- syncRequest{
+		startTime: time.Now(),
+		stop:      true, // Tell the flusher to exit and clean up async state
+		doneCh:    doneCh,
+	}
+	// Wait for the sync to be done
+	<-doneCh
+}
+
+// StoreLogsAsync implements AsyncLogStore
+func (c *LogCacheAsync) StoreLogsAsync(logs []*Log) error {
+	c.state.Lock()
+	defer c.state.Unlock()
+
+	if c.state.completionCh == nil {
+		return errors.New("call to StoreLogsAsync when in sync mode")
+	}
+
+	start := time.Now()
+
+	persistedIdx := atomic.LoadUint64(&c.persistentIndex)
+	lastIdx := atomic.LoadUint64(&c.lastIndex)
+
+	// Make sure there is room in the cache for all the logs we need to write.
+	// It's very unlikely there won't be, but if we are writing really fast into
+	// a small cache and the flusher is blocked on IO for a while then we need to
+	// ensure we don't overwrite cache entries that are not persistent yet!
+	for !hasSpaceFor(len(logs), lastIdx, persistedIdx, c.size) {
+		// We need to block and wait for the sync thread to persist some more
+		// entries. We do that by sending a sync request even though it's already
+		// busy; this lets us get notified when it's free. Note that even though
+		// we unlock here, making it _possible_ for another StoreLogsAsync call to
+		// be made while we wait, we re-check for space under the lock after
+		// waking so we never overwrite unpersisted entries.
+		doneCh := make(chan struct{})
+		c.state.triggerChan <- syncRequest{startTime: start, doneCh: doneCh}
+		c.state.Unlock()
+		<-doneCh
+		c.state.Lock()
+		// Reload the indexes now the sync is done so we can check if there is
+		// space now.
+		persistedIdx = atomic.LoadUint64(&c.persistentIndex)
+		lastIdx = atomic.LoadUint64(&c.lastIndex)
+	}
+
+	writeIdx := lastIdx + 1
+
+	// Write the logs into the buffer. We know we aren't overwriting un-persisted
+	// entries thanks to the check above and the fact that we stayed locked since
+	// hasSpaceFor returned true.
+	for _, log := range logs {
+		c.state.cache[writeIdx&c.sizeMask] = log
+		lastIdx = log.Index
+		writeIdx++
+	}
+	atomic.StoreUint64(&c.lastIndex, lastIdx)
+
+	// Trigger a sync in the background
+	doneCh := make(chan struct{})
+
+	// Don't wait: if the trigger buffer is full then there is already a sync
+	// queued which will pick up our most recent changes.
+ select { + case c.state.triggerChan <- syncRequest{startTime: start, doneCh: doneCh}: + default: + } + + return nil +} + +// hasSpaceFor works out if there are enough free slots in the buffer that are +// already persisted (or empty) for n more logs to be added. +func hasSpaceFor(n int, lastIdx, persistIdx, size uint64) bool { + if uint64(n) > size { + // This should not be possible if the user of the library follows the + // guidance in the type's Doc comment! + panic("trying to write more logs than will ever fit in the cache") + } + + // If we add n logs, the new lastIdx will be + newLastIdx := lastIdx + uint64(n) + + // See how far ahead of or persistIdx that is in the buffer + unpersisted := newLastIdx - persistIdx + + // As long as the new number of unpersisted records is no larger than the + // whole buffer, we have space. If it is greater, that implies that we'd have + // to overwrite the oldest logs even though they still haven't been persisted + // yet! + return unpersisted <= size +} diff --git a/log_cache_async_test.go b/log_cache_async_test.go new file mode 100644 index 00000000..ac568dd5 --- /dev/null +++ b/log_cache_async_test.go @@ -0,0 +1,128 @@ +package raft + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestLogCacheAsyncBasics(t *testing.T) { + underlying := NewInmemStore() + + c, err := NewLogCacheAsync(32, underlying) + require.NoError(t, err) + + // Starts in sync mode and can write some logs in sync mode. + require.NoError(t, c.StoreLogs(makeLogs(1, 5))) + assertLogContents(t, c, 1, 5) + + // Async appends should error + require.ErrorContains(t, c.StoreLogsAsync(makeLogs(6, 10)), "in sync mode") + assertLogContents(t, c, 1, 5) + + // Switch to Async mode + compCh := make(chan LogWriteCompletion, 1) + c.EnableAsync(compCh) + + // Sync appends should error + require.ErrorContains(t, c.StoreLogs(makeLogs(6, 10)), "in async mode") + assertLogContents(t, c, 1, 5) + + // Block up the underlying store indefinitely + blockCancel := underlying.BlockStore() + + timeout := time.AfterFunc(100*time.Millisecond, func() { + blockCancel() + t.Fatal("timeout") + }) + + // (multiple) Async appends should work and return even though the underlying store is + // not done syncing to disk yet. + require.NoError(t, c.StoreLogsAsync(makeLogs(6, 8))) + require.NoError(t, c.StoreLogsAsync(makeLogs(9, 10))) + timeout.Stop() + + // The async log should "see" those updates + assertLogContents(t, c, 1, 10) + + // But the underlying log doesn't have them yet + assertLogContents(t, underlying, 1, 5) + + // Unblock the write on the underlying log + blockCancel() + + // Wait for the completion event + select { + case lwc := <-compCh: + require.NoError(t, lwc.Error) + // Should get a single completion for all logs up to 10 + require.Equal(t, 10, int(lwc.PersistentIndex)) + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout waiting for IO completion") + } + + // Now the underlying should have all the logs + assertLogContents(t, underlying, 1, 10) + + // Write some more async + blockCancel = underlying.BlockStore() + timeout = time.AfterFunc(100*time.Millisecond, func() { + blockCancel() + t.Fatal("timeout") + }) + + // (multiple) Async appends should work and return even though the underlying store is + // not done syncing to disk yet. 
+	require.NoError(t, c.StoreLogsAsync(makeLogs(11, 12)))
+	require.NoError(t, c.StoreLogsAsync(makeLogs(13, 15)))
+	timeout.Stop()
+
+	assertLogContents(t, c, 1, 15)
+
+	// Fail the underlying write
+	underlying.FailNext()
+	blockCancel()
+}
+
+// TODO:
+
+func assertLogContents(t *testing.T, s LogStore, min, max int) {
+	t.Helper()
+
+	// It's easier to debug if we can see the actual contents visually, so
+	// instead of just asserting things match piecemeal, build a human-readable
+	// dump and check it matches expectations.
+	var expected, got []string
+
+	var log Log
+	for idx := min; idx <= max; idx++ {
+		expected = append(expected, fmt.Sprintf("%d => op-%d", idx, idx))
+
+		var gotS string
+		if err := s.GetLog(uint64(idx), &log); err != nil {
+			gotS = fmt.Sprintf("%d => error: %v", idx, err)
+		} else {
+			gotS = fmt.Sprintf("%d => %s", log.Index, log.Data)
+		}
+		got = append(got, gotS)
+	}
+	require.Equal(t, expected, got)
+}
+
+func makeLog(idx int) *Log {
+	return &Log{
+		Index: uint64(idx),
+		Data:  []byte(fmt.Sprintf("op-%d", idx)),
+	}
+}
+
+func makeLogs(min, max int) []*Log {
+	logs := make([]*Log, 0, max-min+1)
+	for idx := min; idx <= max; idx++ {
+		logs = append(logs, makeLog(idx))
+	}
+	return logs
+}
diff --git a/raft.go b/raft.go
index 439fe6e2..7ed24150 100644
--- a/raft.go
+++ b/raft.go
@@ -92,6 +92,7 @@ type leaderState struct {
 	replState map[ServerID]*followerReplication
 	notify    map[*verifyFuture]struct{}
 	stepDown  chan struct{}
+	logWriteCompletionCh chan LogWriteCompletion
 }
 
 // setLeader is used to modify the current leader Address and ID of the cluster
@@ -413,6 +414,20 @@ func (r *Raft) setupLeaderState() {
 	r.leaderState.replState = make(map[ServerID]*followerReplication)
 	r.leaderState.notify = make(map[*verifyFuture]struct{})
 	r.leaderState.stepDown = make(chan struct{}, 1)
+
+	// Set up async log syncing if supported. Only allocate the chan if we will
+	// use it; a nil chan will just block forever in the select below.
+	asyncStore, supportsAsync := r.logs.(AsyncLogStore)
+	if supportsAsync {
+		// Arbitrary buffer size. A small buffer might help smooth over bumps,
+		// but if the leader loop is unable to accept these fast enough something
+		// is pretty wrong, so having a huge backlog won't help.
+		r.leaderState.logWriteCompletionCh = make(chan LogWriteCompletion, 8)
+		asyncStore.EnableAsync(r.leaderState.logWriteCompletionCh)
+
+		// Note we need to disable async when we leave the leader loop, but we do
+		// it in the big defer in runLeader to make it easier to reason about
+		// ordering.
+	}
 }
 
 // runLeader runs the main loop while in leader state. Do the setup here and drop into
@@ -477,6 +492,12 @@ func (r *Raft) runLeader() {
 		}
 
 		// Clear all the state
+		if r.leaderState.logWriteCompletionCh != nil {
+			// We must have an async log store for this to be true
+			als := r.logs.(AsyncLogStore)
+			als.DisableAsync()
+			r.leaderState.logWriteCompletionCh = nil
+		}
 		r.leaderState.commitCh = nil
 		r.leaderState.commitment = nil
 		r.leaderState.inflight = nil
@@ -870,6 +891,22 @@ func (r *Raft) leaderLoop() {
 				r.dispatchLogs(ready)
 			}
 
+		case lwc := <-r.leaderState.logWriteCompletionCh:
+			// A log append completed (or failed) while using an AsyncLogStore.
+			if lwc.Error != nil {
+				// Failing to write logs to disk leaves us in an invalid state. Step
+				// down. Note that the defer in runLeader will take care of failing
+				// all the in-flight requests for us with ErrLeadershipLost, which
+				// seems reasonable.
+				r.logger.Error("failed to sync logs to disk", "error", lwc.Error)
+				r.setState(Follower)
+				continue
+			}
+			metrics.AddSample([]string{"raft", "asyncFlush"}, float32(lwc.Duration.Milliseconds()))
+
+			// Update the commitment now that we know the logs are safely on disk!
+			r.leaderState.commitment.match(r.localID, lwc.PersistentIndex)
+
 		case <-lease:
 			r.mainThreadSaturation.working()
 			// Check if we've exceeded the lease, potentially stepping down
@@ -1178,8 +1215,13 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
 	r.startStopReplication()
 }
 
-// dispatchLog is called on the leader to push a log to disk, mark it
-// as inflight and begin replication of it.
+// dispatchLogs is called on the leader to push a log to disk, mark it as
+// inflight and begin replication of it. If the LogStore implements
+// AsyncLogStore then this will return even before the logs are fully
+// persisted. See Section 10.2.1 of Diego's raft thesis for why this is OK - in
+// this case we begin replicating right away but we don't update the commitment
+// information until the leader loop gets the async completion confirming the
+// logs are safely on disk.
 func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
 	now := time.Now()
 	defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)
@@ -1201,8 +1243,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
 		r.leaderState.inflight.PushBack(applyLog)
 	}
 
-	// Write the log entry locally
-	if err := r.logs.StoreLogs(logs); err != nil {
+	onWriteErr := func(err error) {
 		r.logger.Error("failed to commit logs", "error", err)
 		for _, applyLog := range applyLogs {
 			applyLog.respond(err)
 		}
 		r.setState(Follower)
 		return
 	}
-	r.leaderState.commitment.match(r.localID, lastIndex)
 
-	// Update the last log since it's on disk now
+	// Write the log entry locally
+	if asyncStore, ok := r.logs.(AsyncLogStore); ok {
+		// Async log write
+		if err := asyncStore.StoreLogsAsync(logs); err != nil {
+			onWriteErr(err)
+			return
+		}
+		// DON'T update commitment since we didn't persist to disk yet. Async log
+		// stores will deliver persistence events to the leader loop separately.
+	} else {
+		// Sync log write
+		if err := r.logs.StoreLogs(logs); err != nil {
+			onWriteErr(err)
+			return
+		}
+		r.leaderState.commitment.match(r.localID, lastIndex)
+	}
+
+	// Update the last log since it's now available to read in the log store
+	// (but might not be committed to disk yet).
 	r.setLastLog(lastIndex, term)
 
 	// Notify the replicators of the new log
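Taken together, the intended wiring matches the fuzzy test's newAsyncRaft factory: wrap whatever durable LogStore the node already uses in a LogCacheAsync and hand the wrapper to NewRaft. A minimal sketch of the same pattern outside the fuzzy harness (the package and function names here are illustrative; the 2048 capacity follows the NewLogCacheAsync guidance of at least 2x MaxAppendEntries):

package yourapp

import "github.com/hashicorp/raft"

// newAsyncNode mirrors newAsyncRaft from the fuzzy test: the durable store is
// wrapped in a LogCacheAsync and the wrapper is what raft writes through.
func newAsyncNode(conf *raft.Config, fsm raft.FSM, logs raft.LogStore,
	stable raft.StableStore, snaps raft.SnapshotStore, trans raft.Transport) (*raft.Raft, error) {

	// 2048 is at least 2x the largest allowed MaxAppendEntries (1024), so
	// StoreLogsAsync should never stall waiting for free cache slots.
	asyncLogs, err := raft.NewLogCacheAsync(2048, logs)
	if err != nil {
		return nil, err
	}
	return raft.NewRaft(conf, fsm, asyncLogs, stable, snaps, trans)
}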
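The AsyncLogStore lifecycle that raft drives from the leader loop can also be exercised directly against the in-memory store, which is essentially what TestLogCacheAsyncBasics does. A condensed sketch, assuming only the types added in this patch (exampleAsyncLifecycle is illustrative; real applications never call EnableAsync or DisableAsync themselves, raft does):

package yourapp

import "github.com/hashicorp/raft"

// exampleAsyncLifecycle walks the EnableAsync -> StoreLogsAsync -> completion
// -> DisableAsync sequence, mirroring the unit test above.
func exampleAsyncLifecycle() error {
	store := raft.NewInmemStore()
	cache, err := raft.NewLogCacheAsync(32, store)
	if err != nil {
		return err
	}

	// Entering async (leader) mode: register a completion channel.
	compCh := make(chan raft.LogWriteCompletion, 8)
	cache.EnableAsync(compCh)

	// Returns as soon as the logs are readable from the cache; they may not be
	// on durable storage yet.
	if err := cache.StoreLogsAsync([]*raft.Log{{Index: 1, Data: []byte("op-1")}}); err != nil {
		return err
	}

	// The background flusher reports durability (or an error) here. Everything
	// up to lwc.PersistentIndex is now safely on disk.
	if lwc := <-compCh; lwc.Error != nil {
		return lwc.Error
	}

	// Stepping down: blocks until in-flight writes are flushed, then returns
	// the store to sync mode.
	cache.DisableAsync()
	return nil
}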
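The flusher wake-up relies on triggerChan being a 1-buffered channel used as an edge trigger: on the normal path writers do a non-blocking send, and if a trigger is already buffered the pending flush will observe their writes anyway because it re-reads lastIndex after waking. The idiom in isolation (a generic sketch, not code from this patch):

package yourapp

// exampleEdgeTrigger shows the coalescing wake-up idiom: at most one trigger is
// ever queued, and a trigger sent while the worker is busy covers all work
// written before the worker's next pass.
func exampleEdgeTrigger() {
	trigger := make(chan struct{}, 1)

	notify := func() {
		select {
		case trigger <- struct{}{}: // no wake-up pending: queue one
		default: // a wake-up is already queued; the worker will see our work too
		}
	}

	go func() {
		for range trigger {
			// Drain and flush everything that accumulated since the last wake-up.
		}
	}()

	notify()
	notify() // coalesced with the first if the worker has not woken yet
}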
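hasSpaceFor is plain arithmetic over the monotonically increasing indexes, so the back-pressure condition is easy to pin down with concrete numbers. A test-style sketch (not part of this patch) that could sit alongside log_cache_async_test.go:

package raft

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestHasSpaceForExample(t *testing.T) {
	// 16-slot ring, entries 31..40 written but only up to 30 persisted yet.
	// Appending 4 more leaves 44-30 = 14 unpersisted entries: fits in 16 slots.
	require.True(t, hasSpaceFor(4, 40, 30, 16))

	// Appending 8 would leave 48-30 = 18 unpersisted entries, more than the 16
	// slots, so StoreLogsAsync must wait for the flusher to advance
	// persistentIndex before copying the batch in.
	require.False(t, hasSpaceFor(8, 40, 30, 16))
}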
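The new InmemStore hooks exist purely so tests can hold a write in flight and inject a single failure deterministically. A sketch of the intended usage pattern (exampleBlockedWrite is illustrative; the unit test above does the same thing inline):

package raft

// exampleBlockedWrite shows the InmemStore test hooks: BlockStore parks the
// next StoreLog(s) call until the returned cancel func runs, and FailNext
// injects a single "IO error".
func exampleBlockedWrite(store *InmemStore) error {
	unblock := store.BlockStore()

	done := make(chan error, 1)
	go func() {
		// Parks on the store's semaphore until unblock() is called below.
		done <- store.StoreLog(&Log{Index: 1, Data: []byte("op-1")})
	}()

	// ... assert on cache/FSM state here while the write is known to be
	// in flight ...

	// Make the parked write fail once it is released, then release it.
	store.FailNext()
	unblock()
	return <-done // returns the injected "IO error"
}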