Skip to content

Commit c7b69df

Browse files
author
KJ Tsanaktsidis
committed
Add an explicit #watch method to RedisClient::Cluster
This returns a "watcher" object, which can either be used for three things: * To add keys to be watched on the same connection (by calling #watch * To begin a MULTI transaction on the connection (by calling #multi) * To UNWATCH the connection and return it to its original state (by calling... #unwatch) This means that the following pattern becomes possible in redis-cluster-client: ``` client.watch(["{slot}k1", "{slot}k2"]) do |watcher| # Further reads can be performed with client directly; this is # perfectly safe and they will be individually redirected if required # as normal. # If a read on a slot being watched is redirected, that's also OK, # because it means the final EXEC will fail (since the watch got # modified). current_value = client.call('GET', '{slot}k1') some_other_thing = client.call('GET', '{slot}something_unwatched') # You can add more keys to the watch if required # This could raise a redireciton error, and cause the whole watch # block to be re-attempted watcher.watch('{slot}differet_key') different_value = client.call('GET', '{slot}different_key') if do_transaction? # Once you're ready to perform a transaction, you can use multi... watcher.multi do |tx| # tx is now a pipeliend RedisClient::Cluster::Transaction # instance, like normal multi tx.call('SET', '{slot}k1', 'new_value') tx.call('SET', '{slot}k2', 'new_value') end # At this point, the transaction is committed else # You can abort the transaction by calling unwatch # (this will also happen if an exception is thrown) watcher.unwatch end end ``` This interface is what's required to make redis-clustering/redis-rb work correctly, I think.
1 parent 3230791 commit c7b69df

File tree

4 files changed

+60
-35
lines changed

4 files changed

+60
-35
lines changed

lib/redis_client/cluster.rb

+11-7
Original file line numberDiff line numberDiff line change
@@ -91,19 +91,23 @@ def pipelined
9191
pipeline.execute
9292
end
9393

94-
def multi(watch: nil)
94+
def multi(watch: nil, &block)
9595
if watch.nil? || watch.empty?
9696
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder)
9797
yield transaction
9898
return transaction.execute
9999
end
100100

101-
::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot|
102-
transaction = ::RedisClient::Cluster::Transaction.new(
103-
@router, @command_builder, node: c, slot: slot
104-
)
105-
yield transaction
106-
transaction.execute
101+
watcher = ::RedisClient::Cluster::OptimisticLocking.new(@router, @command_builder)
102+
watcher.watch(watch) do
103+
watcher.multi(&block)
104+
end
105+
end
106+
107+
def watch(keys)
108+
watcher = ::RedisClient::Cluster::OptimisticLocking.new(@router, @command_builder)
109+
watcher.watch(keys) do
110+
yield watcher
107111
end
108112
end
109113

lib/redis_client/cluster/optimistic_locking.rb

+42-9
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,60 @@
66
class RedisClient
77
class Cluster
88
class OptimisticLocking
9-
def initialize(router)
9+
def initialize(router, command_builder)
1010
@router = router
11+
@command_builder = command_builder
12+
@slot = nil
13+
@conn = nil
1114
end
1215

13-
def watch(keys)
14-
slot = find_slot(keys)
15-
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?
16+
def watch(keys, &block)
17+
if @conn
18+
# We're already watching, and the caller wants to watch additional keys
19+
add_to_watch(keys)
20+
else
21+
# First call to #watch
22+
start_watch(keys, &block)
23+
end
24+
end
25+
26+
def unwatch
27+
@conn.call('UNWATCH')
28+
end
29+
30+
def multi
31+
transaction = ::RedisClient::Cluster::Transaction.new(
32+
@router, @command_builder, node: @conn, slot: @slot
33+
)
34+
yield transaction
35+
transaction.execute
36+
end
1637

17-
node = @router.find_primary_node_by_slot(slot)
38+
private
39+
40+
def start_watch(keys)
41+
@slot = find_slot(keys)
42+
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if @slot.nil?
43+
44+
node = @router.find_primary_node_by_slot(@slot)
1845
@router.handle_redirection(node, retry_count: 1) do |nd|
1946
nd.with do |c|
20-
c.call('WATCH', *keys)
21-
yield(c, slot)
47+
@conn = c
48+
@conn.call('WATCH', *keys)
49+
yield
2250
rescue StandardError
23-
c.call('UNWATCH')
51+
unwatch
2452
raise
2553
end
2654
end
2755
end
2856

29-
private
57+
def add_to_watch(keys)
58+
slot = find_slot(keys)
59+
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "inconsistent watch: #{keys.join(' ')}" if slot != @slot
60+
61+
@conn.call('WATCH', *keys)
62+
end
3063

3164
def find_slot(keys)
3265
return if keys.empty?

lib/redis_client/cluster/router.rb

-14
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
4646
when 'memory' then send_memory_command(method, command, args, &block)
4747
when 'script' then send_script_command(method, command, args, &block)
4848
when 'pubsub' then send_pubsub_command(method, command, args, &block)
49-
when 'watch' then send_watch_command(command, &block)
5049
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
5150
@node.call_all(method, command, args).first.then(&TSF.call(block))
5251
when 'flushall', 'flushdb'
@@ -311,19 +310,6 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics
311310
end
312311
end
313312

314-
# for redis-rb
315-
def send_watch_command(command)
316-
raise ::RedisClient::Cluster::Transaction::ConsistencyError, 'A block required. And you need to use the block argument as a client for the transaction.' unless block_given?
317-
318-
::RedisClient::Cluster::OptimisticLocking.new(self).watch(command[1..]) do |c, slot|
319-
transaction = ::RedisClient::Cluster::Transaction.new(
320-
self, @command_builder, node: c, slot: slot
321-
)
322-
yield transaction
323-
transaction.execute
324-
end
325-
end
326-
327313
def update_cluster_info!
328314
@node.reload!
329315
end

test/redis_client/test_cluster.rb

+7-5
Original file line numberDiff line numberDiff line change
@@ -367,11 +367,13 @@ def test_transaction_in_race_condition
367367
def test_transaction_with_dedicated_watch_command
368368
@client.call('MSET', '{key}1', '0', '{key}2', '0')
369369

370-
got = @client.call('WATCH', '{key}1', '{key}2') do |tx|
371-
tx.call('ECHO', 'START')
372-
tx.call('SET', '{key}1', '1')
373-
tx.call('SET', '{key}2', '2')
374-
tx.call('ECHO', 'FINISH')
370+
got = @client.watch(['{key}1', '{key}2']) do |watcher|
371+
watcher.multi do |tx|
372+
tx.call('ECHO', 'START')
373+
tx.call('SET', '{key}1', '1')
374+
tx.call('SET', '{key}2', '2')
375+
tx.call('ECHO', 'FINISH')
376+
end
375377
end
376378

377379
assert_equal(%w[START OK OK FINISH], got)

0 commit comments

Comments
 (0)