1
+ package server
2
+
3
+ import (
4
+ "testing"
5
+ "runtime"
6
+ "context"
7
+ "sync"
8
+ "log/slog"
9
+ "fmt"
10
+ "errors"
11
+ "net/http"
12
+ "time"
13
+
14
+
15
+ dice "github.com/dicedb/dicedb-go"
16
+ diceerrors "github.com/dicedb/dice/internal/errors"
17
+ "github.com/dicedb/dice/internal/shard"
18
+ "github.com/dicedb/dice/config"
19
+ dstore "github.com/dicedb/dice/internal/store"
20
+ "github.com/dicedb/dice/internal/server/resp"
21
+ "github.com/dicedb/dice/internal/worker"
22
+ "github.com/dicedb/dice/internal/server/abstractserver"
23
+
24
+ )
25
+
26
+ func BenchmarkStart (b * testing.B ){
27
+ numShards := runtime .NumCPU ()
28
+ runtime .GOMAXPROCS (numShards )
29
+
30
+ var queryWatchChan chan dstore.QueryWatchEvent
31
+ var cmdWatchChan chan dstore.CmdWatchEvent
32
+ var serverErrCh = make (chan error , 2 )
33
+
34
+ shardManager := shard .NewShardManager (uint8 (numShards ), queryWatchChan , cmdWatchChan , serverErrCh )
35
+
36
+ var ctx , cancel = context .WithCancel (context .Background ())
37
+
38
+ var wg = sync.WaitGroup {}
39
+
40
+ wg .Add (1 )
41
+ go func () {
42
+ defer wg .Done ()
43
+ shardManager .Run (ctx )
44
+ }()
45
+
46
+ var serverWg sync.WaitGroup
47
+
48
+ workerManager := worker .NewWorkerManager (config .DiceConfig .Performance .MaxClients , shardManager )
49
+ respServer := resp .NewServer (shardManager , workerManager , cmdWatchChan , serverErrCh )
50
+
51
+ serverWg .Add (1 )
52
+ go runServer (ctx , & serverWg , respServer , serverErrCh )
53
+
54
+ time .Sleep (10 * time .Second )
55
+
56
+ commands := []string {"PING" , "SET foo bar" , "GET foo" , "INCR counter" , "DEL foo" }
57
+ concurrencyLevels := []int {10 , 50 , 100 , 200 }
58
+
59
+ for _ , concurrency := range concurrencyLevels {
60
+ b .Run ("Concurrency=" + fmt .Sprintf ("%d" ,concurrency ), func (b * testing.B ) {
61
+ client := dice .NewClient (& dice.Options {
62
+ Addr : "localhost:7379" ,
63
+ Password : "" ,
64
+ DB : 0 ,
65
+ })
66
+ defer client .Close ()
67
+
68
+ b .ResetTimer ()
69
+ for i := 0 ; i < b .N ; i ++ {
70
+ var cmdWg sync.WaitGroup
71
+ cmdWg .Add (concurrency )
72
+ for j := 0 ; j < concurrency ; j ++ {
73
+ go func () {
74
+ defer cmdWg .Done ()
75
+ for _ , cmd := range commands {
76
+ switch cmd {
77
+ case "PING" :
78
+ client .Ping (ctx )
79
+ case "SET foo bar" :
80
+ client .Set (ctx , "foo" , "bar" , 0 )
81
+ case "GET foo" :
82
+ client .Get (ctx , "foo" )
83
+ case "INCR counter" :
84
+ client .Incr (ctx , "counter" )
85
+ case "DEL foo" :
86
+ client .Del (ctx , "foo" )
87
+ }
88
+ }
89
+ }()
90
+ }
91
+ cmdWg .Wait ()
92
+ }
93
+ })
94
+ }
95
+ cancel ()
96
+ wg .Wait ()
97
+ serverWg .Wait ()
98
+ }
99
+
100
+ func runServer (ctx context.Context , wg * sync.WaitGroup , srv abstractserver.AbstractServer , errCh chan <- error ) {
101
+ defer wg .Done ()
102
+ if err := srv .Run (ctx ); err != nil {
103
+ switch {
104
+ case errors .Is (err , context .Canceled ):
105
+ slog .Debug (fmt .Sprintf ("%T was canceled" , srv ))
106
+ case errors .Is (err , diceerrors .ErrAborted ):
107
+ slog .Debug (fmt .Sprintf ("%T received abort command" , srv ))
108
+ case errors .Is (err , http .ErrServerClosed ):
109
+ slog .Debug (fmt .Sprintf ("%T received abort command" , srv ))
110
+ default :
111
+ slog .Error (fmt .Sprintf ("%T error" , srv ), slog .Any ("error" , err ))
112
+ }
113
+ errCh <- err
114
+ } else {
115
+ slog .Debug ("bye." )
116
+ }
117
+ }
0 commit comments