diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index d342fdd4..b6a44036 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -95,14 +95,15 @@ def multi(watch: nil) if watch.nil? || watch.empty? transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder) yield transaction + return transaction.execute + end + + ::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, resharding| + transaction = ::RedisClient::Cluster::Transaction.new( + @router, @command_builder, node: c, resharding: resharding + ) + yield transaction transaction.execute - else - locking = ::RedisClient::Cluster::OptimisticLocking.new(watch, @router) - locking.watch do |c| - transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, c) - yield transaction - transaction.execute - end end end diff --git a/lib/redis_client/cluster/optimistic_locking.rb b/lib/redis_client/cluster/optimistic_locking.rb index 35f50ebc..0732ff26 100644 --- a/lib/redis_client/cluster/optimistic_locking.rb +++ b/lib/redis_client/cluster/optimistic_locking.rb @@ -7,29 +7,32 @@ class RedisClient class Cluster class OptimisticLocking - def initialize(keys, router) - @node = find_node!(keys, router) - @keys = keys + def initialize(router) + @router = router end - def watch - @node.with do |c| - c.call('WATCH', *@keys) - reply = yield(c) - c.call('UNWATCH') - reply + def watch(keys) + ensure_safe_keys(keys) + node = find_node(keys) + cnt = 0 # We assume redirects occurred when incrementing it. + + @router.handle_redirection(node, retry_count: 1) do |nd| + cnt += 1 + nd.with do |c| + c.call('WATCH', *keys) + reply = yield(c, cnt > 1) + c.call('UNWATCH') + reply + end end end private - def find_node!(keys, router) - raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" unless safe?(keys) + def ensure_safe_keys(keys) + return if safe?(keys) - node_key = router.find_primary_node_key(['WATCH', *keys]) - raise ::RedisClient::Cluster::Transaction::ConsistencyError, "couldn't determine the node" if node_key.nil? - - router.find_node(node_key) + raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" end def safe?(keys) @@ -43,6 +46,13 @@ def safe?(keys) slots.uniq.size == 1 end + + def find_node(keys) + node_key = @router.find_primary_node_key(['WATCH', *keys]) + return @router.find_node(node_key) unless node_key.nil? + + raise ::RedisClient::Cluster::Transaction::ConsistencyError, "couldn't determine the node" + end end end end diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 17a3448f..8420e22b 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -8,8 +8,9 @@ class RedisClient class Cluster class Transaction ConsistencyError = Class.new(::RedisClient::Error) + MAX_REDIRECTION = 2 - def initialize(router, command_builder, node = nil) + def initialize(router, command_builder, node: nil, resharding: false) @router = router @command_builder = command_builder @retryable = true @@ -17,6 +18,7 @@ def initialize(router, command_builder, node = nil) @pending_commands = [] @node = node prepare_tx unless @node.nil? + @resharding_state = resharding end def call(*command, **kwargs, &block) @@ -92,7 +94,7 @@ def prepare_tx def settle @pipeline.call('EXEC') - send_transaction(@node, redirect: true) + send_transaction(@node, redirect: MAX_REDIRECTION) end def send_transaction(client, redirect:) @@ -109,7 +111,7 @@ def send_pipeline(client, redirect:) client.middlewares.call_pipelined(commands, client.config) do connection.call_pipelined(commands, nil) rescue ::RedisClient::CommandError => e - return handle_command_error!(commands, e) if redirect + return handle_command_error!(client, commands, e, redirect: redirect) unless redirect.zero? raise end @@ -138,39 +140,30 @@ def coerce_results!(results, offset: 1) results end - def handle_command_error!(commands, err) + def handle_command_error!(client, commands, err, redirect:) # rubocop:disable Metrics/AbcSize if err.message.start_with?('CROSSSLOT') raise ConsistencyError, "#{err.message}: #{err.command}" - elsif err.message.start_with?('MOVED', 'ASK') - ensure_the_same_node!(commands) - handle_redirection(err) + elsif err.message.start_with?('MOVED') + ensure_the_same_node!(client, commands) + node = @router.assign_redirection_node(err.message) + send_transaction(node, redirect: redirect - 1) + elsif err.message.start_with?('ASK') + ensure_the_same_node!(client, commands) + node = @router.assign_asking_node(err.message) + try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err else raise err end end - def ensure_the_same_node!(commands) - expected_node_key = NodeKey.build_from_client(@node) + def ensure_the_same_node!(client, commands) + node_keys = commands.map { |command| @router.find_primary_node_key(command) }.compact.uniq + expected_node_key = ::RedisClient::Cluster::NodeKey.build_from_client(client) - commands.each do |command| - node_key = @router.find_primary_node_key(command) - next if node_key.nil? - next if node_key == expected_node_key - - raise ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}" - end - end + return if !@resharding_state && node_keys.size == 1 && node_keys.first == expected_node_key + return if @resharding_state && node_keys.size == 1 - def handle_redirection(err) - if err.message.start_with?('MOVED') - node = @router.assign_redirection_node(err.message) - send_transaction(node, redirect: false) - elsif err.message.start_with?('ASK') - node = @router.assign_asking_node(err.message) - try_asking(node) ? send_transaction(node, redirect: false) : err - else - raise err - end + raise(ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}") end def try_asking(node) diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index b753b317..1cd4753d 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -81,6 +81,28 @@ def test_the_state_of_cluster_resharding_with_transaction assert_equal(1, call_cnt) end + def test_the_state_of_cluster_resharding_with_transaction_and_watch + call_cnt = 0 + + do_resharding_test do |keys| + @client.multi(watch: keys) do |tx| + call_cnt += 1 + keys.each do |key| + tx.call('SET', key, '0') + tx.call('INCR', key) + end + end + + keys.each do |key| + want = '1' + got = @client.call('GET', key) + assert_equal(want, got, "Case: GET: #{key}") + end + end + + assert_equal(1, call_cnt) + end + def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection # This test is excercising a very delicate race condition; i think the use of @client to set # the keys in do_resharding_test is actually causing the race condition not to happen, so this @@ -147,6 +169,20 @@ def new_test_client end end + class Pooled < TestingWrapper + include Mixin + + private + + def new_test_client + ::RedisClient.cluster( + nodes: TEST_NODE_URIS, + fixed_hostname: TEST_FIXED_HOSTNAME, + **TEST_GENERIC_OPTIONS + ).new_pool(timeout: TEST_TIMEOUT_SEC, size: 2) + end + end + class ScaleReadRandom < TestingWrapper include Mixin @@ -162,6 +198,10 @@ def test_the_state_of_cluster_resharding_with_transaction skip('https://github.com/redis/redis/issues/11312') end + def test_the_state_of_cluster_resharding_with_transaction_and_watch + skip('https://github.com/redis/redis/issues/11312') + end + private def new_test_client @@ -190,6 +230,10 @@ def test_the_state_of_cluster_resharding_with_transaction skip('https://github.com/redis/redis/issues/11312') end + def test_the_state_of_cluster_resharding_with_transaction_and_watch + skip('https://github.com/redis/redis/issues/11312') + end + private def new_test_client @@ -218,6 +262,10 @@ def test_the_state_of_cluster_resharding_with_transaction skip('https://github.com/redis/redis/issues/11312') end + def test_the_state_of_cluster_resharding_with_transaction_and_watch + skip('https://github.com/redis/redis/issues/11312') + end + private def new_test_client @@ -230,18 +278,4 @@ def new_test_client ).new_client end end - - class Pooled < TestingWrapper - include Mixin - - private - - def new_test_client - ::RedisClient.cluster( - nodes: TEST_NODE_URIS, - fixed_hostname: TEST_FIXED_HOSTNAME, - **TEST_GENERIC_OPTIONS - ).new_pool(timeout: TEST_TIMEOUT_SEC, size: 2) - end - end end