Skip to content

Commit

Permalink
Merge pull request #1203 from DiceDB/last-min-2
Browse files Browse the repository at this point in the history
WAL implementation
  • Loading branch information
arpitbbhayani authored Nov 7, 2024
2 parents 5067809 + 1009593 commit 1e9e85b
Show file tree
Hide file tree
Showing 23 changed files with 745 additions and 42 deletions.
1 change: 1 addition & 0 deletions build_protos.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
protoc --go_out=. ./internal/wal/wal.proto
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ var (
EnableProfiling = false

EnableWatch = true
LogDir = ""

EnableWAL = true
RestoreFromWAL = false
WALEngine = "sqlite"
)

type Config struct {
Expand Down
5 changes: 0 additions & 5 deletions docs/src/content/docs/commands/GETDEL.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ Setting a key `mylist` as a list and then trying to use `GETDEL`, which is incom
127.0.0.1:7379> LPUSH mylist "item1"
(integer) 1
127.0.0.1:7379> GETDEL mylist
<<<<<<< HEAD
ERROR WRONGTYPE Operation against a key holding the wrong kind of value
```

Expand All @@ -97,7 +96,3 @@ ERROR WRONGTYPE Operation against a key holding the wrong kind of value
- The key `mylist` is a list, not a string.
- # The `GETDEL` command raises a `WRONGTYPE` error because it expects the key to be a string.
(error) WRONGTYPE Operation against a key holding the wrong kind of value

```
>>>>>>> d43577926873d0df0c8f189cdde6afa65c515ccb
```
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/mattn/go-sqlite3 v1.14.24
github.com/mmcloughlin/geohash v0.10.0
github.com/ohler55/ojg v1.25.0
github.com/pelletier/go-toml/v2 v2.2.3
Expand All @@ -58,4 +59,5 @@ require (
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
golang.org/x/crypto v0.28.0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
google.golang.org/protobuf v1.35.1
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mmcloughlin/geohash v0.10.0 h1:9w1HchfDfdeLc+jFEf/04D27KP7E2QmpDu52wPbJWRE=
Expand Down Expand Up @@ -131,6 +133,8 @@ golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/commands/async/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func RunTestServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
gec := make(chan error)
shardManager := shard.NewShardManager(1, watchChan, nil, gec)
// Initialize the AsyncServer
testServer := server.NewAsyncServer(shardManager, watchChan)
testServer := server.NewAsyncServer(shardManager, watchChan, nil)

// Try to bind to a port with a maximum of `totalRetries` retries.
for i := 0; i < totalRetries; i++ {
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/commands/http/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
queryWatcherLocal := querymanager.NewQueryManager()
config.HTTPPort = opt.Port
// Initialize the HTTPServer
testServer := server.NewHTTPServer(shardManager)
testServer := server.NewHTTPServer(shardManager, nil)
// Inform the user that the server is starting
fmt.Println("Starting the test server on port", config.HTTPPort)
shardManagerCtx, cancelShardManager := context.WithCancel(ctx)
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/commands/resp/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) {
shardManager := shard.NewShardManager(1, queryWatchChan, cmdWatchChan, gec)
workerManager := worker.NewWorkerManager(20000, shardManager)
// Initialize the RESP Server
testServer := resp.NewServer(shardManager, workerManager, cmdWatchSubscriptionChan, cmdWatchChan, gec)
testServer := resp.NewServer(shardManager, workerManager, cmdWatchSubscriptionChan, cmdWatchChan, gec, nil)

ctx, cancel := context.WithCancel(context.Background())
fmt.Println("Starting the test server on port", config.DiceConfig.AsyncServer.Port)
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/commands/websocket/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO
shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel)
queryWatcherLocal := querymanager.NewQueryManager()
config.WebsocketPort = opt.Port
testServer := server.NewWebSocketServer(shardManager, testPort1)
testServer := server.NewWebSocketServer(shardManager, testPort1, nil)
shardManagerCtx, cancelShardManager := context.WithCancel(ctx)

// run shard manager
Expand Down
7 changes: 6 additions & 1 deletion internal/cmd/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ type RedisCmds struct {
RequestID uint32
}

// Repr returns a string representation of the command.
func (cmd *DiceDBCmd) Repr() string {
return fmt.Sprintf("%s %s", cmd.Cmd, strings.Join(cmd.Args, " "))
}

// GetFingerprint returns a 32-bit fingerprint of the command and its arguments.
func (cmd *DiceDBCmd) GetFingerprint() uint32 {
return farm.Fingerprint32([]byte(fmt.Sprintf("%s-%s", cmd.Cmd, strings.Join(cmd.Args, " "))))
return farm.Fingerprint32([]byte(cmd.Repr()))
}

// GetKey Returns the key which the command operates on.
Expand Down
4 changes: 2 additions & 2 deletions internal/server/httpServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"time"

"github.com/dicedb/dice/internal/eval"

"github.com/dicedb/dice/internal/server/abstractserver"
"github.com/dicedb/dice/internal/wal"

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/internal/clientio"
Expand Down Expand Up @@ -61,7 +61,7 @@ func (cim *CaseInsensitiveMux) ServeHTTP(w http.ResponseWriter, r *http.Request)
cim.mux.ServeHTTP(w, r)
}

func NewHTTPServer(shardManager *shard.ShardManager) *HTTPServer {
func NewHTTPServer(shardManager *shard.ShardManager, wl wal.AbstractWAL) *HTTPServer {
mux := http.NewServeMux()
caseInsensitiveMux := &CaseInsensitiveMux{mux: mux}
srv := &http.Server{
Expand Down
26 changes: 14 additions & 12 deletions internal/server/resp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/dicedb/dice/internal/server/abstractserver"
"github.com/dicedb/dice/internal/wal"

dstore "github.com/dicedb/dice/internal/store"
"github.com/dicedb/dice/internal/watchmanager"
Expand Down Expand Up @@ -49,20 +50,21 @@ type Server struct {
cmdWatchSubscriptionChan chan watchmanager.WatchSubscription
cmdWatchChan chan dstore.CmdWatchEvent
globalErrorChan chan error
wl wal.AbstractWAL
}

func NewServer(shardManager *shard.ShardManager, workerManager *worker.WorkerManager, cmdWatchSubscriptionChan chan watchmanager.WatchSubscription,
cmdWatchChan chan dstore.CmdWatchEvent, globalErrChan chan error) *Server {
func NewServer(shardManager *shard.ShardManager, workerManager *worker.WorkerManager,
cmdWatchSubscriptionChan chan watchmanager.WatchSubscription, cmdWatchChan chan dstore.CmdWatchEvent, globalErrChan chan error, wl wal.AbstractWAL) *Server {
return &Server{
Host: config.DiceConfig.AsyncServer.Addr,
Port: config.DiceConfig.AsyncServer.Port,
connBacklogSize: DefaultConnBacklogSize,
workerManager: workerManager,
shardManager: shardManager,
watchManager: watchmanager.NewManager(cmdWatchSubscriptionChan),
cmdWatchChan: cmdWatchChan,
cmdWatchSubscriptionChan: cmdWatchSubscriptionChan,
globalErrorChan: globalErrChan,
Host: config.DiceConfig.AsyncServer.Addr,
Port: config.DiceConfig.AsyncServer.Port,
connBacklogSize: DefaultConnBacklogSize,
workerManager: workerManager,
shardManager: shardManager,
watchManager: watchmanager.NewManager(cmdWatchSubscriptionChan),
cmdWatchChan: cmdWatchChan,
globalErrorChan: globalErrChan,
wl: wl,
}
}

Expand Down Expand Up @@ -198,7 +200,7 @@ func (s *Server) AcceptConnectionRequests(ctx context.Context, wg *sync.WaitGrou
preprocessingChan := make(chan *ops.StoreResponse) // preprocessingChan is specifically for handling responses from shards for commands that require preprocessing

wID := GenerateUniqueWorkerID()
w := worker.NewWorker(wID, responseChan, preprocessingChan, s.cmdWatchSubscriptionChan, ioHandler, parser, s.shardManager, s.globalErrorChan)
w := worker.NewWorker(wID, responseChan, preprocessingChan, s.cmdWatchSubscriptionChan, ioHandler, parser, s.shardManager, s.globalErrorChan, s.wl)

// Register the worker with the worker manager
err = s.workerManager.RegisterWorker(w)
Expand Down
3 changes: 2 additions & 1 deletion internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/dicedb/dice/internal/server/abstractserver"
"github.com/dicedb/dice/internal/wal"

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/internal/auth"
Expand Down Expand Up @@ -44,7 +45,7 @@ type AsyncServer struct {
}

// NewAsyncServer initializes a new AsyncServer
func NewAsyncServer(shardManager *shard.ShardManager, queryWatchChan chan dstore.QueryWatchEvent) *AsyncServer {
func NewAsyncServer(shardManager *shard.ShardManager, queryWatchChan chan dstore.QueryWatchEvent, wl wal.AbstractWAL) *AsyncServer {
return &AsyncServer{
maxClients: config.DiceConfig.Performance.MaxClients,
connectedClients: make(map[int]*comm.Client),
Expand Down
3 changes: 2 additions & 1 deletion internal/server/websocketServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/dicedb/dice/internal/server/abstractserver"
"github.com/dicedb/dice/internal/wal"

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/internal/clientio"
Expand Down Expand Up @@ -46,7 +47,7 @@ type WebsocketServer struct {
shutdownChan chan struct{}
}

func NewWebSocketServer(shardManager *shard.ShardManager, port int) *WebsocketServer {
func NewWebSocketServer(shardManager *shard.ShardManager, port int, wl wal.AbstractWAL) *WebsocketServer {
mux := http.NewServeMux()
srv := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Expand Down
72 changes: 72 additions & 0 deletions internal/wal/wal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package wal

import (
"fmt"
"log/slog"
sync "sync"
"time"

"github.com/dicedb/dice/internal/cmd"
)

type AbstractWAL interface {
LogCommand(c *cmd.DiceDBCmd)
Close() error
Init(t time.Time) error
ForEachCommand(f func(c cmd.DiceDBCmd) error) error
}

var (
ticker *time.Ticker
stopCh chan struct{}
mu sync.Mutex
)

func init() {
ticker = time.NewTicker(1 * time.Minute)
stopCh = make(chan struct{})
}

func rotateWAL(wl AbstractWAL) {
mu.Lock()
defer mu.Unlock()

if err := wl.Close(); err != nil {
slog.Warn("error closing the WAL", slog.Any("error", err))
}

if err := wl.Init(time.Now()); err != nil {
slog.Warn("error creating a new WAL", slog.Any("error", err))
}
}

func periodicRotate(wl AbstractWAL) {
for {
select {
case <-ticker.C:
rotateWAL(wl)
case <-stopCh:
return
}
}
}

func InitBG(wl AbstractWAL) {
go periodicRotate(wl)
}

func ShutdownBG() {
close(stopCh)
ticker.Stop()
}

func ReplayWAL(wl AbstractWAL) {
err := wl.ForEachCommand(func(c cmd.DiceDBCmd) error {
fmt.Println("replaying", c.Cmd, c.Args)
return nil
})

if err != nil {
slog.Warn("error replaying WAL", slog.Any("error", err))
}
}
Loading

0 comments on commit 1e9e85b

Please sign in to comment.