Skip to content

Commit

Permalink
updated SDK, added ctx with timeout to getunwatch test and minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
psrvere committed Nov 15, 2024
1 parent 702def2 commit 8655c06
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 27 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/cockroachdb/swiss v0.0.0-20240612210725-f4de07ae6964
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
github.com/dicedb/dicedb-go v0.0.0-20241113134553-45ec8f55dc90
github.com/dicedb/dicedb-go v0.0.0-20241114115413-2d2cf121af84
github.com/gobwas/glob v0.2.3
github.com/google/btree v1.1.3
github.com/google/go-cmp v0.6.0
Expand Down
20 changes: 2 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bytedance/sonic v1.12.3 h1:W2MGa7RCU1QTeYRTPE3+88mVC0yXmsRQRChiyVocVjU=
github.com/bytedance/sonic v1.12.3/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk=
github.com/bytedance/sonic v1.12.4 h1:9Csb3c9ZJhfUWeMtpCDCq6BUoH5ogfDFLUgQ/jG+R0k=
github.com/bytedance/sonic v1.12.4/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk=
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
Expand All @@ -32,12 +30,10 @@ github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 h1:y7y0Oa6UawqTFP
github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dicedb/dicedb-go v0.0.0-20241113134553-45ec8f55dc90 h1:BCnEUTlnmxZE06WZwTBlHUCSAv8PCh29yV+pINKA1IU=
github.com/dicedb/dicedb-go v0.0.0-20241113134553-45ec8f55dc90/go.mod h1:p7x5/3S6wBEmiRMwxavj1I1P1xsSVQS6fcSbeai5ic4=
github.com/dicedb/dicedb-go v0.0.0-20241114115413-2d2cf121af84 h1:ZQ6K/qcO7gzXjdkIr9BDSdOnhhtBL1JW4wrkONiDwK4=
github.com/dicedb/dicedb-go v0.0.0-20241114115413-2d2cf121af84/go.mod h1:iY/0GYI5j2eTcL/LBsCJpd62j6AIJ/g9ZXii7tL7/jc=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
Expand All @@ -54,8 +50,6 @@ github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY=
github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8=
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
Expand Down Expand Up @@ -125,27 +119,17 @@ github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 h1:zzrxE1FKn5ryB
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2/go.mod h1:hzfGeIUDq/j97IG+FhNqkowIyEcD88LrW6fyU3K3WqY=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/arch v0.11.0 h1:KXV8WWKCXm6tRpLirl2szsO5j/oOODwZf4hATmGVNs4=
golang.org/x/arch v0.11.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/arch v0.12.0 h1:UsYJhbzPYGsT0HbEdmYcqtCv8UNGvnaL561NnIUvaKg=
golang.org/x/arch v0.12.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ=
golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f h1:XdNn9LlyWAhLVp6P/i8QYBW+hlyhrhei9uErw2B5GJo=
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f/go.mod h1:D5SMRVC3C2/4+F/DB1wZsLRnSNimn2Sp/NPsCrsv8ak=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
Expand Down
15 changes: 9 additions & 6 deletions integration_tests/commands/resp/getunwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,23 +132,26 @@ func TestGETUNWATCHWithSDK(t *testing.T) {
publisher := getLocalSdk()
subscribers := []WatchSubscriber{{client: getLocalSdk()}, {client: getLocalSdk()}, {client: getLocalSdk()}}

publisher.Del(context.Background(), getUnwatchKey)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

publisher.Del(ctx, getUnwatchKey)

// subscribe for updates
channels := make([]<-chan *dicedb.WatchResult, len(subscribers))
for i, subscriber := range subscribers {
watch := subscriber.client.WatchConn(context.Background())
watch := subscriber.client.WatchConn(ctx)
subscribers[i].watch = watch
assert.NotNil(t, watch)
firstMsg, err := watch.Watch(context.Background(), "GET", getUnwatchKey)
firstMsg, err := watch.Watch(ctx, "GET", getUnwatchKey)
assert.Nil(t, err)
assert.Equal(t, firstMsg.Command, "GET")
assert.Equal(t, "426696421", firstMsg.Fingerprint)
channels[i] = watch.Channel()
}

// Fire updates and validate receipt
err := publisher.Set(context.Background(), getUnwatchKey, "check", 0).Err()
err := publisher.Set(ctx, getUnwatchKey, "check", 0).Err()
assert.Nil(t, err)

for _, channel := range channels {
Expand All @@ -160,12 +163,12 @@ func TestGETUNWATCHWithSDK(t *testing.T) {

// unsubscribe from updates
for _, subscriber := range subscribers {
err := subscriber.watch.Unwatch(context.Background(), "GET", "426696421")
err := subscriber.watch.Unwatch(ctx, "GET", "426696421")
assert.Nil(t, err)
}

// fire updates and validate that they are not received
err = publisher.Set(context.Background(), getUnwatchKey, "final", 0).Err()
err = publisher.Set(ctx, getUnwatchKey, "final", 0).Err()
assert.Nil(t, err)
for _, channel := range channels {
go func(ch <-chan *dicedb.WatchResult) {
Expand Down
4 changes: 2 additions & 2 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,14 @@ func (w *BaseWorker) executeCommand(ctx context.Context, diceDBCmd *cmd.DiceDBCm

if meta.CmdType == Watch {
// Proceed to subscribe after successful execution
w.handleCommandWatch(cmdList, watchLabel)
w.handleCommandWatch(cmdList)
}

return nil
}

// handleCommandWatch sends a watch subscription request to the watch manager.
func (w *BaseWorker) handleCommandWatch(cmdList []*cmd.DiceDBCmd, watchLabel string) {
func (w *BaseWorker) handleCommandWatch(cmdList []*cmd.DiceDBCmd) {
w.cmdWatchSubscriptionChan <- watchmanager.WatchSubscription{
Subscribe: true,
WatchCmd: cmdList[len(cmdList)-1],
Expand Down

0 comments on commit 8655c06

Please sign in to comment.