Skip to content

Commit

Permalink
Merge pull request #485 from Ladicek/improve-redis-cluster
Browse files Browse the repository at this point in the history
Improve Redis cluster
  • Loading branch information
vietj authored Dec 7, 2024
2 parents 0995a36 + a191be6 commit f0d55b7
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 28 deletions.
30 changes: 30 additions & 0 deletions src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,36 @@ When the cache is empty, the first attempt to acquire a connection will execute
The cache has a configurable TTL (time to live), which defaults to 1 second.
The cache is also cleared whenever any command executed by the client receives the `MOVED` redirection.

=== Cluster Utilities

The `RedisCluster` class contains a small number of methods useful in the Redis cluster.
To create an instance, call `create()` with either a `Redis` object, or a `RedisConnection` object.
If you call `create()` with a non-clustered `Redis` / `RedisConnection`, an exception is thrown.

The methods provided by `RedisCluster` are:

* `onAllNodes(Request)`: runs the request against all nodes in the cluster.
Returns a future that completes with a list of responses, one from each node, or failure when one of the operations fails.
Note that in case of a failure, there are no guarantees that the request was or wasn't executed successfully on other Redis cluster nodes.
No result order is guaranteed either.
* `onAllMasterNodes(Request)`: runs the request against all _master_ nodes in the cluster.
Returns a future that completes with a list of responses, one from each master node, or failure when one of the operations fails.
Note that in case of a failure, there are no guarantees that the request was or wasn't executed successfully on other Redis cluster master nodes.
No result order is guaranteed either.
* `groupByNodes(List<Request>)`: groups the requests into a `RequestGrouping`, which contains:
+
--
** _keyed_ requests: requests that include a key and it is therefore possible to determine to which master node they should be sent; all requests in each inner list in the `keyed` collection are guaranteed to be sent to the same _master_ node;
** _unkeyed_ requests: requests that do not include a key and it is therefore _not_ possible to determine to which master node they should be sent.
--
+
If any of the requests includes multiple keys that belong to different master nodes, the resulting future will fail.
+
If the cluster client was created with `RedisReplicas.SHARE` or `RedisReplicas.ALWAYS` and the commands are executed individually (using `RedisConnection.send()`, not `RedisConnection.batch()`), it is possible that the commands will be spread across different replicas of the same master node.
+
Note that this method is only reliable in case the Redis cluster is in a stable state.
In case of resharding, failover or in general any change of cluster topology, there are no guarantees on the validity of the result.

== Replication Mode

Working with replication is transparent to the client.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/redis/client/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.vertx.redis.client;

import io.vertx.codegen.annotations.VertxGen;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.redis.client.impl.CommandImpl;
import io.vertx.redis.client.impl.KeyLocator;
import io.vertx.redis.client.impl.keys.BeginSearchIndex;
Expand All @@ -29,7 +29,7 @@
* @author <a href="mailto:[email protected]">Paulo Lopes</a>
* @version redis_version:7.0.12
*/
@VertxGen
@DataObject
public interface Command {

/**
Expand Down
29 changes: 17 additions & 12 deletions src/main/java/io/vertx/redis/client/RedisCluster.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.vertx.redis.client;

import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Future;
import io.vertx.redis.client.impl.RedisClusterImpl;

Expand All @@ -14,6 +15,7 @@
* @see #onAllMasterNodes(Request)
* @see #groupByNodes(List)
*/
@VertxGen
public interface RedisCluster {
static RedisCluster create(Redis client) {
return new RedisClusterImpl(client);
Expand Down Expand Up @@ -50,18 +52,21 @@ static RedisCluster create(RedisConnection connection) {
Future<List<Response>> onAllMasterNodes(Request request);

/**
* Groups the {@code requests} such that all requests in each inner list in the result
* are guaranteed to be sent to the same Redis <em>master</em> node.
* Groups the {@code requests} into a {@link RequestGrouping}, which contains:
* <ul>
* <li><em>keyed</em> requests: requests that include a key and it is therefore possible
* to determine to which master node they should be sent; all requests in each inner list
* in the {@code keyed} collection are guaranteed to be sent to the same <em>master</em> node;</li>
* <li><em>unkeyed</em> requests: requests that do not include a key and it is therefore
* <em>not</em> possible to determine to which master node they should be sent.</li>
* </ul>
* If any of the {@code requests} includes multiple keys that belong to different master nodes,
* the resulting future will fail.
* <p>
* If the cluster client was not created with {@link RedisReplicas#NEVER} and
* the commands are executed individually (not using {@link RedisConnection#batch(List) batch()}),
* it is possible that the commands will be spread across different replicas
* of the same master node.
* <p>
* If any of the {@code requests} don't have keys and hence are targeted at a random
* node, all such requests shall be grouped into an extra single list for which
* the above guarantee does not apply. If any of the {@code requests} includes multiple
* keys that belong to different master nodes, the resulting future will fail.
* If the cluster client was created with {@link RedisReplicas#SHARE} or {@link RedisReplicas#ALWAYS}
* and the commands are executed individually (using {@link RedisConnection#send(Request) send()},
* not {@link RedisConnection#batch(List) batch()}), it is possible that the commands will be spread
* across different replicas of the same master node.
* <p>
* Note that this method is only reliable in case the Redis cluster is in a stable
* state. In case of resharding, failover or in general any change of cluster topology,
Expand All @@ -70,5 +75,5 @@ static RedisCluster create(RedisConnection connection) {
* @param requests the requests, must not be {@code null}
* @return the requests grouped by the cluster node assignment
*/
Future<List<List<Request>>> groupByNodes(List<Request> requests);
Future<RequestGrouping> groupByNodes(List<Request> requests);
}
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/redis/client/Request.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package io.vertx.redis.client;

import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
Expand All @@ -41,7 +41,7 @@
*
* @author Paulo Lopes
*/
@VertxGen
@DataObject
public interface Request {

/**
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/io/vertx/redis/client/RequestGrouping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.vertx.redis.client;

import io.vertx.codegen.annotations.DataObject;

import java.util.Collection;
import java.util.List;
import java.util.Objects;

/**
* A result of {@link RedisCluster#groupByNodes(List)}.
*
* @see #getKeyed()
* @see #getUnkeyed()
*/
@DataObject
public class RequestGrouping {
private final Collection<List<Request>> keyed;
private final List<Request> unkeyed;

public RequestGrouping(Collection<List<Request>> keyed, List<Request> unkeyed) {
this.keyed = Objects.requireNonNull(keyed);
this.unkeyed = Objects.requireNonNull(unkeyed);
}

/**
* Returns a collection of request groups such that all requests in each group are
* guaranteed to be sent to the same master node.
* <p>
* Does not include any request that doesn't specify a key; use {@link #getUnkeyed()}
* to get those.
*/
public Collection<List<Request>> getKeyed() {
return keyed;
}

/**
* Returns a collection of requests that do not specify a key and would therefore
* be executed on random node.
*/
public List<Request> getUnkeyed() {
return unkeyed;
}
}
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/redis/client/Response.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package io.vertx.redis.client;

import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.buffer.Buffer;

import java.math.BigInteger;
Expand Down Expand Up @@ -45,7 +45,7 @@
*
* @author Paulo Lopes
*/
@VertxGen
@DataObject
public interface Response extends Iterable<Response> {

/**
Expand Down
12 changes: 5 additions & 7 deletions src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import io.vertx.redis.client.RedisCluster;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.RequestGrouping;
import io.vertx.redis.client.Response;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -95,7 +97,7 @@ private void onAllNodes(String[] endpoints, int index, Request request, List<Res
}

@Override
public Future<List<List<Request>>> groupByNodes(List<Request> requests) {
public Future<RequestGrouping> groupByNodes(List<Request> requests) {
if (connection != null) {
return groupByNodes(requests, (RedisClusterConnection) connection);
} else /* client != null */ {
Expand All @@ -107,7 +109,7 @@ public Future<List<List<Request>>> groupByNodes(List<Request> requests) {
}
}

private Future<List<List<Request>>> groupByNodes(List<Request> requests, RedisClusterConnection conn) {
private Future<RequestGrouping> groupByNodes(List<Request> requests, RedisClusterConnection conn) {
return conn.sharedSlots.get()
.compose(slots -> {
Map<String, List<Request>> grouping = new HashMap<>();
Expand Down Expand Up @@ -143,11 +145,7 @@ private Future<List<List<Request>>> groupByNodes(List<Request> requests, RedisCl
}
}

List<List<Request>> result = new ArrayList<>(grouping.values());
if (ambiguous != null) {
result.add(ambiguous);
}
return Future.succeededFuture(result);
return Future.succeededFuture(new RequestGrouping(grouping.values(), ambiguous != null ? ambiguous : Collections.emptyList()));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1150,12 +1150,12 @@ public void batchSameSlotGroupByMultipleSlotsCommands(TestContext should) {
cluster.groupByNodes(commands)
.onComplete(should.asyncAssertSuccess(groupedCommands -> {
List<Future<List<Response>>> futures = new ArrayList<>();
for (List<Request> commandGroup : groupedCommands) {
for (List<Request> commandGroup : groupedCommands.getKeyed()) {
futures.add(conn.batch(commandGroup));
}
Future.all(futures)
.onComplete(should.asyncAssertSuccess(responses -> {
should.assertEquals(groupedCommands.stream().map(List::size).reduce(0, Integer::sum),
should.assertEquals(groupedCommands.getKeyed().stream().map(List::size).reduce(0, Integer::sum),
responses.result().list().stream().map(item -> ((List<Request>) item).size()).reduce(0, Integer::sum));
test.complete();
}));
Expand All @@ -1181,7 +1181,7 @@ public void batchSameSlotsCommands(TestContext should) {

cluster.groupByNodes(commands)
.onComplete(should.asyncAssertSuccess(groupedCommands -> {
List<Request> commandGroup = groupedCommands.get(0);
List<Request> commandGroup = groupedCommands.getKeyed().iterator().next();

conn.batch(commandGroup)
.onComplete(should.asyncAssertSuccess(responses -> {
Expand Down

0 comments on commit f0d55b7

Please sign in to comment.