diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..583431b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+*.gem
+Gemfile
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..4d656a3
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,21 @@
+FROM ruby
+
+WORKDIR /
+RUN wget https://raw.githubusercontent.com/antirez/redis/3.0/src/redis-trib.rb
+RUN chmod o+x redis-trib.rb
+
+RUN apt-get update
+RUN apt-get install -y openssh-server
+RUN mkdir /var/run/sshd
+RUN echo 'root:root' | chpasswd
+RUN sed -i 's/PermitRootLogin without-password/PermitRootLogin yes/' /etc/ssh/sshd_config
+RUN sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd
+ENV NOTVISIBLE "in users profile"
+RUN echo "export VISIBLE=now" >> /etc/profile
+
+ENV GEM_HOME /gems
+RUN echo "export GEM_PATH=/gems" >> /etc/profile
+RUN gem install redis connection_pool
+ADD . /code
+
+CMD ["/usr/sbin/sshd", "-D"]
diff --git a/README.md b/README.md
index e6bf4f0..82e53c6 100644
--- a/README.md
+++ b/README.md
@@ -1,50 +1,73 @@
 # Redis-rb-cluster
 
 Redis Cluster client work in progress.
+
 It wraps Redis-rb, and eventually should be part of it.
 
 For now the goal is to write a simple (but not too simple) client that works
+as a reference implementation, and can be used to further develop and test
+Redis Cluster, which is itself a work in progress.
+
+## Building the Gem locally
+
+`$ gem build redis-rb-cluster.gemspec`
+
+`$ gem install rediscluster-0.1.gem`
 
 ## Creating a new instance
 
 In order to create a new Redis Cluster instance use:
 
-    startup_nodes = [
-        {:host => "127.0.0.1", :port => 6379},
-        {:host => "127.0.0.1", :port => 6380}
-    ]
-    max_cached_connections = 2
-    rc = RedisCluster.new(startup_nodes,max_cached_connections)
+```
+require 'rediscluster'
+
+startup_nodes = [
+    {:host => "localhost", :port => 6379},
+    {:host => "localhost", :port => 6380}
+]
+max_cached_connections = 2
+rc = RedisCluster.new(startup_nodes, max_connections: max_cached_connections)
+```
 
 The startup nodes are a list of addresses of Cluster nodes. For the client to
+work it is important that at least one of these addresses is reachable.
+Startup nodes are used in order to:
 
-* Initialize the hash slot -> node cache, using the `CLUSTER NODES` command.
-* To contact a random node every time we are not able to talk with the right node currently cached for the specified hash slot we are interested in, in the context of the current request.
+- Initialize the hash slot -> node cache, using the `CLUSTER SLOTS` command.
+- Contact a random node whenever we are not able to talk to the node currently cached for the hash slot we are interested in, in the context of the current request.
 
 The list of nodes provided by the user will be extended once the client
+is able to retrieve the cluster configuration.
 
 The `max_connections` parameter is the maximum number of connections the
+client may keep open to each node (the size of each node's connection pool).
+Ideally this should be at least equal to the number of threads that will
+share the client, so that no thread has to wait for a connection to be
+checked back into the pool. However, if you have a very large cluster and
+want to optimize for client-side resource usage, it is possible to use a
+smaller value.
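+The initializer also accepts `read_slave` and `conn_opt` keyword arguments
+(see `rediscluster.rb`). Below is a minimal sketch of a fuller
+initialization; it assumes a cluster is listening on localhost:7000, and the
+pool size and timeout values are only illustrative:
+
+```
+require 'rediscluster'
+
+rc = RedisCluster.new(
+    [{:host => "localhost", :port => 7000}],
+    max_connections: 4,          # size of each node's connection pool
+    read_slave: true,            # allow read commands to be served by slaves
+    conn_opt: {:timeout => 5}    # redis-rb options applied to every connection
+)
+```
+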
## Sending commands Sending commands is very similar to redis-rb: - rc.get("foo") +``` +rc.get("foo") +``` Currently only a subset of commands are implemented (and in general multi-keys + commands are not supported by Redis Cluster), because for every supported + command we need a function able to identify the key among the arguments. ## Disclaimer -Redis Cluster is released as stable. +Redis Cluster is released as stable. This client is a work in progress that might not be suitable to be used in production environments. diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..f3637c0 --- /dev/null +++ b/Rakefile @@ -0,0 +1,7 @@ +require 'rake/testtask' + +Rake::TestTask.new do |t| + t.libs << "test" + t.test_files = FileList['tests/test*.rb'] + t.verbose = true +end diff --git a/cluster.rb b/cluster.rb deleted file mode 100644 index 2f870aa..0000000 --- a/cluster.rb +++ /dev/null @@ -1,278 +0,0 @@ -# Copyright (C) 2013 Salvatore Sanfilippo <antirez@gmail.com> -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -require 'rubygems' -require 'redis' -require './crc16' - -class RedisCluster - - RedisClusterHashSlots = 16384 - RedisClusterRequestTTL = 16 - RedisClusterDefaultTimeout = 1 - - def initialize(startup_nodes,connections,opt={}) - @startup_nodes = startup_nodes - @max_connections = connections - @connections = {} - @opt = opt - @refresh_table_asap = false - initialize_slots_cache - end - - def get_redis_link(host,port) - timeout = @opt[:timeout] or RedisClusterDefaultTimeout - Redis.new(:host => host, :port => port, :timeout => timeout) - end - - # Given a node (that is just a Ruby hash) give it a name just - # concatenating the host and port. We use the node name as a key - # to cache connections to that node. - def set_node_name!(n) - if !n[:name] - n[:name] = "#{n[:host]}:#{n[:port]}" - end - end - - # Contact the startup nodes and try to fetch the hash slots -> instances - # map in order to initialize the @slots hash. - def initialize_slots_cache - @startup_nodes.each{|n| - begin - @slots = {} - @nodes = [] - - r = get_redis_link(n[:host],n[:port]) - r.cluster("slots").each {|r| - (r[0]..r[1]).each{|slot| - ip,port = r[2] - name = "#{ip}:#{port}" - node = { - :host => ip, :port => port, - :name => name - } - @nodes << node - @slots[slot] = node - } - } - populate_startup_nodes - @refresh_table_asap = false - rescue - # Try with the next node on error. 
- next - end - # Exit the loop as long as the first node replies - break - } - end - - # Use @nodes to populate @startup_nodes, so that we have more chances - # if a subset of the cluster fails. - def populate_startup_nodes - # Make sure every node has already a name, so that later the - # Array uniq! method will work reliably. - @startup_nodes.each{|n| set_node_name! n} - @nodes.each{|n| @startup_nodes << n} - @startup_nodes.uniq! - end - - # Flush the cache, mostly useful for debugging when we want to force - # redirection. - def flush_slots_cache - @slots = {} - end - - # Return the hash slot from the key. - def keyslot(key) - # Only hash what is inside {...} if there is such a pattern in the key. - # Note that the specification requires the content that is between - # the first { and the first } after the first {. If we found {} without - # nothing in the middle, the whole key is hashed as usually. - s = key.index "{" - if s - e = key.index "}",s+1 - if e && e != s+1 - key = key[s+1..e-1] - end - end - RedisClusterCRC16.crc16(key) % RedisClusterHashSlots - end - - # Return the first key in the command arguments. - # - # Currently we just return argv[1], that is, the first argument - # after the command name. - # - # This is indeed the key for most commands, and when it is not true - # the cluster redirection will point us to the right node anyway. - # - # For commands we want to explicitly bad as they don't make sense - # in the context of cluster, nil is returned. - def get_key_from_command(argv) - case argv[0].to_s.downcase - when "info","multi","exec","slaveof","config","shutdown" - return nil - else - # Unknown commands, and all the commands having the key - # as first argument are handled here: - # set, get, ... - return argv[1] - end - end - - # If the current number of connections is already the maximum number - # allowed, close a random connection. This should be called every time - # we cache a new connection in the @connections hash. - def close_existing_connection - while @connections.length >= @max_connections - @connections.each{|n,r| - @connections.delete(n) - begin - r.client.disconnect - rescue - end - break - } - end - end - - # Return a link to a random node, or raise an error if no node can be - # contacted. This function is only called when we can't reach the node - # associated with a given hash slot, or when we don't know the right - # mapping. - # - # The function will try to get a successful reply to the PING command, - # otherwise the next node is tried. - def get_random_connection - e = "" - @startup_nodes.shuffle.each{|n| - begin - set_node_name!(n) - conn = @connections[n[:name]] - - if !conn - # Connect the node if it is not connected - conn = get_redis_link(n[:host],n[:port]) - if conn.ping == "PONG" - close_existing_connection - @connections[n[:name]] = conn - return conn - else - # If the connection is not good close it ASAP in order - # to avoid waiting for the GC finalizer. File - # descriptors are a rare resource. - conn.client.disconnect - end - else - # The node was already connected, test the connection. - return conn if conn.ping == "PONG" - end - rescue => e - # Just try with the next node. - end - } - raise "Can't reach a single startup node. #{e}" - end - - # Given a slot return the link (Redis instance) to the mapped node. - # Make sure to create a connection with the node if we don't have - # one. - def get_connection_by_slot(slot) - node = @slots[slot] - # If we don't know what the mapping is, return a random node. 
- return get_random_connection if !node - set_node_name!(node) - if not @connections[node[:name]] - begin - close_existing_connection - @connections[node[:name]] = - get_redis_link(node[:host],node[:port]) - rescue - # This will probably never happen with recent redis-rb - # versions because the connection is enstablished in a lazy - # way only when a command is called. However it is wise to - # handle an instance creation error of some kind. - return get_random_connection - end - end - @connections[node[:name]] - end - - # Dispatch commands. - def send_cluster_command(argv) - initialize_slots_cache if @refresh_table_asap - ttl = RedisClusterRequestTTL; # Max number of redirections - e = "" - asking = false - try_random_node = false - while ttl > 0 - ttl -= 1 - key = get_key_from_command(argv) - raise "No way to dispatch this command to Redis Cluster." if !key - slot = keyslot(key) - if try_random_node - r = get_random_connection - try_random_node = false - else - r = get_connection_by_slot(slot) - end - begin - # TODO: use pipelining to send asking and save a rtt. - r.asking if asking - asking = false - return r.send(argv[0].to_sym,*argv[1..-1]) - rescue Errno::ECONNREFUSED, Redis::TimeoutError, Redis::CannotConnectError, Errno::EACCES - try_random_node = true - sleep(0.1) if ttl < RedisClusterRequestTTL/2 - rescue => e - errv = e.to_s.split - if errv[0] == "MOVED" || errv[0] == "ASK" - if errv[0] == "ASK" - asking = true - else - # Serve replied with MOVED. It's better for us to - # ask for CLUSTER NODES the next time. - @refresh_table_asap = true - end - newslot = errv[1].to_i - node_ip,node_port = errv[2].split(":") - if !asking - @slots[newslot] = {:host => node_ip, - :port => node_port.to_i} - end - else - raise e - end - end - end - raise "Too many Cluster redirections? (last error: #{e})" - end - - # Currently we handle all the commands using method_missing for - # simplicity. For a Cluster client actually it will be better to have - # every single command as a method with the right arity and possibly - # additional checks (example: RPOPLPUSH with same src/dst key, SORT - # without GET or BY, and so forth). - def method_missing(*argv) - send_cluster_command(argv) - end -end - diff --git a/consistency-test.rb b/consistency-test.rb index 2f366db..6472412 100644 --- a/consistency-test.rb +++ b/consistency-test.rb @@ -16,10 +16,10 @@ # distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so, subject to # the following conditions: -# +# # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND @@ -28,7 +28,7 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
-require './cluster'
+require './rediscluster'
 
 class ConsistencyTester
   def initialize(redis)
diff --git a/dev/Dockerfile b/dev/Dockerfile
new file mode 100644
index 0000000..f907f35
--- /dev/null
+++ b/dev/Dockerfile
@@ -0,0 +1,53 @@
+FROM redis:latest
+
+RUN apt-get update
+RUN apt-get install -y ruby
+RUN apt-get install -y supervisor
+
+WORKDIR /
+
+# install openssh-server
+RUN apt-get update
+RUN apt-get install -y openssh-server
+RUN mkdir /var/run/sshd
+RUN echo 'root:root' | chpasswd
+RUN sed -i 's/PermitRootLogin without-password/PermitRootLogin yes/' /etc/ssh/sshd_config
+RUN sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd
+ENV NOTVISIBLE "in users profile"
+RUN echo "export VISIBLE=now" >> /etc/profile
+
+RUN echo 'daemonize yes\npidfile 7000.pid\ncluster-config-file /7000/node.conf\nport 7000\ncluster-enabled yes\ncluster-node-timeout 5000\nappendonly yes\ndbfilename dump.rdb\ndir /7000' > conf.tmpl
+
+RUN mkdir 7000 7001 7002 7003 7004 7005
+
+RUN cp conf.tmpl 7000/redis.conf
+
+RUN cp conf.tmpl 7001/redis.conf
+RUN sed -i 's/7000/7001/' 7001/redis.conf
+
+RUN cp conf.tmpl 7002/redis.conf
+RUN sed -i 's/7000/7002/' 7002/redis.conf
+
+RUN cp conf.tmpl 7003/redis.conf
+RUN sed -i 's/7000/7003/' 7003/redis.conf
+
+RUN cp conf.tmpl 7004/redis.conf
+RUN sed -i 's/7000/7004/' 7004/redis.conf
+
+RUN cp conf.tmpl 7005/redis.conf
+RUN sed -i 's/7000/7005/' 7005/redis.conf
+
+RUN gem install redis
+
+COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf
+COPY redis-trib.rb /redis-trib.rb
+COPY create_cluster.sh /create_cluster.sh
+
+# The cluster nodes listen on ports 7000-7005 (see supervisord.conf).
+EXPOSE 7000
+EXPOSE 7001
+EXPOSE 7002
+EXPOSE 7003
+EXPOSE 7004
+EXPOSE 7005
+
+CMD ["/usr/bin/supervisord"]
diff --git a/dev/create_cluster.sh b/dev/create_cluster.sh
new file mode 100755
index 0000000..bc03b27
--- /dev/null
+++ b/dev/create_cluster.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+redis=$(hostname -I)
+redis="$(echo "${redis}" | tr -d '[[:space:]]')"
+echo "yes" | ./redis-trib.rb create --replicas 1 $redis:7000 $redis:7001 $redis:7002 $redis:7003 $redis:7004 $redis:7005
diff --git a/dev/redis-trib.rb b/dev/redis-trib.rb
new file mode 100755
index 0000000..068e60d
--- /dev/null
+++ b/dev/redis-trib.rb
@@ -0,0 +1,1388 @@
+#!/usr/bin/env ruby
+
+# TODO (temporary here, we'll move this into the Github issues once
+# redis-trib initial implementation is completed).
+#
+# - Make sure that if the rehashing fails in the middle redis-trib will try
+#   to recover.
+# - When redis-trib performs a cluster check, if it detects a slot move in
+#   progress it should prompt the user to continue the move from where it
+#   stopped.
+# - Gracefully handle Ctrl+C in move_slot to prompt the user if really stop
+#   while rehashing, and performing the best cleanup possible if the user
+#   forces the quit.
+# - When doing "fix" set a global Fix to true, and prompt the user to
+#   fix the problem if automatically fixable every time there is something
+#   to fix. For instance:
+#   1) If there is a node that pretend to receive a slot, or to migrate a
+#      slot, but has no entries in that slot, fix it.
+#   2) If there is a node having keys in slots that are not owned by it
+#      fix this condition moving the entries in the same node.
+#   3) Perform more possibly slow tests about the state of the cluster.
+#   4) When aborted slot migration is detected, fix it.
+ +require 'rubygems' +require 'redis' + +ClusterHashSlots = 16384 + +def xputs(s) + case s[0..2] + when ">>>" + color="29;1" + when "[ER" + color="31;1" + when "[OK" + color="32" + when "[FA","***" + color="33" + else + color=nil + end + + color = nil if ENV['TERM'] != "xterm" + print "\033[#{color}m" if color + print s + print "\033[0m" if color + print "\n" +end + +class ClusterNode + def initialize(addr) + s = addr.split(":") + if s.length < 2 + puts "Invalid IP or Port (given as #{addr}) - use IP:Port format" + exit 1 + end + port = s.pop # removes port from split array + ip = s.join(":") # if s.length > 1 here, it's IPv6, so restore address + @r = nil + @info = {} + @info[:host] = ip + @info[:port] = port + @info[:slots] = {} + @info[:migrating] = {} + @info[:importing] = {} + @info[:replicate] = false + @dirty = false # True if we need to flush slots info into node. + @friends = [] + end + + def friends + @friends + end + + def slots + @info[:slots] + end + + def has_flag?(flag) + @info[:flags].index(flag) + end + + def to_s + "#{@info[:host]}:#{@info[:port]}" + end + + def connect(o={}) + return if @r + print "Connecting to node #{self}: " + STDOUT.flush + begin + @r = Redis.new(:host => @info[:host], :port => @info[:port], :timeout => 60) + @r.ping + rescue + xputs "[ERR] Sorry, can't connect to node #{self}" + exit 1 if o[:abort] + @r = nil + end + xputs "OK" + end + + def assert_cluster + info = @r.info + if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0 + xputs "[ERR] Node #{self} is not configured as a cluster node." + exit 1 + end + end + + def assert_empty + if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) || + (@r.info['db0']) + xputs "[ERR] Node #{self} is not empty. Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0." + exit 1 + end + end + + def load_info(o={}) + self.connect + nodes = @r.cluster("nodes").split("\n") + nodes.each{|n| + # name addr flags role ping_sent ping_recv link_status slots + split = n.split + name,addr,flags,master_id,ping_sent,ping_recv,config_epoch,link_status = split[0..6] + slots = split[8..-1] + info = { + :name => name, + :addr => addr, + :flags => flags.split(","), + :replicate => master_id, + :ping_sent => ping_sent.to_i, + :ping_recv => ping_recv.to_i, + :link_status => link_status + } + info[:replicate] = false if master_id == "-" + + if info[:flags].index("myself") + @info = @info.merge(info) + @info[:slots] = {} + slots.each{|s| + if s[0..0] == '[' + if s.index("->-") # Migrating + slot,dst = s[1..-1].split("->-") + @info[:migrating][slot.to_i] = dst + elsif s.index("-<-") # Importing + slot,src = s[1..-1].split("-<-") + @info[:importing][slot.to_i] = src + end + elsif s.index("-") + start,stop = s.split("-") + self.add_slots((start.to_i)..(stop.to_i)) + else + self.add_slots((s.to_i)..(s.to_i)) + end + } if slots + @dirty = false + @r.cluster("info").split("\n").each{|e| + k,v=e.split(":") + k = k.to_sym + v.chop! 
+ if k != :cluster_state + @info[k] = v.to_i + else + @info[k] = v + end + } + elsif o[:getfriends] + @friends << info + end + } + end + + def add_slots(slots) + slots.each{|s| + @info[:slots][s] = :new + } + @dirty = true + end + + def set_as_replica(node_id) + @info[:replicate] = node_id + @dirty = true + end + + def flush_node_config + return if !@dirty + if @info[:replicate] + begin + @r.cluster("replicate",@info[:replicate]) + rescue + # If the cluster did not already joined it is possible that + # the slave does not know the master node yet. So on errors + # we return ASAP leaving the dirty flag set, to flush the + # config later. + return + end + else + new = [] + @info[:slots].each{|s,val| + if val == :new + new << s + @info[:slots][s] = true + end + } + @r.cluster("addslots",*new) + end + @dirty = false + end + + def info_string + # We want to display the hash slots assigned to this node + # as ranges, like in: "1-5,8-9,20-25,30" + # + # Note: this could be easily written without side effects, + # we use 'slots' just to split the computation into steps. + + # First step: we want an increasing array of integers + # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30] + slots = @info[:slots].keys.sort + + # As we want to aggregate adjacent slots we convert all the + # slot integers into ranges (with just one element) + # So we have something like [1..1,2..2, ... and so forth. + slots.map!{|x| x..x} + + # Finally we group ranges with adjacent elements. + slots = slots.reduce([]) {|a,b| + if !a.empty? && b.first == (a[-1].last)+1 + a[0..-2] + [(a[-1].first)..(b.last)] + else + a + [b] + end + } + + # Now our task is easy, we just convert ranges with just one + # element into a number, and a real range into a start-end format. + # Finally we join the array using the comma as separator. + slots = slots.map{|x| + x.count == 1 ? x.first.to_s : "#{x.first}-#{x.last}" + }.join(",") + + role = self.has_flag?("master") ? "M" : "S" + + if self.info[:replicate] and @dirty + is = "S: #{self.info[:name]} #{self.to_s}" + else + is = "#{role}: #{self.info[:name]} #{self.to_s}\n"+ + " slots:#{slots} (#{self.slots.length} slots) "+ + "#{(self.info[:flags]-["myself"]).join(",")}" + end + if self.info[:replicate] + is += "\n replicates #{info[:replicate]}" + elsif self.has_flag?("master") && self.info[:replicas] + is += "\n #{info[:replicas].length} additional replica(s)" + end + is + end + + # Return a single string representing nodes and associated slots. + # TODO: remove slaves from config when slaves will be handled + # by Redis Cluster. + def get_config_signature + config = [] + @r.cluster("nodes").each_line{|l| + s = l.split + slots = s[8..-1].select {|x| x[0..0] != "["} + next if slots.length == 0 + config << s[0]+":"+(slots.sort.join(",")) + } + config.sort.join("|") + end + + def info + @info + end + + def is_dirty? + @dirty + end + + def r + @r + end +end + +class RedisTrib + def initialize + @nodes = [] + @fix = false + @errors = [] + end + + def check_arity(req_args, num_args) + if ((req_args > 0 and num_args != req_args) || + (req_args < 0 and num_args < req_args.abs)) + xputs "[ERR] Wrong number of arguments for specified sub command" + exit 1 + end + end + + def add_node(node) + @nodes << node + end + + def cluster_error(msg) + @errors << msg + xputs msg + end + + def get_node_by_name(name) + @nodes.each{|n| + return n if n.info[:name] == name.downcase + } + return nil + end + + # This function returns the master that has the least number of replicas + # in the cluster. 
If there are multiple masters with the same smaller + # number of replicas, one at random is returned. + def get_master_with_least_replicas + masters = @nodes.select{|n| n.has_flag? "master"} + sorted = masters.sort{|a,b| + a.info[:replicas].length <=> b.info[:replicas].length + } + sorted[0] + end + + def check_cluster + xputs ">>> Performing Cluster Check (using node #{@nodes[0]})" + show_nodes + check_config_consistency + check_open_slots + check_slots_coverage + end + + # Merge slots of every known node. If the resulting slots are equal + # to ClusterHashSlots, then all slots are served. + def covered_slots + slots = {} + @nodes.each{|n| + slots = slots.merge(n.slots) + } + slots + end + + def check_slots_coverage + xputs ">>> Check slots coverage..." + slots = covered_slots + if slots.length == ClusterHashSlots + xputs "[OK] All #{ClusterHashSlots} slots covered." + else + cluster_error \ + "[ERR] Not all #{ClusterHashSlots} slots are covered by nodes." + fix_slots_coverage if @fix + end + end + + def check_open_slots + xputs ">>> Check for open slots..." + open_slots = [] + @nodes.each{|n| + if n.info[:migrating].size > 0 + cluster_error \ + "[WARNING] Node #{n} has slots in migrating state (#{n.info[:migrating].keys.join(",")})." + open_slots += n.info[:migrating].keys + elsif n.info[:importing].size > 0 + cluster_error \ + "[WARNING] Node #{n} has slots in importing state (#{n.info[:importing].keys.join(",")})." + open_slots += n.info[:importing].keys + end + } + open_slots.uniq! + if open_slots.length > 0 + xputs "[WARNING] The following slots are open: #{open_slots.join(",")}" + end + if @fix + open_slots.each{|slot| fix_open_slot slot} + end + end + + def nodes_with_keys_in_slot(slot) + nodes = [] + @nodes.each{|n| + nodes << n if n.r.cluster("getkeysinslot",slot,1).length > 0 + } + nodes + end + + def fix_slots_coverage + not_covered = (0...ClusterHashSlots).to_a - covered_slots.keys + xputs ">>> Fixing slots coverage..." + xputs "List of not covered slots: " + not_covered.join(",") + + # For every slot, take action depending on the actual condition: + # 1) No node has keys for this slot. + # 2) A single node has keys for this slot. + # 3) Multiple nodes have keys for this slot. + slots = {} + not_covered.each{|slot| + nodes = nodes_with_keys_in_slot(slot) + slots[slot] = nodes + xputs "Slot #{slot} has keys in #{nodes.length} nodes: #{nodes.join}" + } + + none = slots.select {|k,v| v.length == 0} + single = slots.select {|k,v| v.length == 1} + multi = slots.select {|k,v| v.length > 1} + + # Handle case "1": keys in no node. + if none.length > 0 + xputs "The folowing uncovered slots have no keys across the cluster:" + xputs none.keys.join(",") + yes_or_die "Fix these slots by covering with a random node?" + none.each{|slot,nodes| + node = @nodes.sample + xputs ">>> Covering slot #{slot} with #{node}" + node.r.cluster("addslots",slot) + } + end + + # Handle case "2": keys only in one node. + if single.length > 0 + xputs "The folowing uncovered slots have keys in just one node:" + puts single.keys.join(",") + yes_or_die "Fix these slots by covering with those nodes?" + single.each{|slot,nodes| + xputs ">>> Covering slot #{slot} with #{nodes[0]}" + nodes[0].r.cluster("addslots",slot) + } + end + + # Handle case "3": keys in multiple nodes. + if multi.length > 0 + xputs "The folowing uncovered slots have keys in multiple nodes:" + xputs multi.keys.join(",") + yes_or_die "Fix these slots by moving keys into a single node?" 
+ multi.each{|slot,nodes| + xputs ">>> Covering slot #{slot} moving keys to #{nodes[0]}" + # TODO + # 1) Set all nodes as "MIGRATING" for this slot, so that we + # can access keys in the hash slot using ASKING. + # 2) Move everything to node[0] + # 3) Clear MIGRATING from nodes, and ADDSLOTS the slot to + # node[0]. + raise "TODO: Work in progress" + } + end + end + + # Return the owner of the specified slot + def get_slot_owner(slot) + @nodes.each{|n| + n.slots.each{|s,_| + return n if s == slot + } + } + nil + end + + # Slot 'slot' was found to be in importing or migrating state in one or + # more nodes. This function fixes this condition by migrating keys where + # it seems more sensible. + def fix_open_slot(slot) + puts ">>> Fixing open slot #{slot}" + + # Try to obtain the current slot owner, according to the current + # nodes configuration. + owner = get_slot_owner(slot) + + # If there is no slot owner, set as owner the slot with the biggest + # number of keys, among the set of migrating / importing nodes. + if !owner + xputs "*** Fix me, some work to do here." + # Select owner... + # Use ADDSLOTS to assign the slot. + exit 1 + end + + migrating = [] + importing = [] + @nodes.each{|n| + next if n.has_flag? "slave" + if n.info[:migrating][slot] + migrating << n + elsif n.info[:importing][slot] + importing << n + elsif n.r.cluster("countkeysinslot",slot) > 0 && n != owner + xputs "*** Found keys about slot #{slot} in node #{n}!" + importing << n + end + } + puts "Set as migrating in: #{migrating.join(",")}" + puts "Set as importing in: #{importing.join(",")}" + + # Case 1: The slot is in migrating state in one slot, and in + # importing state in 1 slot. That's trivial to address. + if migrating.length == 1 && importing.length == 1 + move_slot(migrating[0],importing[0],slot,:verbose=>true,:fix=>true) + # Case 2: There are multiple nodes that claim the slot as importing, + # they probably got keys about the slot after a restart so opened + # the slot. In this case we just move all the keys to the owner + # according to the configuration. + elsif migrating.length == 0 && importing.length > 0 + xputs ">>> Moving all the #{slot} slot keys to its owner #{owner}" + importing.each {|node| + next if node == owner + move_slot(node,owner,slot,:verbose=>true,:fix=>true,:cold=>true) + xputs ">>> Setting #{slot} as STABLE in #{node}" + node.r.cluster("setslot",slot,"stable") + } + # Case 3: There are no slots claiming to be in importing state, but + # there is a migrating node that actually don't have any key. We + # can just close the slot, probably a reshard interrupted in the middle. + elsif importing.length == 0 && migrating.length == 1 && + migrating[0].r.cluster("getkeysinslot",slot,10).length == 0 + migrating[0].r.cluster("setslot",slot,"stable") + else + xputs "[ERR] Sorry, Redis-trib can't fix this slot yet (work in progress). Slot is set as migrating in #{migrating.join(",")}, as importing in #{importing.join(",")}, owner is #{owner}" + end + end + + # Check if all the nodes agree about the cluster configuration + def check_config_consistency + if !is_config_consistent? + cluster_error "[ERR] Nodes don't agree about configuration!" + else + xputs "[OK] All nodes agree about slots configuration." + end + end + + def is_config_consistent? + signatures=[] + @nodes.each{|n| + signatures << n.get_config_signature + } + return signatures.uniq.length == 1 + end + + def wait_cluster_join + print "Waiting for the cluster to join" + while !is_config_consistent? + print "." 
+ STDOUT.flush + sleep 1 + end + print "\n" + end + + def alloc_slots + nodes_count = @nodes.length + masters_count = @nodes.length / (@replicas+1) + masters = [] + + # The first step is to split instances by IP. This is useful as + # we'll try to allocate master nodes in different physical machines + # (as much as possible) and to allocate slaves of a given master in + # different physical machines as well. + # + # This code assumes just that if the IP is different, than it is more + # likely that the instance is running in a different physical host + # or at least a different virtual machine. + ips = {} + @nodes.each{|n| + ips[n.info[:host]] = [] if !ips[n.info[:host]] + ips[n.info[:host]] << n + } + + # Select master instances + puts "Using #{masters_count} masters:" + interleaved = [] + stop = false + while not stop do + # Take one node from each IP until we run out of nodes + # across every IP. + ips.each do |ip,nodes| + if nodes.empty? + # if this IP has no remaining nodes, check for termination + if interleaved.length == nodes_count + # stop when 'interleaved' has accumulated all nodes + stop = true + next + end + else + # else, move one node from this IP to 'interleaved' + interleaved.push nodes.shift + end + end + end + + masters = interleaved.slice!(0, masters_count) + nodes_count -= masters.length + + masters.each{|m| puts m} + + # Alloc slots on masters + slots_per_node = ClusterHashSlots.to_f / masters_count + first = 0 + cursor = 0.0 + masters.each_with_index{|n,masternum| + last = (cursor+slots_per_node-1).round + if last > ClusterHashSlots || masternum == masters.length-1 + last = ClusterHashSlots-1 + end + last = first if last < first # Min step is 1. + n.add_slots first..last + first = last+1 + cursor += slots_per_node + } + + # Select N replicas for every master. + # We try to split the replicas among all the IPs with spare nodes + # trying to avoid the host where the master is running, if possible. + # + # Note we loop two times. The first loop assigns the requested + # number of replicas to each master. The second loop assigns any + # remaining instances as extra replicas to masters. Some masters + # may end up with more than their requested number of replicas, but + # all nodes will be used. + assignment_verbose = false + + [:requested,:unused].each do |assign| + masters.each do |m| + assigned_replicas = 0 + while assigned_replicas < @replicas + break if nodes_count == 0 + if assignment_verbose + if assign == :requested + puts "Requesting total of #{@replicas} replicas " \ + "(#{assigned_replicas} replicas assigned " \ + "so far with #{nodes_count} total remaining)." + elsif assign == :unused + puts "Assigning extra instance to replication " \ + "role too (#{nodes_count} remaining)." + end + end + + # Return the first node not matching our current master + node = interleaved.find{|n| n.info[:host] != m.info[:host]} + + # If we found a node, use it as a best-first match. + # Otherwise, we didn't find a node on a different IP, so we + # go ahead and use a same-IP replica. + if node + slave = node + interleaved.delete node + else + slave = interleaved.shift + end + slave.set_as_replica(m.info[:name]) + nodes_count -= 1 + assigned_replicas += 1 + puts "Adding replica #{slave} to #{m}" + + # If we are in the "assign extra nodes" loop, + # we want to assign one extra replica to each + # master before repeating masters. + # This break lets us assign extra replicas to masters + # in a round-robin way. 
+ break if assign == :unused + end + end + end + end + + def flush_nodes_config + @nodes.each{|n| + n.flush_node_config + } + end + + def show_nodes + @nodes.each{|n| + xputs n.info_string + } + end + + # Redis Cluster config epoch collision resolution code is able to eventually + # set a different epoch to each node after a new cluster is created, but + # it is slow compared to assign a progressive config epoch to each node + # before joining the cluster. However we do just a best-effort try here + # since if we fail is not a problem. + def assign_config_epoch + config_epoch = 1 + @nodes.each{|n| + begin + n.r.cluster("set-config-epoch",config_epoch) + rescue + end + config_epoch += 1 + } + end + + def join_cluster + # We use a brute force approach to make sure the node will meet + # each other, that is, sending CLUSTER MEET messages to all the nodes + # about the very same node. + # Thanks to gossip this information should propagate across all the + # cluster in a matter of seconds. + first = false + @nodes.each{|n| + if !first then first = n.info; next; end # Skip the first node + n.r.cluster("meet",first[:host],first[:port]) + } + end + + def yes_or_die(msg) + print "#{msg} (type 'yes' to accept): " + STDOUT.flush + if !(STDIN.gets.chomp.downcase == "yes") + xputs "*** Aborting..." + exit 1 + end + end + + def load_cluster_info_from_node(nodeaddr) + node = ClusterNode.new(nodeaddr) + node.connect(:abort => true) + node.assert_cluster + node.load_info(:getfriends => true) + add_node(node) + node.friends.each{|f| + next if f[:flags].index("noaddr") || + f[:flags].index("disconnected") || + f[:flags].index("fail") + fnode = ClusterNode.new(f[:addr]) + fnode.connect() + next if !fnode.r + begin + fnode.load_info() + add_node(fnode) + rescue => e + xputs "[ERR] Unable to load info for node #{fnode}" + end + } + populate_nodes_replicas_info + end + + # This function is called by load_cluster_info_from_node in order to + # add additional information to every node as a list of replicas. + def populate_nodes_replicas_info + # Start adding the new field to every node. + @nodes.each{|n| + n.info[:replicas] = [] + } + + # Populate the replicas field using the replicate field of slave + # nodes. + @nodes.each{|n| + if n.info[:replicate] + master = get_node_by_name(n.info[:replicate]) + if !master + xputs "*** WARNING: #{n} claims to be slave of unknown node ID #{n.info[:replicate]}." + else + master.info[:replicas] << n + end + end + } + end + + # Given a list of source nodes return a "resharding plan" + # with what slots to move in order to move "numslots" slots to another + # instance. + def compute_reshard_table(sources,numslots) + moved = [] + # Sort from bigger to smaller instance, for two reasons: + # 1) If we take less slots than instances it is better to start + # getting from the biggest instances. + # 2) We take one slot more from the first instance in the case of not + # perfect divisibility. Like we have 3 nodes and need to get 10 + # slots, we take 4 from the first, and 3 from the rest. So the + # biggest is always the first. + sources = sources.sort{|a,b| b.slots.length <=> a.slots.length} + source_tot_slots = sources.inject(0) {|sum,source| + sum+source.slots.length + } + sources.each_with_index{|s,i| + # Every node will provide a number of slots proportional to the + # slots it has assigned. 
+ n = (numslots.to_f/source_tot_slots*s.slots.length) + if i == 0 + n = n.ceil + else + n = n.floor + end + s.slots.keys.sort[(0...n)].each{|slot| + if moved.length < numslots + moved << {:source => s, :slot => slot} + end + } + } + return moved + end + + def show_reshard_table(table) + table.each{|e| + puts " Moving slot #{e[:slot]} from #{e[:source].info[:name]}" + } + end + + # Move slots between source and target nodes using MIGRATE. + # + # Options: + # :verbose -- Print a dot for every moved key. + # :fix -- We are moving in the context of a fix. Use REPLACE. + # :cold -- Move keys without opening / reconfiguring the nodes. + def move_slot(source,target,slot,o={}) + # We start marking the slot as importing in the destination node, + # and the slot as migrating in the target host. Note that the order of + # the operations is important, as otherwise a client may be redirected + # to the target node that does not yet know it is importing this slot. + print "Moving slot #{slot} from #{source} to #{target}: "; STDOUT.flush + if !o[:cold] + target.r.cluster("setslot",slot,"importing",source.info[:name]) + source.r.cluster("setslot",slot,"migrating",target.info[:name]) + end + # Migrate all the keys from source to target using the MIGRATE command + while true + keys = source.r.cluster("getkeysinslot",slot,10) + break if keys.length == 0 + keys.each{|key| + begin + source.r.client.call(["migrate",target.info[:host],target.info[:port],key,0,15000]) + rescue => e + if o[:fix] && e.to_s =~ /BUSYKEY/ + xputs "*** Target key #{key} exists. Replacing it for FIX." + source.r.client.call(["migrate",target.info[:host],target.info[:port],key,0,15000,:replace]) + else + puts "" + xputs "[ERR] #{e}" + exit 1 + end + end + print "." if o[:verbose] + STDOUT.flush + } + end + + puts + # Set the new node as the owner of the slot in all the known nodes. + if !o[:cold] + @nodes.each{|n| + n.r.cluster("setslot",slot,"node",target.info[:name]) + } + end + end + + # redis-trib subcommands implementations + + def check_cluster_cmd(argv,opt) + load_cluster_info_from_node(argv[0]) + check_cluster + end + + def fix_cluster_cmd(argv,opt) + @fix = true + load_cluster_info_from_node(argv[0]) + check_cluster + end + + def reshard_cluster_cmd(argv,opt) + load_cluster_info_from_node(argv[0]) + check_cluster + if @errors.length != 0 + puts "*** Please fix your cluster problems before resharding" + exit 1 + end + + # Get number of slots + if opt['slots'] + numslots = opt['slots'].to_i + else + numslots = 0 + while numslots <= 0 or numslots > ClusterHashSlots + print "How many slots do you want to move (from 1 to #{ClusterHashSlots})? " + numslots = STDIN.gets.to_i + end + end + + # Get the target instance + if opt['to'] + target = get_node_by_name(opt['to']) + if !target || target.has_flag?("slave") + xputs "*** The specified node is not known or not a master, please retry." + exit 1 + end + else + target = nil + while not target + print "What is the receiving node ID? " + target = get_node_by_name(STDIN.gets.chop) + if !target || target.has_flag?("slave") + xputs "*** The specified node is not known or not a master, please retry." + target = nil + end + end + end + + # Get the source instances + sources = [] + if opt['from'] + opt['from'].split(',').each{|node_id| + if node_id == "all" + sources = "all" + break + end + src = get_node_by_name(node_id) + if !src || src.has_flag?("slave") + xputs "*** The specified node is not known or is not a master, please retry." 
+ exit 1 + end + sources << src + } + else + xputs "Please enter all the source node IDs." + xputs " Type 'all' to use all the nodes as source nodes for the hash slots." + xputs " Type 'done' once you entered all the source nodes IDs." + while true + print "Source node ##{sources.length+1}:" + line = STDIN.gets.chop + src = get_node_by_name(line) + if line == "done" + break + elsif line == "all" + sources = "all" + break + elsif !src || src.has_flag?("slave") + xputs "*** The specified node is not known or is not a master, please retry." + elsif src.info[:name] == target.info[:name] + xputs "*** It is not possible to use the target node as source node." + else + sources << src + end + end + end + + if sources.length == 0 + puts "*** No source nodes given, operation aborted" + exit 1 + end + + # Handle soures == all. + if sources == "all" + sources = [] + @nodes.each{|n| + next if n.info[:name] == target.info[:name] + next if n.has_flag?("slave") + sources << n + } + end + + # Check if the destination node is the same of any source nodes. + if sources.index(target) + xputs "*** Target node is also listed among the source nodes!" + exit 1 + end + + puts "\nReady to move #{numslots} slots." + puts " Source nodes:" + sources.each{|s| puts " "+s.info_string} + puts " Destination node:" + puts " #{target.info_string}" + reshard_table = compute_reshard_table(sources,numslots) + puts " Resharding plan:" + show_reshard_table(reshard_table) + if !opt['yes'] + print "Do you want to proceed with the proposed reshard plan (yes/no)? " + yesno = STDIN.gets.chop + exit(1) if (yesno != "yes") + end + reshard_table.each{|e| + move_slot(e[:source],target,e[:slot],:verbose=>true) + } + end + + # This is an helper function for create_cluster_cmd that verifies if + # the number of nodes and the specified replicas have a valid configuration + # where there are at least three master nodes and enough replicas per node. + def check_create_parameters + masters = @nodes.length/(@replicas+1) + if masters < 3 + puts "*** ERROR: Invalid configuration for cluster creation." + puts "*** Redis Cluster requires at least 3 master nodes." + puts "*** This is not possible with #{@nodes.length} nodes and #{@replicas} replicas per node." + puts "*** At least #{3*(@replicas+1)} nodes are required." + exit 1 + end + end + + def create_cluster_cmd(argv,opt) + opt = {'replicas' => 0}.merge(opt) + @replicas = opt['replicas'].to_i + + xputs ">>> Creating cluster" + argv[0..-1].each{|n| + node = ClusterNode.new(n) + node.connect(:abort => true) + node.assert_cluster + node.load_info + node.assert_empty + add_node(node) + } + check_create_parameters + xputs ">>> Performing hash slots allocation on #{@nodes.length} nodes..." + alloc_slots + show_nodes + yes_or_die "Can I set the above configuration?" + flush_nodes_config + xputs ">>> Nodes configuration updated" + xputs ">>> Assign a different config epoch to each node" + assign_config_epoch + xputs ">>> Sending CLUSTER MEET messages to join the cluster" + join_cluster + # Give one second for the join to start, in order to avoid that + # wait_cluster_join will find all the nodes agree about the config as + # they are still empty with unassigned slots. 
+ sleep 1 + wait_cluster_join + flush_nodes_config # Useful for the replicas + check_cluster + end + + def addnode_cluster_cmd(argv,opt) + xputs ">>> Adding node #{argv[0]} to cluster #{argv[1]}" + + # Check the existing cluster + load_cluster_info_from_node(argv[1]) + check_cluster + + # If --master-id was specified, try to resolve it now so that we + # abort before starting with the node configuration. + if opt['slave'] + if opt['master-id'] + master = get_node_by_name(opt['master-id']) + if !master + xputs "[ERR] No such master ID #{opt['master-id']}" + end + else + master = get_master_with_least_replicas + xputs "Automatically selected master #{master}" + end + end + + # Add the new node + new = ClusterNode.new(argv[0]) + new.connect(:abort => true) + new.assert_cluster + new.load_info + new.assert_empty + first = @nodes.first.info + add_node(new) + + # Send CLUSTER MEET command to the new node + xputs ">>> Send CLUSTER MEET to node #{new} to make it join the cluster." + new.r.cluster("meet",first[:host],first[:port]) + + # Additional configuration is needed if the node is added as + # a slave. + if opt['slave'] + wait_cluster_join + xputs ">>> Configure node as replica of #{master}." + new.r.cluster("replicate",master.info[:name]) + end + xputs "[OK] New node added correctly." + end + + def delnode_cluster_cmd(argv,opt) + id = argv[1].downcase + xputs ">>> Removing node #{id} from cluster #{argv[0]}" + + # Load cluster information + load_cluster_info_from_node(argv[0]) + + # Check if the node exists and is not empty + node = get_node_by_name(id) + + if !node + xputs "[ERR] No such node ID #{id}" + exit 1 + end + + if node.slots.length != 0 + xputs "[ERR] Node #{node} is not empty! Reshard data away and try again." + exit 1 + end + + # Send CLUSTER FORGET to all the nodes but the node to remove + xputs ">>> Sending CLUSTER FORGET messages to the cluster..." + @nodes.each{|n| + next if n == node + if n.info[:replicate] && n.info[:replicate].downcase == id + # Reconfigure the slave to replicate with some other node + master = get_master_with_least_replicas + xputs ">>> #{n} as replica of #{master}" + n.r.cluster("replicate",master.info[:name]) + end + n.r.cluster("forget",argv[1]) + } + + # Finally shutdown the node + xputs ">>> SHUTDOWN the node." + node.r.shutdown + end + + def set_timeout_cluster_cmd(argv,opt) + timeout = argv[1].to_i + if timeout < 100 + puts "Setting a node timeout of less than 100 milliseconds is a bad idea." + exit 1 + end + + # Load cluster information + load_cluster_info_from_node(argv[0]) + ok_count = 0 + err_count = 0 + + # Send CLUSTER FORGET to all the nodes but the node to remove + xputs ">>> Reconfiguring node timeout in every cluster node..." + @nodes.each{|n| + begin + n.r.config("set","cluster-node-timeout",timeout) + n.r.config("rewrite") + ok_count += 1 + xputs "*** New timeout set for #{n}" + rescue => e + puts "ERR setting node-timeot for #{n}: #{e}" + err_count += 1 + end + } + xputs ">>> New node timeout set. #{ok_count} OK, #{err_count} ERR." 
+ end + + def call_cluster_cmd(argv,opt) + cmd = argv[1..-1] + cmd[0] = cmd[0].upcase + + # Load cluster information + load_cluster_info_from_node(argv[0]) + xputs ">>> Calling #{cmd.join(" ")}" + @nodes.each{|n| + begin + res = n.r.send(*cmd) + puts "#{n}: #{res}" + rescue => e + puts "#{n}: #{e}" + end + } + end + + def import_cluster_cmd(argv,opt) + source_addr = opt['from'] + xputs ">>> Importing data from #{source_addr} to cluster #{argv[1]}" + use_copy = opt['copy'] + use_replace = opt['replace'] + + # Check the existing cluster. + load_cluster_info_from_node(argv[0]) + check_cluster + + # Connect to the source node. + xputs ">>> Connecting to the source Redis instance" + src_host,src_port = source_addr.split(":") + source = Redis.new(:host =>src_host, :port =>src_port) + if source.info['cluster_enabled'].to_i == 1 + xputs "[ERR] The source node should not be a cluster node." + end + xputs "*** Importing #{source.dbsize} keys from DB 0" + + # Build a slot -> node map + slots = {} + @nodes.each{|n| + n.slots.each{|s,_| + slots[s] = n + } + } + + # Use SCAN to iterate over the keys, migrating to the + # right node as needed. + cursor = nil + while cursor != 0 + cursor,keys = source.scan(cursor, :count => 1000) + cursor = cursor.to_i + keys.each{|k| + # Migrate keys using the MIGRATE command. + slot = key_to_slot(k) + target = slots[slot] + print "Migrating #{k} to #{target}: " + STDOUT.flush + begin + cmd = ["migrate",target.info[:host],target.info[:port],k,0,15000] + cmd << :copy if use_copy + cmd << :replace if use_replace + source.client.call(cmd) + rescue => e + puts e + else + puts "OK" + end + } + end + end + + def help_cluster_cmd(argv,opt) + show_help + exit 0 + end + + # Parse the options for the specific command "cmd". + # Returns an hash populate with option => value pairs, and the index of + # the first non-option argument in ARGV. + def parse_options(cmd) + idx = 1 ; # Current index into ARGV + options={} + while idx < ARGV.length && ARGV[idx][0..1] == '--' + if ARGV[idx][0..1] == "--" + option = ARGV[idx][2..-1] + idx += 1 + if ALLOWED_OPTIONS[cmd] == nil || ALLOWED_OPTIONS[cmd][option] == nil + puts "Unknown option '#{option}' for command '#{cmd}'" + exit 1 + end + if ALLOWED_OPTIONS[cmd][option] + value = ARGV[idx] + idx += 1 + else + value = true + end + options[option] = value + else + # Remaining arguments are not options. + break + end + end + + # Enforce mandatory options + if ALLOWED_OPTIONS[cmd] + ALLOWED_OPTIONS[cmd].each {|option,val| + if !options[option] && val == :required + puts "Option '--#{option}' is required "+ \ + "for subcommand '#{cmd}'" + exit 1 + end + } + end + return options,idx + end +end + +################################################################################# +# Libraries +# +# We try to don't depend on external libs since this is a critical part +# of Redis Cluster. +################################################################################# + +# This is the CRC16 algorithm used by Redis Cluster to hash keys. +# Implementation according to CCITT standards. 
+# +# This is actually the XMODEM CRC 16 algorithm, using the +# following parameters: +# +# Name : "XMODEM", also known as "ZMODEM", "CRC-16/ACORN" +# Width : 16 bit +# Poly : 1021 (That is actually x^16 + x^12 + x^5 + 1) +# Initialization : 0000 +# Reflect Input byte : False +# Reflect Output CRC : False +# Xor constant to output CRC : 0000 +# Output for "123456789" : 31C3 + +module RedisClusterCRC16 + def RedisClusterCRC16.crc16(bytes) + crc = 0 + bytes.each_byte{|b| + crc = ((crc<<8) & 0xffff) ^ XMODEMCRC16Lookup[((crc>>8)^b) & 0xff] + } + crc + end + +private + XMODEMCRC16Lookup = [ + 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7, + 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, + 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6, + 0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de, + 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485, + 0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d, + 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4, + 0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc, + 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823, + 0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b, + 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12, + 0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a, + 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41, + 0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49, + 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70, + 0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78, + 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f, + 0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067, + 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e, + 0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256, + 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d, + 0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405, + 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c, + 0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634, + 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab, + 0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3, + 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a, + 0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92, + 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9, + 0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1, + 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8, + 0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0 + ] +end + +# Turn a key name into the corrisponding Redis Cluster slot. +def key_to_slot(key) + # Only hash what is inside {...} if there is such a pattern in the key. + # Note that the specification requires the content that is between + # the first { and the first } after the first {. If we found {} without + # nothing in the middle, the whole key is hashed as usually. + s = key.index "{" + if s + e = key.index "}",s+1 + if e && e != s+1 + key = key[s+1..e-1] + end + end + RedisClusterCRC16.crc16(key) % 16384 +end + +################################################################################# +# Definition of commands +################################################################################# + +COMMANDS={ + "create" => ["create_cluster_cmd", -2, "host1:port1 ... 
hostN:portN"], + "check" => ["check_cluster_cmd", 2, "host:port"], + "fix" => ["fix_cluster_cmd", 2, "host:port"], + "reshard" => ["reshard_cluster_cmd", 2, "host:port"], + "add-node" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"], + "del-node" => ["delnode_cluster_cmd", 3, "host:port node_id"], + "set-timeout" => ["set_timeout_cluster_cmd", 3, "host:port milliseconds"], + "call" => ["call_cluster_cmd", -3, "host:port command arg arg .. arg"], + "import" => ["import_cluster_cmd", 2, "host:port"], + "help" => ["help_cluster_cmd", 1, "(show this help)"] +} + +ALLOWED_OPTIONS={ + "create" => {"replicas" => true}, + "add-node" => {"slave" => false, "master-id" => true}, + "import" => {"from" => :required, "copy" => false, "replace" => false}, + "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false} +} + +def show_help + puts "Usage: redis-trib <command> <options> <arguments ...>\n\n" + COMMANDS.each{|k,v| + o = "" + puts " #{k.ljust(15)} #{v[2]}" + if ALLOWED_OPTIONS[k] + ALLOWED_OPTIONS[k].each{|optname,has_arg| + puts " --#{optname}" + (has_arg ? " <arg>" : "") + } + end + } + puts "\nFor check, fix, reshard, del-node, set-timeout you can specify the host and port of any working node in the cluster.\n" +end + +# Sanity check +if ARGV.length == 0 + show_help + exit 1 +end + +rt = RedisTrib.new +cmd_spec = COMMANDS[ARGV[0].downcase] +if !cmd_spec + puts "Unknown redis-trib subcommand '#{ARGV[0]}'" + exit 1 +end + +# Parse options +cmd_options,first_non_option = rt.parse_options(ARGV[0].downcase) +rt.check_arity(cmd_spec[1],ARGV.length-(first_non_option-1)) + +# Dispatch +rt.send(cmd_spec[0],ARGV[first_non_option..-1],cmd_options) diff --git a/dev/supervisord.conf b/dev/supervisord.conf new file mode 100644 index 0000000..9f26964 --- /dev/null +++ b/dev/supervisord.conf @@ -0,0 +1,27 @@ +[supervisord] +nodaemon=yes + +[program:redis7000] +command=redis-server /7000/redis.conf + +[program:redis7001] +command=redis-server /7001/redis.conf + +[program:redis7002] +command=redis-server /7002/redis.conf + +[program:redis7003] +command=redis-server /7003/redis.conf + +[program:redis7004] +command=redis-server /7004/redis.conf + +[program:redis7005] +command=redis-server /7005/redis.conf + +[program:create_cluster] +command=sh /create_cluster.sh +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..100e2c3 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,14 @@ +--- +client: + build: . 
+ ports: + - "6022:22" + volumes: + - .:/code + links: + - redis + +redis: + build: ./dev + ports: + - "7022:22" diff --git a/example.rb b/example.rb index d8d9801..8061d7c 100644 --- a/example.rb +++ b/example.rb @@ -1,4 +1,4 @@ -require './cluster' +require './rediscluster' if ARGV.length != 2 startup_nodes = [ diff --git a/lib/connection_table.rb b/lib/connection_table.rb new file mode 100644 index 0000000..928bd7d --- /dev/null +++ b/lib/connection_table.rb @@ -0,0 +1,101 @@ +require 'connection_pool' +require 'redis' + +class ConnectionTable + + + def split_node(node) + host, port = node.split(':') + [host, port] + end + + def initialize(max_connections, read_slave: false, opt: {}) + @opt = opt + @max_connections = max_connections + @read_slave = read_slave + @master_conns = {} + @slave_conns = {} + @slots = {} + @pid = get_pid + end + + def inspect + "#<#{self.class.name}: @master_conns=#{@master_conns.keys}, @slave_conns=#{@slave_conns.keys}, @max_connections=#{@max_connections}>" + end + + def new_pool(node, read_only: false) + host, port = split_node(node) + ConnectionPool.new(size: @max_connections) { + opt = @opt.dup + opt[:host] = host + opt[:port] = port + r = Redis.new(opt) + if read_only + r.readonly + end + r + } + end + + def get_pool_by_node(node) + @master_conns.fetch(node, nil) or @slave_conns.fetch(node) + end + + def get_connection_by_node(node) + pool = get_pool_by_node(node) + pool.with do |conn| + return conn + end + end + + def get_random_connection(master_only) + keys = master_only ? @master_conns.keys : @master_conns.keys + @slave_conns.keys + random_node = keys.sample + get_connection_by_node(random_node) + end + + def get_connection_by_slot(slot, master_only) + nodes = @slots[slot] + node = @read_slave && !master_only ? 
nodes.sample: nodes[0] + get_connection_by_node(node) + end + + def init_node_pool(nodes) + nodes.each do |n| + name = n[:name] + if @master_conns.has_key?(name) || @slave_conns.has_key?(name) + next + end + if n[:role] == 'master' + reset_master_node!(name) + next + end + reset_slave_node!(name) + end + end + + def update_slot!(newslot, node_names) + @slots[newslot] = node_names + end + + def reset_master_node!(node) + @master_conns[node] = new_pool(node) + end + + def reset_slave_node!(node) + @slave_conns[node] = new_pool(node, read_only: true) + end + + def get_pid + Process.pid + end + + def make_fork_safe(nodes) + if @pid != get_pid + @master_conns = {} + @slave_conns = {} + init_node_pool(nodes) + end + end + +end diff --git a/lib/exceptions.rb b/lib/exceptions.rb new file mode 100644 index 0000000..af15060 --- /dev/null +++ b/lib/exceptions.rb @@ -0,0 +1,4 @@ +module Exceptions + class StartupNodesUnreachable < StandardError; end + class CrossSlotsError < RuntimeError; end +end diff --git a/redis-rb-cluster.gemspec b/redis-rb-cluster.gemspec new file mode 100644 index 0000000..31fe21e --- /dev/null +++ b/redis-rb-cluster.gemspec @@ -0,0 +1,17 @@ +Gem::Specification.new do |s| + s.name = "rediscluster" + s.platform = Gem::Platform::RUBY + s.version = 0.1 + s.authors = ["Antirez", "iandyh"] + s.email = [] + #s.homepage = "https://github.com/" + s.description = s.summary = %q{Redis Cluster client for Ruby} + + s.files = `git ls-files`.split("\n") + s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n") + s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) } + s.require_paths = ["."] + #s.license = "BSD" + s.add_runtime_dependency 'connection_pool', '~> 2.2', '>= 2.2.0' + s.add_runtime_dependency 'redis', '~> 3.2', '>= 3.2.1' +end diff --git a/rediscluster.rb b/rediscluster.rb new file mode 100644 index 0000000..ef665e4 --- /dev/null +++ b/rediscluster.rb @@ -0,0 +1,785 @@ +# coding: utf-8 +# Copyright (C) 2013 Salvatore Sanfilippo <antirez@gmail.com> +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
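+# Usage sketch (the addresses below are illustrative; point the client at
+# your own cluster nodes):
+#
+#   require_relative 'rediscluster'
+#
+#   startup_nodes = [
+#       {:host => 'localhost', :port => 7000},
+#       {:host => 'localhost', :port => 7001}
+#   ]
+#   rc = RedisCluster.new(startup_nodes, max_connections: 2,
+#                         read_slave: true, conn_opt: {:timeout => 2})
+#   rc.set('foo', 'bar')
+#   rc.get('foo')   # => "bar"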
+
+require 'logger'
+require 'resolv'
+require 'redis'
+require_relative 'crc16'
+require_relative 'lib/connection_table'
+require_relative 'lib/exceptions'
+
+
+class RedisCluster
+
+    RedisClusterHashSlots = 16384
+    RedisClusterRequestTTL = 16
+    RedisClusterDefaultTimeout = 1
+
+    # Initialise the client.
+    #
+    # @param [Array<Hash>] startup_nodes addresses of the nodes used to bootstrap the client
+    # @param [Fixnum] max_connections number of connections in each connection pool
+    # @param [Boolean] read_slave whether reads may be served by slave nodes
+    # @param [Fixnum] cluster_retry_interval seconds to wait before retrying while the cluster is down
+    # @param [Hash] conn_opt redis-rb connection options
+    def initialize(startup_nodes, max_connections: 3, read_slave: false,
+                   cluster_retry_interval: 3, conn_opt: {})
+        @startup_nodes = startup_nodes
+        @conn_opt = conn_opt
+        @cluster_retry_interval = cluster_retry_interval
+        @connections = ConnectionTable.new(max_connections,
+                                           read_slave: read_slave,
+                                           opt: @conn_opt)
+        @refresh_table_asap = false
+        @log = Logger.new(STDOUT)
+        @log.level = Logger::INFO
+        initialize_slots_cache
+    end
+
+    def inspect
+        "#<#{self.class.name}: @connections=#{@connections.inspect}, @startup_nodes=#{@startup_nodes}>"
+    end
+
+    def get_redis_link(host,port)
+        opt = @conn_opt.dup
+        opt[:host] = host
+        opt[:port] = port
+        Redis.new(opt)
+    end
+
+    # Fetch the nodes serving one slot range, as returned by the CLUSTER
+    # SLOTS command. According to the protocol, the first item in the array
+    # is the master and the rest are slaves. A DNS cache is used so that
+    # each IP is reverse-resolved into a hostname only once.
+    def fetch_nodes(nodes, dns_cache)
+        ret = []
+        nodes.each_with_index do |item, index|
+            ip, port = item
+            host = dns_cache.fetch(ip) {
+                |missing_ip|
+                host = Resolv.getname(missing_ip)
+                dns_cache[ip] = host
+                host
+            }
+            name = "#{host}:#{port}"
+            role = index == 0 ? 'master' : 'slave'
+            node = {
+                :host => host, :port => port,
+                :name => name, :ip => ip,
+                :role => role
+            }
+            ret << node
+        end
+        ret
+    end
+
+    # Contact the startup nodes and try to fetch the hash slots -> instances
+    # map in order to initialize the @slots hash.
+    def initialize_slots_cache
+        startup_nodes_reachable = false
+        dns_cache = {}
+        @startup_nodes.each{|n|
+            begin
+                nodes = []
+                r = get_redis_link(n[:host],n[:port])
+                r.cluster("slots").each {|r|
+                    slot_nodes = fetch_nodes(r[2..-1], dns_cache)
+                    nodes += slot_nodes
+                    node_names = slot_nodes.map { |x| x[:name]}.compact
+                    (r[0]..r[1]).each{|slot|
+                        @connections.update_slot!(slot, node_names)
+                    }
+                    @connections.init_node_pool(slot_nodes)
+                }
+                populate_startup_nodes(nodes)
+                @refresh_table_asap = false
+            rescue Errno::ECONNREFUSED, Redis::TimeoutError, Redis::CannotConnectError, Errno::EACCES
+                # Try with the next node on error.
+                next
+            rescue
+                raise
+            end
+            # Exit the loop as soon as one startup node replies.
+            startup_nodes_reachable = true
+            break
+        }
+        if !startup_nodes_reachable
+            raise Exceptions::StartupNodesUnreachable
+        end
+    end
+
+    # Use nodes to populate @startup_nodes, so that we have more chances
+    # if a subset of the cluster fails.
+    def populate_startup_nodes(nodes)
+        nodes.uniq!
+        @startup_nodes = nodes
+    end
+
+    # Return the hash slot from the key.
+    def keyslot(key)
+        # Only hash what is inside {...} if there is such a pattern in the key.
+        # Note that the specification requires the content between the first
+        # { and the first } after the first {. If we find {} with nothing in
+        # the middle, the whole key is hashed as usual.
+        s = key.index "{"
+        if s
+            e = key.index "}",s+1
+            if e && e != s+1
+                key = key[s+1..e-1]
+            end
+        end
+        RedisClusterCRC16.crc16(key) % RedisClusterHashSlots
+    end
+
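+    # As an aside, hash tags ({...}) are how callers force several keys into
+    # the same slot, which the multi-key commands below depend on. A small
+    # illustration (the key names are made up):
+    #
+    #   keyslot("user:1000:profile")    # the whole key is hashed
+    #   keyslot("{user:1000}.profile")  # only "user:1000" is hashed
+    #   keyslot("{user:1000}.friends")  # same slot as the key above
+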
+    # Return the first key in the command arguments.
+    #
+    # Currently we just return argv[1], that is, the first argument
+    # after the command name.
+    #
+    # This is indeed the key for most commands, and when it is not true
+    # the cluster redirection will point us to the right node anyway.
+    #
+    # For commands we explicitly want to reject, because they don't make
+    # sense in the context of a cluster, nil is returned.
+    def get_key_from_command(argv)
+        case argv[0].to_s.downcase
+        when "info","multi","exec","slaveof","config","shutdown"
+            return nil
+        when "bitop"
+            return argv[2]
+        else
+            # Unknown commands, and all the commands having the key
+            # as first argument are handled here:
+            # set, get, ...
+            return argv[1]
+        end
+    end
+
+    # Dispatch commands.
+    def send_cluster_command(argv, master_only: true, &blk)
+        key = get_key_from_command(argv)
+        raise "No way to dispatch this command to Redis Cluster." if !key
+        slot = keyslot(key)
+        initialize_slots_cache if @refresh_table_asap
+        @connections.make_fork_safe(@startup_nodes)
+        ttl = RedisClusterRequestTTL # Max number of redirections
+        e = ""
+        asking = false
+        try_random_node = false
+        while ttl > 0
+            ttl -= 1
+            initialize_slots_cache if @refresh_table_asap
+            if try_random_node
+                r = @connections.get_random_connection(master_only)
+                try_random_node = false
+            else
+                r = @connections.get_connection_by_slot(slot, master_only)
+            end
+
+            begin
+                # TODO: use pipelining to send asking and save a rtt.
+                r.asking if asking
+                asking = false
+                return r.send(argv[0].to_sym,*argv[1..-1], &blk)
+            rescue Errno::ECONNREFUSED, Redis::TimeoutError, Redis::CannotConnectError, Errno::EACCES
+                try_random_node = true
+                sleep(@cluster_retry_interval) if ttl < RedisClusterRequestTTL/2
+            rescue => e
+                errv = e.to_s.split
+                if errv[0] == "MOVED" || errv[0] == "ASK"
+                    if errv[0] == "ASK"
+                        asking = true
+                    else
+                        # Server replied with MOVED. It's better for us to
+                        # refresh the slots cache at the next command.
+                        @refresh_table_asap = true
+                    end
+                    newslot = errv[1].to_i
+                    node_name = errv[2]
+                    if !asking
+                        ip, port = node_name.split(":")
+                        node_name = "#{Resolv.getname(ip)}:#{port}"
+                        @connections.update_slot!(newslot, [node_name])
+                    end
+                elsif errv[0] == "CLUSTERDOWN"
+                    try_random_node = true
+                    sleep(@cluster_retry_interval) if ttl < RedisClusterRequestTTL/2
+                    @refresh_table_asap = true
+                else
+                    raise e
+                end
+            end
+        end
+        raise "Too many Cluster redirections? (last error: #{e})"
+    end
+
+    # Some commands are not implemented yet.
+    # If someone tries to use them, a NotImplementedError is raised.
+    def method_missing(*argv)
+        cmd = argv[0].to_s
+        raise NotImplementedError, "#{cmd} command is not implemented yet!"
+ end + + def execute_cmd_on_all_nodes(argv, master_only: true, log_required: false) + @connections.make_fork_safe(@startup_nodes) + ret = {} + cmd = argv.shift + @startup_nodes.each do |n| + if master_only && n[:role] == 'slave' + next + end + node_name = n[:name] + r = @connections.get_connection_by_node(node_name) + ret[node_name] = r.public_send(cmd, *argv) + if log_required + all = [cmd] + argv + @log.info("Successfully sent #{all.to_s} to #{node_name}") + end + end + ret + end + + def _check_keys_in_same_slot(keys) + prev_slot = nil + keys.each do |k| + slot = keyslot(k) + if prev_slot && prev_slot != slot + raise Exceptions::CrossSlotsError + end + prev_slot = slot + end + end + + # server commands + def config(action, *argv) + argv = [action] + argv + log_required = [:resetstat, :set].member?(action) + execute_cmd_on_all_nodes([:config, *argv], master_only: false, + log_required: log_required) + end + + + def dbsize + execute_cmd_on_all_nodes([:dbsize], master_only: false) + end + + def flushall + execute_cmd_on_all_nodes([:flushall]) + end + + def flushdb + execute_cmd_on_all_nodes([:flushdb]) + end + + def info(cmd = nil) + execute_cmd_on_all_nodes([:info, cmd], master_only: false) + end + + def shutdown + execute_cmd_on_all_nodes([:shutdown], master_only: false) + end + + def slowlog(subcommand, length=nil) + execute_cmd_on_all_nodes([:slowlog, subcommand, length], + master_only: false) + end + + def time + execute_cmd_on_all_nodes([:time], master_only: false) + end + + # connection commands + def ping + execute_cmd_on_all_nodes([:ping], master_only: false) + end + + # string commands + def append(key, value) + send_cluster_command([:append, key, value]) + end + + def bitcount(key, start = 0, stop = -1) + send_cluster_command([:bitcount, key, start, stop], + master_only: false) + end + + def bitop(operation, dest_key, *keys) + _check_keys_in_same_slot([dest_key] + keys) + send_cluster_command([:bitop, operation, dest_key, *keys]) + end + + def bitpos(key, bit, start = 0, stop = -1) + send_cluster_command([:bitpos, key, bit, start, stop]) + end + + def decr(key) + send_cluster_command([:decr, key]) + end + + def decrby(key, decrement) + send_cluster_command([:decrby, key, decrement]) + end + + def get(key) + send_cluster_command([:get, key], master_only: false) + end + + def getbit(key, offset) + send_cluster_command([:getbit, key, offset], master_only: false) + end + + def getrange(key, start, stop) + send_cluster_command([:getrange, key, start, stop], + master_only: false) + end + + def incr(key) + send_cluster_command([:incr, key]) + end + + def incrby(key, increment) + send_cluster_command([:incrby, key, increment]) + end + + def incrbyfloat(key, increment) + send_cluster_command([:incrbyfloat, key, increment]) + end + + def mget(*keys, &blk) + _check_keys_in_same_slot(keys) + send_cluster_command([:mget, *keys], master_only: false, &blk) + end + + def mapped_mget(*keys) + mget(*keys) do |reply| + if reply.kind_of?(Array) + Hash[keys.zip(reply)] + else + reply + end + end + end + + def psetex(key, millisec, value) + send_cluster_command([:psetex, key, millisec, value]) + end + + def set(key, value) + send_cluster_command([:set, key, value]) + end + + def mset(*args) + keys = args.select.each_with_index { |_, i| i.even? } + _check_keys_in_same_slot(keys) + send_cluster_command([:mset, *args]) + end + + def mapped_mset(hash) + mset(*hash.to_a.flatten) + end + + def msetnx(*args) + keys = args.select.each_with_index { |_, i| i.even? 
}
+        _check_keys_in_same_slot(keys)
+        send_cluster_command([:msetnx, *args])
+    end
+
+    def mapped_msetnx(hash)
+        msetnx(*hash.to_a.flatten)
+    end
+
+    def setbit(key, offset, value)
+        send_cluster_command([:setbit, key, offset, value])
+    end
+
+    def setex(key, seconds, value)
+        send_cluster_command([:setex, key, seconds, value])
+    end
+
+    def setnx(key, value)
+        send_cluster_command([:setnx, key, value])
+    end
+
+    def setrange(key, offset, value)
+        send_cluster_command([:setrange, key, offset, value])
+    end
+
+    def strlen(key)
+        send_cluster_command([:strlen, key], master_only: false)
+    end
+
+    # list commands
+    def blpop(*argv)
+        keys = argv[0..-2]
+        _check_keys_in_same_slot(keys)
+        send_cluster_command([:blpop, *argv])
+    end
+
+    def brpop(*argv)
+        keys = argv[0..-2]
+        _check_keys_in_same_slot(keys)
+        send_cluster_command([:brpop, *argv])
+    end
+
+    def brpoplpush(source, destination, options = {})
+        _check_keys_in_same_slot([source, destination])
+        send_cluster_command([:brpoplpush, source, destination, options])
+    end
+
+    def lindex(key, index)
+        send_cluster_command([:lindex, key, index], master_only: false)
+    end
+
+    def linsert(key, where, pivot, value)
+        send_cluster_command([:linsert, key, where, pivot, value])
+    end
+
+    def llen(key)
+        send_cluster_command([:llen, key], master_only: false)
+    end
+
+    def lpop(key)
+        send_cluster_command([:lpop, key])
+    end
+
+    def lpush(key, value)
+        send_cluster_command([:lpush, key, value])
+    end
+
+    def lpushx(key, value)
+        send_cluster_command([:lpushx, key, value])
+    end
+
+    def lrange(key, start, stop)
+        send_cluster_command([:lrange, key, start, stop], master_only: false)
+    end
+
+    def lrem(key, count, value)
+        send_cluster_command([:lrem, key, count, value])
+    end
+
+    # LSET is a write command, so it must always go to a master.
+    def lset(key, index, value)
+        send_cluster_command([:lset, key, index, value])
+    end
+
+    def ltrim(key, start, stop)
+        send_cluster_command([:ltrim, key, start, stop])
+    end
+
+    def rpop(key)
+        send_cluster_command([:rpop, key])
+    end
+
+    def rpoplpush(source, destination)
+        _check_keys_in_same_slot([source, destination])
+        send_cluster_command([:rpoplpush, source, destination])
+    end
+
+    def rpush(key, value)
+        send_cluster_command([:rpush, key, value])
+    end
+
+    def rpushx(key, value)
+        send_cluster_command([:rpushx, key, value])
+    end
+
+    # set commands
+    def sadd(key, member)
+        send_cluster_command([:sadd, key, member])
+    end
+
+    def scard(key)
+        send_cluster_command([:scard, key])
+    end
+
+    def sdiff(*keys)
+        _check_keys_in_same_slot(keys)
+        send_cluster_command([:sdiff, *keys])
+    end
+
+    def sdiffstore(destination, *keys)
+        _check_keys_in_same_slot([destination, *keys])
+        send_cluster_command([:sdiffstore, destination, *keys])
+    end
+
+    def sinter(*keys)
+        _check_keys_in_same_slot(keys)
+        send_cluster_command([:sinter, *keys])
+    end
+
+    def sinterstore(destination, *keys)
+        _check_keys_in_same_slot([destination, *keys])
+        send_cluster_command([:sinterstore, destination, *keys])
+    end
+
+    def sismember(key, member)
+        send_cluster_command([:sismember, key, member], master_only: false)
+    end
+
+    def smembers(key)
+        send_cluster_command([:smembers, key], master_only: false)
+    end
+
+    def smove(source, destination, member)
+        _check_keys_in_same_slot([source, destination])
+        send_cluster_command([:smove, source, destination, member])
+    end
+
+    def spop(key)
+        send_cluster_command([:spop, key])
+    end
+
+    def srandmember(key, count = nil)
+        send_cluster_command([:srandmember, key, count], master_only: false)
+    end
+
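+    # The multi-key commands above and below (sdiff, sinter, sunion, ...)
+    # only work when every key hashes to the same slot; otherwise
+    # _check_keys_in_same_slot raises Exceptions::CrossSlotsError before
+    # anything is sent. A sketch of how a caller can cope with that (the
+    # key names are made up):
+    #
+    #   begin
+    #       rc.sinter('{tags}.a', '{tags}.b')  # same hash tag => same slot
+    #       rc.sinter('tags:a', 'tags:b')      # may raise CrossSlotsError
+    #   rescue Exceptions::CrossSlotsError
+    #       # fall back to fetching both sets and intersecting client-side
+    #   end
+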
+    def srem(key, member)
+        send_cluster_command([:srem, key, member])
+    end
+
+    def sunion(*keys)
+        _check_keys_in_same_slot(keys)
+        send_cluster_command([:sunion, *keys], master_only: false)
+    end
+
+    def sunionstore(destination, *keys)
+        _check_keys_in_same_slot([destination, *keys])
+        send_cluster_command([:sunionstore, destination, *keys])
+    end
+
+    def sscan(key, cursor, options = {})
+        send_cluster_command([:sscan, key, cursor, options])
+    end
+
+    # sorted set commands
+    def zadd(key, *argv)
+        send_cluster_command([:zadd, key, *argv])
+    end
+
+    def zcard(key)
+        send_cluster_command([:zcard, key], master_only: false)
+    end
+
+    def zcount(key, min, max)
+        send_cluster_command([:zcount, key, min, max], master_only: false)
+    end
+
+    def zincrby(key, increment, member)
+        send_cluster_command([:zincrby, key, increment, member])
+    end
+
+    def zinterstore(destination, keys, options = {})
+        _check_keys_in_same_slot([destination, *keys])
+        send_cluster_command([:zinterstore, destination, keys, options])
+    end
+
+    #def zlexcount(key, min, max)
+    # redis-rb hasn't implemented it yet
+    #send_cluster_command([:zlexcount, key, min, max])
+    #end
+
+    def zrange(key, start, stop, options = {})
+        send_cluster_command([:zrange, key, start, stop, options],
+                             master_only: false)
+    end
+
+    def zrangebylex(key, min, max, options = {})
+        send_cluster_command([:zrangebylex, key, min, max, options],
+                             master_only: false)
+    end
+
+    def zrevrangebylex(key, max, min, options = {})
+        send_cluster_command([:zrevrangebylex, key, max, min, options],
+                             master_only: false)
+    end
+
+    def zrangebyscore(key, min, max, options = {})
+        send_cluster_command([:zrangebyscore, key, min, max, options],
+                             master_only: false)
+    end
+
+    def zrank(key, member)
+        send_cluster_command([:zrank, key, member], master_only: false)
+    end
+
+    def zrem(key, member)
+        send_cluster_command([:zrem, key, member])
+    end
+
+    # def zremrangebylex(key, min, max)
+    # end
+
+    def zremrangebyrank(key, start, stop)
+        send_cluster_command([:zremrangebyrank, key, start, stop])
+    end
+
+    def zremrangebyscore(key, min, max)
+        send_cluster_command([:zremrangebyscore, key, min, max])
+    end
+
+    def zrevrange(key, start, stop, options = {})
+        send_cluster_command([:zrevrange, key, start, stop, options],
+                             master_only: false)
+    end
+
+    def zrevrangebyscore(key, max, min, options = {})
+        send_cluster_command([:zrevrangebyscore, key, max, min, options],
+                             master_only: false)
+    end
+
+    def zrevrank(key, member)
+        send_cluster_command([:zrevrank, key, member], master_only: false)
+    end
+
+    def zscore(key, member)
+        send_cluster_command([:zscore, key, member], master_only: false)
+    end
+
+    def zunionstore(destination, keys, options = {})
+        _check_keys_in_same_slot([destination, *keys])
+        send_cluster_command([:zunionstore, destination, keys, options])
+    end
+
+    def zscan(key, cursor, options = {})
+        send_cluster_command([:zscan, key, cursor, options])
+    end
+
+    # hash commands
+    def hdel(key, field)
+        send_cluster_command([:hdel, key, field])
+    end
+
+    def hexists(key, field)
+        send_cluster_command([:hexists, key, field], master_only: false)
+    end
+
+    def hget(key, field)
+        send_cluster_command([:hget, key, field], master_only: false)
+    end
+
+    def hgetall(key)
+        send_cluster_command([:hgetall, key], master_only: false)
+    end
+
+    def hincrby(key, field, increment)
+        send_cluster_command([:hincrby, key, field, increment])
+    end
+
+    def hincrbyfloat(key, field, increment)
+        send_cluster_command([:hincrbyfloat, key, field, increment])
+    end
+
+    def hkeys(key)
+        send_cluster_command([:hkeys, key], master_only: false)
+    end
+
+    def hlen(key)
+        send_cluster_command([:hlen, key], master_only: false)
+    end
+
+    def hmget(key, *fields, &blk)
+        send_cluster_command([:hmget, key, *fields], master_only: false, &blk)
+    end
+
+    def mapped_hmget(key, *fields)
+        hmget(key, *fields) do |reply|
+            if reply.kind_of?(Array)
+                Hash[fields.zip(reply)]
+            else
+                reply
+            end
+        end
+    end
+
+    def hmset(key, *attrs)
+        send_cluster_command([:hmset, key, *attrs])
+    end
+
+    def mapped_hmset(key, hash)
+        hmset(key, hash.to_a.flatten)
+    end
+
+    def hset(key, field, value)
+        send_cluster_command([:hset, key, field, value])
+    end
+
+    def hsetnx(key, field, value)
+        send_cluster_command([:hsetnx, key, field, value])
+    end
+
+    # def hstrlen(key, field)
+    #     send_cluster_command([:hstrlen, key, field])
+    # end
+
+    def hvals(key)
+        send_cluster_command([:hvals, key], master_only: false)
+    end
+
+    def hscan(key, cursor, options = {})
+        send_cluster_command([:hscan, key, cursor, options])
+    end
+
+    # keys commands
+    def del(*keys)
+        total = 0
+        keys.each do |k|
+            total += send_cluster_command([:del, k])
+        end
+        total
+    end
+
+    def exists(key)
+        send_cluster_command([:exists, key], master_only: false)
+    end
+
+    def expire(key, seconds)
+        send_cluster_command([:expire, key, seconds])
+    end
+
+    def expireat(key, unix_time)
+        send_cluster_command([:expireat, key, unix_time])
+    end
+
+    def keys(pattern = "*")
+        # only for debugging purposes
+        ret = execute_cmd_on_all_nodes([:keys, pattern])
+        ret.values.flatten
+    end
+
+    def persist(key)
+        send_cluster_command([:persist, key])
+    end
+
+    def ttl(key)
+        send_cluster_command([:ttl, key])
+    end
+
+    def pexpire(key, milliseconds)
+        send_cluster_command([:pexpire, key, milliseconds])
+    end
+
+    def pexpireat(key, ms_unix_time)
+        send_cluster_command([:pexpireat, key, ms_unix_time])
+    end
+
+    def pttl(key)
+        send_cluster_command([:pttl, key])
+    end
+
+end
diff --git a/tests/test_base.rb b/tests/test_base.rb
new file mode 100644
index 0000000..af32bfe
--- /dev/null
+++ b/tests/test_base.rb
@@ -0,0 +1,35 @@
+require_relative '../rediscluster'
+require 'test/unit'
+
+
+class TestBase < Test::Unit::TestCase
+
+    OK = "OK"
+    KEY = "asdf"
+
+    def setup
+        host = 'redis'
+        port1 = '7002'
+        port2 = '7003'
+
+        startup_nodes = [
+            {:host => host, :port => port1},
+            {:host => host, :port => port2},
+        ]
+        conn_opt = {:timeout => 2}
+        @rc = RedisCluster.new(startup_nodes, max_connections: 2, read_slave: true, conn_opt: conn_opt)
+    end
+
+    def teardown
+        @rc.flushdb
+    end
+
+    def get_keys_in_same_slot
+        ['bbb', 'mngopdrw']
+    end
+
+    def get_keys_in_diff_slot
+        ['bbb', 'aaa']
+    end
+
+end
diff --git a/tests/test_connection_cmds.rb b/tests/test_connection_cmds.rb
new file mode 100644
index 0000000..1c1ce8c
--- /dev/null
+++ b/tests/test_connection_cmds.rb
@@ -0,0 +1,12 @@
+require_relative 'test_base'
+
+class TestConnectionCmds < TestBase
+
+    def test_ping
+        r = @rc.ping
+        r.each do |node, res|
+            assert_equal('PONG', res)
+        end
+    end
+
+end
diff --git a/tests/test_hash_cmds.rb b/tests/test_hash_cmds.rb
new file mode 100644
index 0000000..e83a625
--- /dev/null
+++ b/tests/test_hash_cmds.rb
@@ -0,0 +1,102 @@
+require_relative 'test_base'
+
+class TestHashCmds < TestBase
+
+    FIELD1 = 'hello'
+    FIELD2 = 'world'
+
+    def test_hdel
+        @rc.hset(KEY, FIELD1, 'a')
+        assert_equal(1, @rc.hdel(KEY, FIELD1))
+    end
+
+    def test_hexists
+        @rc.hset(KEY, FIELD1, 'a')
+        assert_equal(false, @rc.hexists(KEY, FIELD2))
+        assert_equal(true, @rc.hexists(KEY, FIELD1))
+    end
+
+    def test_hget
+        @rc.hset(KEY, FIELD1, 'a')
+
assert_equal('a', @rc.hget(KEY, FIELD1)) + end + + def test_hgetall + @rc.hset(KEY, FIELD1, 'a') + @rc.hset(KEY, FIELD2, 'b') + t = { + FIELD1 => 'a', + FIELD2 => 'b' + } + assert_equal(t, @rc.hgetall(KEY)) + end + + def test_hincrby + @rc.hset(KEY, FIELD1, 1) + assert_equal(2, @rc.hincrby(KEY, FIELD1, 1)) + end + + def test_hincrbyfloat + @rc.hset(KEY, FIELD1, 1) + assert_equal(1.1, @rc.hincrbyfloat(KEY, FIELD1, 0.1)) + end + + def test_hkeys + @rc.hset(KEY, FIELD1, 'a') + @rc.hset(KEY, FIELD2, 'b') + assert_equal([FIELD1, FIELD2], @rc.hkeys(KEY).sort!) + end + + def test_hlen + @rc.hset(KEY, FIELD1, 'a') + @rc.hset(KEY, FIELD2, 'b') + assert_equal(2, @rc.hlen(KEY)) + end + + def test_hmget + @rc.hset(KEY, FIELD1, 'a') + @rc.hset(KEY, FIELD2, 'b') + assert_equal(['a', 'b'], @rc.hmget(KEY, *[FIELD1, FIELD2]).sort!) + end + + def test_mapped_get + @rc.hset(KEY, FIELD1, 'a') + @rc.hset(KEY, FIELD2, 'b') + ret = {FIELD1 => 'a', FIELD2 => 'b'} + assert_equal(ret, @rc.mapped_hmget(KEY, *[FIELD1, FIELD2])) + end + + def test_hmset + attrs = [FIELD1, 'a', FIELD2, 'b'] + assert_equal(OK, @rc.hmset(KEY, *attrs)) + assert_equal(['a', 'b'], @rc.hmget(KEY, *[FIELD1, FIELD2]).sort!) + end + + def test_mapped_hmset + hash = {FIELD1 => 'a', FIELD2 => 'b'} + assert_equal(OK, @rc.mapped_hmset(KEY, hash)) + assert_equal(['a', 'b'], @rc.hmget(KEY, *[FIELD1, FIELD2]).sort!) + end + + def test_hsetnx + @rc.hset(KEY, FIELD1, 'a') + assert_equal(false, @rc.hsetnx(KEY, FIELD1, 'a')) + end + + # def test_hstrlen + # @rc.hset(KEY, FIELD1, 'a') + # assert_equal(1, @rc.hstrlen(KEY, FIELD1)) + # end + + def test_hvals + @rc.hset(KEY, FIELD1, 'a') + @rc.hset(KEY, FIELD2, 'b') + assert_equal(['a', 'b'], @rc.hvals(KEY).sort!) + end + + def test_hscan + @rc.hset(KEY, FIELD1, 'a') + @rc.hset(KEY, FIELD2, 'b') + assert_equal(["0", [[FIELD1, "a"], [FIELD2, "b"]]], @rc.hscan(KEY, 0)) + end +end diff --git a/tests/test_keys_cmd.rb b/tests/test_keys_cmd.rb new file mode 100644 index 0000000..221ceee --- /dev/null +++ b/tests/test_keys_cmd.rb @@ -0,0 +1,61 @@ +require_relative '../lib/exceptions' +require_relative 'test_base' + +class TestKeysCmd < TestBase + + def test_del + key1, key2 = get_keys_in_diff_slot + @rc.set(key1, 1) + assert_equal(1, @rc.del(key1, key2)) + end + + def test_exists + assert_equal(false, @rc.exists(KEY)) + end + + def test_expire_and_ttl + @rc.set(KEY, 1) + assert_equal(true, @rc.expire(KEY, 100)) + assert_equal(100, @rc.ttl(KEY)) + end + + def test_expireat + @rc.set(KEY, 1) + assert_equal(true, @rc.expireat(KEY, Time.now.to_i)) + end + + def test_keys + @rc.set('two', 1) + @rc.set('one', 1) + assert_equal(['one', 'two'], @rc.keys("*o*").sort!) 
+    end
+
+    def test_persist
+        @rc.set(KEY, 1)
+        @rc.expire(KEY, 100)
+        @rc.persist(KEY)
+        assert_equal(-1, @rc.ttl(KEY))
+    end
+
+    def test_pexpire
+        @rc.set(KEY, 1)
+        assert_equal(true, @rc.pexpire(KEY, 1000))
+        assert_equal(1, @rc.ttl(KEY))
+    end
+
+    def test_pexpireat
+        @rc.set(KEY, 1)
+        assert_equal(true, @rc.pexpireat(KEY, (Time.now.to_f * 1000).to_i))
+    end
+
+    def test_pttl
+        @rc.set(KEY, 1)
+        @rc.pexpire(KEY, 1000)
+        assert_instance_of(Fixnum, @rc.pttl(KEY))
+    end
+
+    def test_monitor
+        assert_raise(NotImplementedError) { @rc.monitor }
+    end
+
+end
diff --git a/tests/test_list_cmds.rb b/tests/test_list_cmds.rb
new file mode 100644
index 0000000..e79e33a
--- /dev/null
+++ b/tests/test_list_cmds.rb
@@ -0,0 +1,116 @@
+require_relative '../lib/exceptions'
+require_relative 'test_base'
+require 'test/unit'
+
+
+class TestListCmds < TestBase
+
+    def _push_items_to_list
+        @rc.lpush(KEY, 'b')
+        @rc.lpush(KEY, 'a')
+    end
+
+    def test_blpop
+        _push_items_to_list
+        assert_equal(['asdf', 'a'], @rc.blpop(KEY))
+    end
+
+    def test_brpop
+        _push_items_to_list
+        assert_equal(['asdf', 'b'], @rc.brpop(KEY))
+    end
+
+    def test_brpoplpush_with_raise
+        key1, key2 = get_keys_in_diff_slot
+        assert_raise(Exceptions::CrossSlotsError) { @rc.brpoplpush(key1, key2, 1) }
+    end
+
+    def test_brpoplpush
+        key1, key2 = get_keys_in_same_slot
+        @rc.lpush(key1, 'a')
+        @rc.lpush(key2, 'b')
+        assert_equal('a', @rc.brpoplpush(key1, key2, 1))
+    end
+
+    def test_lindex
+        @rc.lpush(KEY, 'a')
+        assert_equal('a', @rc.lindex(KEY, 0))
+    end
+
+    def test_linsert
+        _push_items_to_list
+        assert_equal(3, @rc.linsert(KEY, 'before', 'b', 'c'))
+        assert_equal(['a', 'c', 'b'], @rc.lrange(KEY, 0, -1))
+    end
+
+    def test_llen
+        @rc.lpush(KEY, 'a')
+        assert_equal(1, @rc.llen(KEY))
+    end
+
+    def test_lpop
+        _push_items_to_list
+        assert_equal('a', @rc.lpop(KEY))
+    end
+
+    def test_lpush
+        assert_equal(1, @rc.lpush(KEY, 'a'))
+    end
+
+    def test_lpushx
+        assert_equal(0, @rc.lpushx(KEY, 'a'))
+        _push_items_to_list
+        assert_equal(3, @rc.lpushx(KEY, 'a'))
+    end
+
+    def test_lrange
+        _push_items_to_list
+        assert_equal(['a', 'b'], @rc.lrange(KEY, 0, 2))
+    end
+
+    def test_lrem
+        _push_items_to_list
+        @rc.lpush(KEY, 'a')
+        assert_equal(2, @rc.lrem(KEY, 2, 'a'))
+    end
+
+    def test_lset
+        _push_items_to_list
+        assert_equal(OK, @rc.lset(KEY, 0, 'a'))
+    end
+
+    def test_ltrim
+        _push_items_to_list
+        assert_equal(OK, @rc.ltrim(KEY, 1, -1))
+        assert_equal(['b'], @rc.lrange(KEY, 0, -1))
+    end
+
+    def test_rpop
+        _push_items_to_list
+        assert_equal('b', @rc.rpop(KEY))
+    end
+
+    def test_rpoplpush_with_raise
+        key1, key2 = get_keys_in_diff_slot
+        assert_raise(Exceptions::CrossSlotsError) { @rc.rpoplpush(key1, key2) }
+    end
+
+    def test_rpoplpush
+        key1, key2 = get_keys_in_same_slot
+        @rc.lpush(key1, 'a')
+        @rc.lpush(key2, 'b')
+        assert_equal('a', @rc.rpoplpush(key1, key2))
+    end
+
+    def test_rpush
+        assert_equal(1, @rc.rpush(KEY, 'a'))
+        assert_equal(3, @rc.rpush(KEY, ['b', 'c']))
+    end
+
+    def test_rpushx
+        assert_equal(0, @rc.rpushx(KEY, 'a'))
+        _push_items_to_list
+        assert_equal(3, @rc.rpushx(KEY, 'a'))
+    end
+
+end
diff --git a/tests/test_server_cmds.rb b/tests/test_server_cmds.rb
new file mode 100644
index 0000000..d59ad1e
--- /dev/null
+++ b/tests/test_server_cmds.rb
@@ -0,0 +1,48 @@
+require_relative 'test_base'
+
+class TestServerCmds < TestBase
+
+    def test_config
+        time = 60
+        @rc.config(:set, 'tcp-keepalive', time)
+        cfg = @rc.config(:get, 'tcp-keepalive')
+        cfg.each do |node, c|
+            assert_equal(time.to_s, c['tcp-keepalive'])
+        end
+    end
+
+    def 
test_dbsize + assert_instance_of(Hash, @rc.dbsize) + end + + def test_flushdb + ret = @rc.flushdb + ret.each do |node, res| + assert_equal(OK, res) + end + end + + def test_flushall + ret = @rc.flushall + ret.each do |node, res| + assert_equal(OK, res) + end + end + + def test_info + assert_instance_of(Hash, @rc.info) + end + + def test_shutdown + # Something cannot be easily tested... + end + + def test_slowlog + assert_instance_of(Hash, @rc.slowlog(:get, 2)) + end + + def test_time + assert_instance_of(Hash, @rc.time) + end + +end diff --git a/tests/test_set_cmds.rb b/tests/test_set_cmds.rb new file mode 100644 index 0000000..8870543 --- /dev/null +++ b/tests/test_set_cmds.rb @@ -0,0 +1,146 @@ +require_relative 'test_base' + +class TestSetCmds < TestBase + + def test_sadd + assert_equal(true, @rc.sadd(KEY, 'a')) + end + + def test_sadd_with_array + value = ['a', 'b'] + assert_equal(2, @rc.sadd(KEY, value)) + end + + def test_scard + @rc.sadd(KEY, ['a', 'b']) + assert_equal(2, @rc.scard(KEY)) + end + + def test_sdiff_with_raise + key1, key2 = get_keys_in_diff_slot + assert_raise(Exceptions::CrossSlotsError) { @rc.sdiff(key1, key2) } + end + + def test_sdiff + key1, key2 = get_keys_in_same_slot + @rc.sadd(key1, ['a', 'b']) + @rc.sadd(key2, 'b') + assert_equal(['a'], @rc.sdiff(key1, key2)) + end + + def test_sdiffstore_with_raise + key1, key2 = get_keys_in_diff_slot + assert_raise(Exceptions::CrossSlotsError) { @rc.sdiffstore(key1, key1, key2) } + end + + def test_sdiffstore + key1, key2 = get_keys_in_same_slot + @rc.sadd(key1, ['a', 'b']) + @rc.sadd(key2, 'b') + assert_equal(1, @rc.sdiffstore(key1, key1, key2)) + end + + def test_sinter_with_raise + key1, key2 = get_keys_in_diff_slot + assert_raise(Exceptions::CrossSlotsError) { @rc.sinter(key1, key2) } + end + + def test_sinter + key1, key2 = get_keys_in_same_slot + @rc.sadd(key1, ['a', 'b']) + @rc.sadd(key2, 'b') + assert_equal(['b'], @rc.sinter(key1, key2)) + end + + def test_sinterstore_with_raise + key1, key2 = get_keys_in_diff_slot + assert_raise(Exceptions::CrossSlotsError) { @rc.sinterstore(key1, key1, key2) } + end + + def test_sinterstore + key1, key2 = get_keys_in_same_slot + @rc.sadd(key1, ['a', 'b']) + @rc.sadd(key2, 'b') + assert_equal(1, @rc.sinterstore(key1, key1, key2)) + end + + def test_sismember + @rc.sadd(KEY, 'a') + assert_equal(false, @rc.sismember(KEY, 'b')) + assert_equal(true, @rc.sismember(KEY, 'a')) + end + + def test_smembers + value = ['a', 'b'] + @rc.sadd(KEY, value) + ret = @rc.smembers(KEY) + assert_equal(value, ret.sort!) + end + + def test_smove_with_raise + key1, key2 = get_keys_in_diff_slot + assert_raise(Exceptions::CrossSlotsError) { @rc.smove(key1, key2, 'a') } + end + + def test_smove + key1, key2 = get_keys_in_same_slot + @rc.sadd(key1, ['a', 'b']) + @rc.sadd(key2, 'c') + assert_equal(true, @rc.smove(key2, key1, 'c')) + assert_equal(['a', 'b', 'c'], @rc.smembers(key1).sort!) + end + + def test_spop + value = ['a', 'b'] + @rc.sadd(KEY, ['a', 'b']) + ret = @rc.spop(KEY) + assert_equal(true, value.member?(ret)) + end + + def test_srandmember + value = ['a', 'b'] + @rc.sadd(KEY, value) + assert_equal(value, @rc.srandmember(KEY, 2).sort!) 
+ end + + def test_srem + value = ['a', 'b'] + @rc.sadd(KEY, value) + assert_equal(2, @rc.srem(KEY, ['a', 'b'])) + value = ['a', 'b'] + @rc.sadd(KEY, value) + assert_equal(true, @rc.srem(KEY, 'a')) + end + + def test_sunion_with_raise + key1, key2 = get_keys_in_diff_slot + assert_raise(Exceptions::CrossSlotsError) { @rc.sunion(key1, key2) } + end + + def test_sunion + key1, key2 = get_keys_in_same_slot + @rc.sadd(key1, ['a', 'b']) + @rc.sadd(key2, 'c') + assert_equal(['a', 'b', 'c'], @rc.sunion(key1, key2).sort!) + end + + def test_sunionstore_with_raise + key1, key2 = get_keys_in_diff_slot + assert_raise(Exceptions::CrossSlotsError) { @rc.sunionstore(key1, key1, key2) } + end + + def test_sunionstore + key1, key2 = get_keys_in_same_slot + @rc.sadd(key1, ['a', 'b']) + @rc.sadd(key2, 'c') + assert_equal(3, @rc.sunionstore(key1, key1, key2)) + end + + def test_sscan + @rc.sadd(KEY, ['a', 'b']) + r = @rc.sscan(KEY, 0) + r[1].sort! + assert_equal(['0', ['a', 'b']], r) + end + +end diff --git a/tests/test_sorted_set_cmds.rb b/tests/test_sorted_set_cmds.rb new file mode 100644 index 0000000..a2d5213 --- /dev/null +++ b/tests/test_sorted_set_cmds.rb @@ -0,0 +1,121 @@ +require_relative '../lib/exceptions' +require_relative 'test_base' + + +class TestSortedSetCmds < TestBase + + def test_zadd + assert_equal(true, @rc.zadd(KEY, 1, 'one')) + end + + def test_zcard + @rc.zadd(KEY, 1, 'a') + assert_equal(1, @rc.zcard(KEY)) + end + + def test_zcount + @rc.zadd(KEY, [[1, 'a'], [2, 'b'], [3, 'c']]) + assert_equal(3, @rc.zcount(KEY, 1, 3)) + end + + def test_zincrby + @rc.zadd(KEY, [[1, 'a'], [2, 'b'], [3, 'c']]) + assert_equal(4, @rc.zincrby(KEY, 3, 'a')) + end + + def test_zinterstore_with_raise + key1, key2 = get_keys_in_diff_slot + assert_raise(Exceptions::CrossSlotsError) { @rc.zinterstore(key1, [key1, key2]) } + end + + def test_zinterstore + key1, key2 = get_keys_in_same_slot + @rc.zadd(key1, 1, 'a') + @rc.zadd(key2, 2, 'a') + assert_equal(1, @rc.zinterstore(key1, [key1, key2])) + end + + # def test_zlexcount + # @rc.zadd(KEY, [[0, 'a'], [0, 'b']) + # assert_equal(2, @rc.zlexcount(KEY, 'a', 'b')) + # end + + def test_zrange + @rc.zadd(KEY, [[1, 'a'], [2, 'b'], [3, 'c']]) + assert_equal(['a', 'b'], @rc.zrange(KEY, 0, 1)) + options = {:withscores => true} + assert_equal([['a', 1], ['b', 2]], @rc.zrange(KEY, 0, 1, options)) + end + + def test_zrangebylex + @rc.zadd(KEY, [[0, 'a'], [0, 'b'], [0, 'c']]) + assert_equal(['a', 'b', 'c'], @rc.zrangebylex(KEY, '[a', '[c')) + end + + def test_zrevrangebylex + @rc.zadd(KEY, [[0, 'a'], [0, 'b'], [0, 'c']]) + assert_equal(['c', 'b', 'a'], @rc.zrevrangebylex(KEY, '[c', '[a')) + end + + def test_zrangebyscore + @rc.zadd(KEY, [[2, 'a'], [1, 'b'], [3, 'c']]) + assert_equal(['b', 'a', 'c'], @rc.zrangebyscore(KEY, 1, 3)) + end + + def test_zrank + @rc.zadd(KEY, [[2, 'a'], [1, 'b'], [3, 'c']]) + assert_equal(1, @rc.zrank(KEY, 'a')) + end + + def test_zrem + @rc.zadd(KEY, [[2, 'a'], [1, 'b'], [3, 'c']]) + assert_equal(true, @rc.zrem(KEY, 'a')) + end + + # def test_zremrangebylex + # end + + def test_zremrangebyrank + @rc.zadd(KEY, [[2, 'a'], [1, 'b'], [3, 'c']]) + assert_equal(1, @rc.zremrangebyrank(KEY, 0, 0)) + end + + def test_zrevrange + @rc.zadd(KEY, [[2, 'a'], [1, 'b'], [3, 'c']]) + assert_equal(['c', 'a', 'b'], @rc.zrevrange(KEY, 0, 2)) + end + + def test_zrevrangebyscore + @rc.zadd(KEY, [[2, 'a'], [1, 'b'], [3, 'c']]) + assert_equal(['c', 'a', 'b'], @rc.zrevrangebyscore(KEY, 3, 1)) + end + + def test_zrevrank + @rc.zadd(KEY, [[2, 'a'], [1, 'b'], [3, 'c']]) + 
assert_equal(0, @rc.zrevrank(KEY, 'c'))
+    end
+
+    def test_zscore
+        @rc.zadd(KEY, 1, 'a')
+        assert_equal(1, @rc.zscore(KEY, 'a'))
+    end
+
+    def test_zunionstore_with_raise
+        key1, key2 = get_keys_in_diff_slot
+        assert_raise(Exceptions::CrossSlotsError) { @rc.zunionstore(key1, [key1, key2]) }
+    end
+
+    def test_zunionstore
+        key1, key2 = get_keys_in_same_slot
+        @rc.zadd(key1, [[1, 'a'], [2, 'b']])
+        @rc.zadd(key2, [[1, 'a'], [2, 'b'], [3, 'c']])
+        ret = @rc.zunionstore(key1, [key1, key2], :weights => [2, 3], :aggregate => "sum")
+        assert_equal(3, ret)
+    end
+
+    def test_zscan
+        @rc.zadd(KEY, [[2, 'a'], [1, 'b']])
+        assert_equal(["0", [["b", 1.0], ["a", 2.0]]], @rc.zscan(KEY, 0))
+    end
+
+end
diff --git a/tests/test_str_cmds.rb b/tests/test_str_cmds.rb
new file mode 100644
index 0000000..51c2e35
--- /dev/null
+++ b/tests/test_str_cmds.rb
@@ -0,0 +1,159 @@
+require_relative '../lib/exceptions'
+require_relative 'test_base'
+require 'test/unit'
+
+
+class TestStrCmds < TestBase
+
+    def test_append
+        @rc.set(KEY, 'a')
+        assert_equal(2, @rc.append(KEY, 'b'))
+    end
+
+    def test_bitcount
+        @rc.set(KEY, 'a')
+        assert_equal(3, @rc.bitcount(KEY))
+    end
+
+    def test_bitop_with_raise
+        key1, key2 = get_keys_in_diff_slot
+        assert_raise(Exceptions::CrossSlotsError) { @rc.bitop('and', key1, key2) }
+    end
+
+    def test_bitop
+        key1, key2 = get_keys_in_same_slot
+        @rc.set(key1, 'foobar')
+        @rc.set(key2, 'abcdef')
+        assert_equal(6, @rc.bitop('and', key1, key1, key2))
+    end
+
+    def test_bitpos
+        @rc.set(KEY, "\xff\xf0\x00")
+        assert_equal(12, @rc.bitpos(KEY, 0))
+        @rc.set(KEY, "\x00\xff\xf0")
+        assert_equal(8, @rc.bitpos(KEY, 1, 0))
+    end
+
+    def test_decr
+        @rc.set(KEY, 1)
+        assert_equal(0, @rc.decr(KEY))
+    end
+
+    def test_decrby
+        @rc.set(KEY, 3)
+        assert_equal(1, @rc.decrby(KEY, 2))
+    end
+
+    def test_get
+        @rc.set(KEY, 'a')
+        assert_equal('a', @rc.get(KEY))
+    end
+
+    def test_getbit
+        @rc.setbit(KEY, 7, 1)
+        assert_equal(1, @rc.getbit(KEY, 7))
+    end
+
+    def test_getrange
+        @rc.set(KEY, 'abcd')
+        assert_equal('bc', @rc.getrange(KEY, 1, 2))
+    end
+
+    def test_incr
+        @rc.set(KEY, 0)
+        assert_equal(1, @rc.incr(KEY))
+    end
+
+    def test_incrby
+        @rc.set(KEY, 0)
+        assert_equal(2, @rc.incrby(KEY, 2))
+    end
+
+    def test_incrbyfloat
+        @rc.set(KEY, 0.1)
+        assert_equal(0.3, @rc.incrbyfloat(KEY, 0.2))
+    end
+
+    def test_mget
+        key1, key2 = get_keys_in_same_slot
+        @rc.set(key1, 'a')
+        @rc.set(key2, 'b')
+        assert_equal(['a', 'b'], @rc.mget(key1, key2))
+    end
+
+    def test_mapped_mget
+        key1, key2 = get_keys_in_same_slot
+        @rc.set(key1, 'a')
+        @rc.set(key2, 'b')
+        ret = {
+            key1 => 'a',
+            key2 => 'b'
+        }
+        assert_equal(ret, @rc.mapped_mget(key1, key2))
+    end
+
+    def test_psetex
+        assert_equal(OK, @rc.psetex(KEY, 100, 10))
+    end
+
+    def test_set
+        assert_equal(OK, @rc.set(KEY, 'a'))
+        assert_equal('a', @rc.get(KEY))
+    end
+
+    def test_mset
+        key1, key2 = get_keys_in_same_slot
+        @rc.mset(key1, 'a', key2, 'b')
+        assert_equal(['a', 'b'], @rc.mget(key1, key2))
+    end
+
+    def test_mapped_mset
+        key1, key2 = get_keys_in_same_slot
+        hash = {
+            key1 => 'a',
+            key2 => 'b'
+        }
+        assert_equal(OK, @rc.mapped_mset(hash))
+    end
+
+    def test_setbit
+        assert_equal(0, @rc.setbit(KEY, 7, 1))
+    end
+
+    def test_setex
+        assert_equal(OK, @rc.setex(KEY, 10, 10))
+    end
+
+    def test_setnx
+        assert_equal(true, @rc.setnx(KEY, 'hello'))
+        assert_equal(false, @rc.setnx(KEY, 'world'))
+        assert_equal('hello', @rc.get(KEY))
+    end
+
+    def test_msetnx
+        key1, key2 = get_keys_in_same_slot
+        assert_equal(true, @rc.msetnx(key1, 'a', key2, 'b'))
+
assert_equal(false, @rc.msetnx(key1, 'a', key2, 'b')) + end + + def test_mapped_msetnx + key1, key2 = get_keys_in_same_slot + hash = { + key1 => 'a', + key2 => 'b' + } + assert_equal(true, @rc.mapped_msetnx(hash)) + assert_equal(false, @rc.mapped_msetnx(hash)) + end + + def test_setrange + @rc.set(KEY, 'a') + assert_equal(3, @rc.setrange(KEY, 1, 'bc')) + end + + def test_strlen + @rc.set(KEY, 'ab') + assert_equal(2, @rc.strlen(KEY)) + end + +end