-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added Label for Watch commands #1267
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this @psrvere! This is a sensitive change, I have left some comments clarifying the requirements.
} | ||
|
||
Manager struct { | ||
querySubscriptionMap map[string]map[uint32]struct{} // querySubscriptionMap is a map of Key -> [fingerprint1, fingerprint2, ...] | ||
tcpSubscriptionMap map[uint32]map[chan *cmd.DiceDBCmd]struct{} // tcpSubscriptionMap is a map of fingerprint -> [client1Chan, client2Chan, ...] | ||
fingerprintCmdMap map[uint32]*cmd.DiceDBCmd // fingerprintCmdMap is a map of fingerprint -> DiceDBCmd | ||
fingerprintWatchLabelMap map[uint32]string // fingerprintLabelMap is a map of fingerprint -> watch label |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we probably don't need to maintain this map here, the first response is not routed through the watch manager, and there is no need to store the uuid state in the server (it is only for synchronization at the SDK level)
@@ -40,6 +42,7 @@ func NewManager(cmdWatchSubscriptionChan chan WatchSubscription, cmdWatchChan ch | |||
querySubscriptionMap: make(map[string]map[uint32]struct{}), | |||
tcpSubscriptionMap: make(map[uint32]map[chan *cmd.DiceDBCmd]struct{}), | |||
fingerprintCmdMap: make(map[uint32]*cmd.DiceDBCmd), | |||
fingerprintWatchLabelMap: make(map[uint32]string), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we probably don't need this either, for the same reason as the other one.
internal/worker/worker.go
Outdated
@@ -246,6 +248,10 @@ func (w *BaseWorker) executeCommand(ctx context.Context, diceDBCmd *cmd.DiceDBCm | |||
// Modify the command name to remove the .WATCH suffix, this will allow us to generate a consistent | |||
// fingerprint (which uses the command name without the suffix) | |||
diceDBCmd.Cmd = diceDBCmd.Cmd[:len(diceDBCmd.Cmd)-6] | |||
|
|||
// generate a watch label | |||
watchLabel = uuid.New().String() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be generated by the SDK and sent to the server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on my understanding we should not have to make changes to the watch manager. The changes should only happen in worker.go (which receives uuid generated by the SDK and simply sends it back with the first message)
Note, the first response is generated by modifying the incoming.WATCH command by the .WATCH
suffix and passing it on as a regular command.
Now, the SDK will send an additional argument, the uuid, at the end of its argument list. The server will simply remove this argument from the arg list before sending the command for processing (note that for this first response the watch manager is not involved).
Since the UUID argument is optional, maybe we can have a way of verifying on the server whether the last argument is a UUID label or just part of the actual command arguments. Maybe have a special prefix or something (eg. prefix + (valid uuid string)
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing the reviews @psrvere! The changes look good. Just a few minor comments and some clarifications in the test.
@@ -15,6 +15,7 @@ type ( | |||
AdhocReqChan chan *cmd.DiceDBCmd // AdhocReqChan is the channel to send adhoc requests to the worker. Required. | |||
WatchCmd *cmd.DiceDBCmd // WatchCmd Represents a unique key for each watch artifact, only populated for subscriptions. | |||
Fingerprint uint32 // Fingerprint is a unique identifier for each watch artifact, only populated for unsubscriptions. | |||
WatchLabel string // WatchLabel is the watch label for the watch command |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can probably remove this? I don't think we're using this anywhere.
@@ -125,6 +126,7 @@ func (m *Manager) handleUnsubscription(sub WatchSubscription) { | |||
delete(m.querySubscriptionMap, key) | |||
} | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unnecessary change.
internal/worker/worker.go
Outdated
w.cmdWatchSubscriptionChan <- watchmanager.WatchSubscription{ | ||
Subscribe: true, | ||
WatchCmd: cmdList[len(cmdList)-1], | ||
AdhocReqChan: w.adhocReqChan, | ||
WatchLabel: watchLabel, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
watchLabel
is not being used by the watch manager so no need to send it.
@@ -101,10 +105,13 @@ func TestGETWATCHWithSDK(t *testing.T) { | |||
watch := subscriber.client.WatchConn(context.Background()) | |||
subscribers[i].watch = watch | |||
assert.True(t, watch != nil) | |||
firstMsg, err := watch.Watch(context.Background(), "GET", getWatchKey) | |||
|
|||
uuid := uuid.New().String() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to send the uuid on the user side, the SDK already creates one for us inside the Watch() method.
uuid := uuid.New().String() |
firstMsg, err := watch.Watch(context.Background(), "GET", getWatchKey) | ||
|
||
uuid := uuid.New().String() | ||
firstMsg, err := watch.Watch(context.Background(), "GET", getWatchKey, uuid) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
firstMsg, err := watch.Watch(context.Background(), "GET", getWatchKey, uuid) | |
firstMsg, err := watch.Watch(context.Background(), "GET", getWatchKey) |
assert.Nil(t, err) | ||
assert.Equal(t, firstMsg.Command, "GET") | ||
assert.Equal(t, uuid, firstMsg.Command) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since uuid is generated by SDK, we don't need to verify its value - we can just verify that it's not empty.
uuid := uuid.New().String() | ||
firstMsg, err := watch.GetWatch(ctx, getWatchWithLabelTestCases[0].key, uuid) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above.
I see the latest sdk version as this: |
This PR solve the first message identification problem for subscription commands faced by SDK - Read Here
It generates a watch label (uuid) and adds it to the watch subscription first message.
An integration test is also added to test multiple subscriptions and updates using SDK