Skip to content

fix: cluster client should be able to handle redirection for the watch command #328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
@@ -95,14 +95,15 @@ def multi(watch: nil)
if watch.nil? || watch.empty?
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder)
yield transaction
return transaction.execute
end

::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, resharding|
transaction = ::RedisClient::Cluster::Transaction.new(
@router, @command_builder, node: c, resharding: resharding
)
yield transaction
transaction.execute
else
locking = ::RedisClient::Cluster::OptimisticLocking.new(watch, @router)
locking.watch do |c|
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, c)
yield transaction
transaction.execute
end
end
end

40 changes: 25 additions & 15 deletions lib/redis_client/cluster/optimistic_locking.rb
Original file line number Diff line number Diff line change
@@ -7,29 +7,32 @@
class RedisClient
class Cluster
class OptimisticLocking
def initialize(keys, router)
@node = find_node!(keys, router)
@keys = keys
def initialize(router)
@router = router
end

def watch
@node.with do |c|
c.call('WATCH', *@keys)
reply = yield(c)
c.call('UNWATCH')
reply
def watch(keys)
ensure_safe_keys(keys)
node = find_node(keys)
cnt = 0 # We assume redirects occurred when incrementing it.

@router.handle_redirection(node, retry_count: 1) do |nd|
cnt += 1
nd.with do |c|
c.call('WATCH', *keys)
reply = yield(c, cnt > 1)
c.call('UNWATCH')
reply
end
end
end

private

def find_node!(keys, router)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" unless safe?(keys)
def ensure_safe_keys(keys)
return if safe?(keys)

node_key = router.find_primary_node_key(['WATCH', *keys])
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "couldn't determine the node" if node_key.nil?

router.find_node(node_key)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}"
end

def safe?(keys)
@@ -43,6 +46,13 @@ def safe?(keys)

slots.uniq.size == 1
end

# Looks up the node that should receive a WATCH for the given keys.
#
# Asks the router for the primary node key of the command
# ['WATCH', *keys] and then resolves that key to a node via the router.
#
# @param keys [Array<String>] keys to be watched
# @return whatever @router.find_node returns for the resolved node key
# @raise [::RedisClient::Cluster::Transaction::ConsistencyError] when the
#   router returns nil for the node key
def find_node(keys)
  primary_key = @router.find_primary_node_key(['WATCH', *keys])
  if primary_key.nil?
    raise ::RedisClient::Cluster::Transaction::ConsistencyError, "couldn't determine the node"
  end

  @router.find_node(primary_key)
end
end
end
end
47 changes: 20 additions & 27 deletions lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
@@ -8,15 +8,17 @@ class RedisClient
class Cluster
class Transaction
ConsistencyError = Class.new(::RedisClient::Error)
MAX_REDIRECTION = 2

def initialize(router, command_builder, node = nil)
def initialize(router, command_builder, node: nil, resharding: false)
@router = router
@command_builder = command_builder
@retryable = true
@pipeline = ::RedisClient::Pipeline.new(@command_builder)
@pending_commands = []
@node = node
prepare_tx unless @node.nil?
@resharding_state = resharding
end

def call(*command, **kwargs, &block)
@@ -92,7 +94,7 @@ def prepare_tx

def settle
@pipeline.call('EXEC')
send_transaction(@node, redirect: true)
send_transaction(@node, redirect: MAX_REDIRECTION)
end

def send_transaction(client, redirect:)
@@ -109,7 +111,7 @@ def send_pipeline(client, redirect:)
client.middlewares.call_pipelined(commands, client.config) do
connection.call_pipelined(commands, nil)
rescue ::RedisClient::CommandError => e
return handle_command_error!(commands, e) if redirect
return handle_command_error!(client, commands, e, redirect: redirect) unless redirect.zero?

raise
end
@@ -138,39 +140,30 @@ def coerce_results!(results, offset: 1)
results
end

def handle_command_error!(commands, err)
def handle_command_error!(client, commands, err, redirect:) # rubocop:disable Metrics/AbcSize
if err.message.start_with?('CROSSSLOT')
raise ConsistencyError, "#{err.message}: #{err.command}"
elsif err.message.start_with?('MOVED', 'ASK')
ensure_the_same_node!(commands)
handle_redirection(err)
elsif err.message.start_with?('MOVED')
ensure_the_same_node!(client, commands)
node = @router.assign_redirection_node(err.message)
send_transaction(node, redirect: redirect - 1)
elsif err.message.start_with?('ASK')
ensure_the_same_node!(client, commands)
node = @router.assign_asking_node(err.message)
try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err
else
raise err
end
end

def ensure_the_same_node!(commands)
expected_node_key = NodeKey.build_from_client(@node)
def ensure_the_same_node!(client, commands)
node_keys = commands.map { |command| @router.find_primary_node_key(command) }.compact.uniq
expected_node_key = ::RedisClient::Cluster::NodeKey.build_from_client(client)

commands.each do |command|
node_key = @router.find_primary_node_key(command)
next if node_key.nil?
next if node_key == expected_node_key

raise ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}"
end
end
return if !@resharding_state && node_keys.size == 1 && node_keys.first == expected_node_key
return if @resharding_state && node_keys.size == 1

def handle_redirection(err)
if err.message.start_with?('MOVED')
node = @router.assign_redirection_node(err.message)
send_transaction(node, redirect: false)
elsif err.message.start_with?('ASK')
node = @router.assign_asking_node(err.message)
try_asking(node) ? send_transaction(node, redirect: false) : err
else
raise err
end
raise(ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}")
end

def try_asking(node)
62 changes: 48 additions & 14 deletions test/test_against_cluster_state.rb
Original file line number Diff line number Diff line change
@@ -81,6 +81,28 @@ def test_the_state_of_cluster_resharding_with_transaction
assert_equal(1, call_cnt)
end

# Runs a WATCH-guarded MULTI inside do_resharding_test: every yielded key is
# SET to '0' and then INCRemented within the transaction, after which each key
# must read back as '1'. Finally asserts that the multi block body was entered
# exactly once in total.
def test_the_state_of_cluster_resharding_with_transaction_and_watch
  invocations = 0

  do_resharding_test do |keys|
    @client.multi(watch: keys) do |tx|
      invocations += 1
      keys.each do |key|
        tx.call('SET', key, '0')
        tx.call('INCR', key)
      end
    end

    # SET '0' followed by INCR should leave every key at '1'.
    keys.each do |key|
      assert_equal('1', @client.call('GET', key), "Case: GET: #{key}")
    end
  end

  assert_equal(1, invocations)
end

def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection
# This test is exercising a very delicate race condition; I think the use of @client to set
# the keys in do_resharding_test is actually causing the race condition not to happen, so this
@@ -147,6 +169,20 @@ def new_test_client
end
end

# Test variant that includes the shared Mixin and runs against a pooled client.
class Pooled < TestingWrapper
  include Mixin

  private

  # Builds the cluster configuration from the shared test constants and
  # wraps it in a connection pool of size 2 with the test timeout.
  def new_test_client
    cluster_config = ::RedisClient.cluster(
      nodes: TEST_NODE_URIS,
      fixed_hostname: TEST_FIXED_HOSTNAME,
      **TEST_GENERIC_OPTIONS
    )
    cluster_config.new_pool(timeout: TEST_TIMEOUT_SEC, size: 2)
  end
end

class ScaleReadRandom < TestingWrapper
include Mixin

@@ -162,6 +198,10 @@ def test_the_state_of_cluster_resharding_with_transaction
skip('https://github.com/redis/redis/issues/11312')
end

# Marks this test as skipped; the message passed to skip is a link to an
# upstream Redis issue (redis/redis#11312).
def test_the_state_of_cluster_resharding_with_transaction_and_watch
skip('https://github.com/redis/redis/issues/11312')
end

private

def new_test_client
@@ -190,6 +230,10 @@ def test_the_state_of_cluster_resharding_with_transaction
skip('https://github.com/redis/redis/issues/11312')
end

# Marks this test as skipped; the message passed to skip is a link to an
# upstream Redis issue (redis/redis#11312).
def test_the_state_of_cluster_resharding_with_transaction_and_watch
skip('https://github.com/redis/redis/issues/11312')
end

private

def new_test_client
@@ -218,6 +262,10 @@ def test_the_state_of_cluster_resharding_with_transaction
skip('https://github.com/redis/redis/issues/11312')
end

# Marks this test as skipped; the message passed to skip is a link to an
# upstream Redis issue (redis/redis#11312).
def test_the_state_of_cluster_resharding_with_transaction_and_watch
skip('https://github.com/redis/redis/issues/11312')
end

private

def new_test_client
@@ -230,18 +278,4 @@ def new_test_client
).new_client
end
end

# Test variant that includes the shared Mixin and whose client under test is
# a connection pool built from the cluster configuration.
class Pooled < TestingWrapper
include Mixin

private

# Builds the cluster configuration from the shared test constants and
# returns a pool of size 2 created from it.
def new_test_client
::RedisClient.cluster(
nodes: TEST_NODE_URIS,
fixed_hostname: TEST_FIXED_HOSTNAME,
**TEST_GENERIC_OPTIONS
).new_pool(timeout: TEST_TIMEOUT_SEC, size: 2)
end
end
end