[Atomic Slot Migration] Start CLUSTER MIGRATE command.
Right now, this simply performs a full sync and gives up.
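
A rough usage sketch (slot numbers are illustrative; as implemented in this
commit the node handling the command becomes the import target and replies
+OK once the requested migrations are queued):

    CLUSTER MIGRATE SLOTSRANGE 100 200 500 600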

Signed-off-by: Jacob Murphy <[email protected]>
murphyjacob4 committed Oct 31, 2024
1 parent 64cfdf6 commit 52b0ce9
Showing 17 changed files with 1,049 additions and 622 deletions.
14 changes: 14 additions & 0 deletions src/cluster.c
@@ -814,6 +814,20 @@ unsigned int countKeysInSlot(unsigned int slot) {
return kvstoreDictSize(server.db->keys, slot);
}

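/* Remove all the keys stored in the specified hash slot, either synchronously or
 * asynchronously (lazy freeing) depending on 'async'. Returns the number of keys
 * that were removed. */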
unsigned int dropKeysInSlot(unsigned int hashslot, int async) {
unsigned int result = kvstoreSize(server.db[hashslot].keys);
if (async) {
emptyDbAsync(&server.db[hashslot]);
} else {
kvstoreEmpty(server.db[hashslot].keys, NULL);
kvstoreEmpty(server.db[hashslot].expires, NULL);
}
/* Because all keys of the database are removed, reset the average TTL. */
server.db[hashslot].avg_ttl = 0;
server.db[hashslot].expires_cursor = 0;
return result;
}

void clusterCommandHelp(client *c) {
const char *help[] = {
"COUNTKEYSINSLOT <slot>",
1 change: 1 addition & 0 deletions src/cluster.h
@@ -110,6 +110,7 @@ client *createCachedResponseClient(int resp);
void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void);
unsigned int countKeysInSlot(unsigned int hashslot);
unsigned int dropKeysInSlot(unsigned int hashslot, int async);
int getSlotOrReply(client *c, robj *o);

/* functions with shared implementations */
217 changes: 212 additions & 5 deletions src/cluster_legacy.c
@@ -1093,6 +1093,7 @@ void clusterInit(void) {
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
server.cluster->slot_migrations = listCreate();

/* Initialize stats */
for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
@@ -1409,7 +1410,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

/* If the server is starting up, don't accept cluster connections:
* UPDATE messages may interact with the database content. */
if (server.primary_host == NULL && server.loading) return;
if (server.primary_replication_link == NULL && server.loading) return;

while (max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
@@ -4236,6 +4237,143 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
clusterMsgSendBlockDecrRefCount(msgblock_light);
}

/* -----------------------------------------------------------------------------
* Slot Migration functions
* -------------------------------------------------------------------------- */

// sds slotMigrationResponseBufferPopLine(slotMigration *migration) {
// int line_marker = -1;
// for (int i = 1; i < sdslen(migration->response_buff); i++) {
// if (migration->response_buff[i-1] == '\r' && migration->response_buff[i] == '\n') {
// line_marker = i + 1;
// break;
// }
// }
// if (line_marker == -1) {
// return NULL;
// }
// sds result = sdsnewlen(migration->response_buff, line_marker);
// sdssubstr(migration->response_buff, line_marker, sdslen(migration->response_buff) - line_marker);
// return result;
// }

// void slotMigrationTryRead(slotMigration *migration) {
// connection *conn = migration->conn;
// size_t old_len = sdslen(migration->response_buff);
// int n = connRead(conn, migration->response_buff + old_len, sdsavail(migration->response_buff));
// if (n <= 0) {
// if (connGetState(conn) == CONN_STATE_CONNECTED) return;
// serverLog(LL_WARNING, "Failed to read from target node %.40s for slot %d: %s", migration->target_node, migration->slot, connGetLastError(conn));
// migration->state = SLOT_MIGRATION_ORIGIN_FAILED;
// }
// }

// void slotMigrationReadHandler(connection *conn) {
// slotMigration *migration = connGetPrivateData(conn);
// slotMigrationTryRead(migration);
// clusterProceedWithSlotMigration();
// }

// void slotMigrationTryWrite(slotMigration *migration) {
// connection *conn = migration->conn;
// int n = connWrite(conn, migration->output_buff + migration->output_buff_cursor, sdslen(migration->output_buff) - migration->output_buff_cursor);
// if (n <= 0) {
// if (connGetState(conn) == CONN_STATE_CONNECTED) return;
// serverLog(LL_WARNING, "Failed to write to target node %.40s for slot %d: %s", migration->target_node, migration->slot, connGetLastError(conn));
// migration->state = SLOT_MIGRATION_ORIGIN_FAILED;
// }
// migration->output_buff_cursor += n;
// if (sdslen(migration->output_buff) == migration->output_buff_cursor) {
// sdsclear(migration->output_buff);
// migration->output_buff_cursor = 0;
// }
// }

// void slotMigrationWriteHandler(connection *conn) {
// slotMigration *migration = connGetPrivateData(conn);
// if (sdslen(migration->output_buff)) {
// slotMigrationTryWrite(migration);
// }
// clusterProceedWithSlotMigration();
// }

// int slotMigrationStartExportSlot() {
// }

// void slotMigrationDestinationHandler(connection *conn) {
// slotMigration *migration = connGetPrivateData(conn);
// }

/* This is the main state machine for the slot migration workflow. It does as much work as
 * possible synchronously, processing the currently enqueued slot migrations in order, and
 * only returns once we are waiting on some I/O. */
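/* State transitions handled below:
 *   QUEUED            -> SYNCING          replication link to the source created and connecting
 *   SYNCING           -> GATHERING_VOTES  link reached REPL_STATE_CONNECTED
 *   SYNCING           -> FAILED           connect failed or link dropped back to REPL_STATE_NONE
 *   GATHERING_VOTES   -> FAILED           slot-level failover is not implemented yet
 *   any started state -> FAILED           CLUSTER_SLOT_MIGRATION_TIMEOUT exceeded
 * FAILED migrations are removed from the queue and freed. */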
void clusterProceedWithSlotMigration(void) {
mstime_t now = mstime();
while (listLength(server.cluster->slot_migrations) != 0) {
listNode *curr_node = listFirst(server.cluster->slot_migrations);
slotMigration *curr_migration = (slotMigration *)curr_node->value;
if (curr_migration->state != SLOT_MIGRATION_QUEUED && curr_migration->end_time < now) {
serverLog(LL_WARNING,
"Timed out during slot migration from source node %.40s for slot %d", curr_migration->source_node->name, curr_migration->slot);
curr_migration->state = SLOT_MIGRATION_FAILED;
}
switch (curr_migration->state) {
case SLOT_MIGRATION_QUEUED:
/* Start the migration */
serverLog(LL_NOTICE, "Starting sync from migration source node %.40s for slot %d", curr_migration->source_node->name, curr_migration->slot);
curr_migration->end_time = mstime() + CLUSTER_SLOT_MIGRATION_TIMEOUT;
curr_migration->link = createReplicationLink(curr_migration->source_node->ip, getNodeDefaultReplicationPort(curr_migration->source_node), curr_migration->slot);
if (connectReplicationLink(curr_migration->link) == C_ERR) {
serverLog(LL_WARNING,
"Failed to begin sync from migration source node %.40s for slot %d", curr_migration->source_node->name, curr_migration->slot);
curr_migration->state = SLOT_MIGRATION_FAILED;
continue;
}
curr_migration->state = SLOT_MIGRATION_SYNCING;
continue;
case SLOT_MIGRATION_SYNCING:
if (curr_migration->link->state == REPL_STATE_CONNECT) {
/* The first step should have started connecting previously, but we may need
* to restart the connection in case of a failure */
// TODO(murphyjacob4) we should cap the number of retries
serverLog(LL_WARNING, "Retrying sync from migration source node %.40s for slot %d", curr_migration->source_node->name, curr_migration->slot);
if (connectReplicationLink(curr_migration->link) == C_ERR) {
serverLog(LL_WARNING,
"Failed to begin sync for migration source node %.40s for slot %d", curr_migration->source_node->name, curr_migration->slot);
curr_migration->state = SLOT_MIGRATION_FAILED;
continue;
}
continue;
}
if (curr_migration->link->state == REPL_STATE_NONE) {
serverLog(LL_WARNING, "Sync failed from migration node %.40s for slot %d", curr_migration->source_node->name, curr_migration->slot);
curr_migration->state = SLOT_MIGRATION_FAILED;
continue;
}
if (curr_migration->link->state == REPL_STATE_CONNECTED) {
curr_migration->state = SLOT_MIGRATION_GATHERING_VOTES;
continue;
}
/* If we are in another state, nothing to do right now. */
return;
case SLOT_MIGRATION_GATHERING_VOTES:
serverLog(LL_WARNING, "Failover at the slot level is not implemented");
curr_migration->state = SLOT_MIGRATION_FAILED;
continue;
case SLOT_MIGRATION_FAILED:
/* Delete the migration from the queue and proceed to the next migration */
listDelNode(server.cluster->slot_migrations, curr_node);
freeReplicationLink(curr_migration->link);
zfree(curr_migration);
continue;
}
}
}

void clusterProceedWithSlotMigrationDestination(void) {
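/* Destination-side handling is not implemented yet in this commit. */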

}

/* -----------------------------------------------------------------------------
* REPLICA node specific functions
* -------------------------------------------------------------------------- */
@@ -4547,8 +4685,8 @@ void clusterHandleReplicaFailover(void) {

/* Set data_age to the number of milliseconds we are disconnected from
* the primary. */
if (server.repl_state == REPL_STATE_CONNECTED) {
data_age = (mstime_t)(server.unixtime - server.primary->last_interaction) * 1000;
if (server.primary_replication_link && server.primary_replication_link->state == REPL_STATE_CONNECTED) {
data_age = (mstime_t)(server.unixtime - server.primary_replication_link->client->last_interaction) * 1000;
} else {
data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
}
@@ -5078,7 +5216,7 @@ void clusterCron(void) {
/* If we are a replica node but the replication is still turned off,
* enable it if we know the address of our primary and it appears to
* be up. */
if (nodeIsReplica(myself) && server.primary_host == NULL && myself->replicaof && nodeHasAddr(myself->replicaof)) {
if (nodeIsReplica(myself) && server.primary_replication_link == NULL && myself->replicaof && nodeHasAddr(myself->replicaof)) {
replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 0);
}

@@ -5099,6 +5237,8 @@
}

if (update_state || server.cluster->state == CLUSTER_FAIL) clusterUpdateState();

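/* Drive any queued or in-progress slot migrations forward. */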
clusterProceedWithSlotMigration();
}

/* This function is called before the event handler returns to sleep for
@@ -6231,7 +6371,7 @@ int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out,
}

/* If 'myself' is a replica, 'c' must be the primary client. */
serverAssert(!nodeIsReplica(myself) || c == server.primary);
serverAssert(!nodeIsReplica(myself) || (server.primary_replication_link && server.primary_replication_link->client && c == server.primary_replication_link->client));

if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 0;

@@ -6796,6 +6936,71 @@ int clusterCommandSpecial(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr, "links") && c->argc == 2) {
/* CLUSTER LINKS */
addReplyClusterLinksDescription(c);
} else if (!strcasecmp(c->argv[1]->ptr, "migrate")) {
/* CLUSTER MIGRATE SLOTSRANGE <start> <end> [<start> <end> ...] */
if (nodeIsReplica(myself)) {
addReplyError(c, "Only primaries can migrate slots");
return 1;
}
if (c->argc < 5 || strcasecmp(c->argv[2]->ptr, "slotsrange")) {
addReplyError(c, "Migrate command requires at least one slot range");
return 1;
}
unsigned char requested_slots[CLUSTER_SLOTS/8];
memset(requested_slots, 0, sizeof(requested_slots));
int i;
for (i = 3; i + 1 < c->argc; i += 2) {
if (i > 3 && getLongLongFromObject(c->argv[i], NULL) != C_OK) {
/* If we find a non-integer argument after having already parsed at
 * least one slot range, assume it starts the next token (e.g. the
 * SHARD argument) and stop parsing slot ranges. */
break;
}
int start = getSlotOrReply(c, c->argv[i]);
if (start < 0) {
return 1;
}
int end = getSlotOrReply(c, c->argv[i + 1]);
if (end < 0) {
return 1;
}
if (end < start) {
addReplyErrorFormat(c, "Invalid SLOTSRANGE, start slot %d is greater than end slot %d", start, end);
return 1;
}
for (int j = start; j <= end; j++) {
if (bitmapTestBit(requested_slots, j)) {
addReplyError(c, "Invalid SLOTSRANGE, slot ranges overlap");
return 1;
}
clusterNode *curr_owner = server.cluster->slots[j];
if (curr_owner == NULL) {
/* Unassigned slot: there is no source node to migrate from. */
addReplyErrorFormat(c, "Hash slot %u is not served by any node", j);
return 1;
}
if (curr_owner == myself) {
addReplyErrorFormat(c, "I'm already the owner of hash slot %u", j);
return 1;
}
if (nodeFailed(curr_owner)) {
addReplyErrorFormat(c, "Primary is currently failing for slot %u. Please try again once there is a healthy primary", j);
return 1;
}
bitmapSetBit(requested_slots, j);
}
}

for (int i = 0; i < CLUSTER_SLOTS; i++) {
if (bitmapTestBit(requested_slots, i)) {
slotMigration *to_enqueue = (slotMigration *)zmalloc(sizeof(slotMigration));
to_enqueue->source_node = server.cluster->slots[i];
to_enqueue->slot = i;
to_enqueue->state = SLOT_MIGRATION_QUEUED;
to_enqueue->end_time = 0; /* Will be set once started. */
to_enqueue->link = NULL;  /* zmalloc does not zero memory; the link is created once the migration starts. */
// to_enqueue->output_buff = sdsempty();
// to_enqueue->output_buff_cursor = 0;
// to_enqueue->response_buff = sdsempty();
listAddNodeTail(server.cluster->slot_migrations, to_enqueue);
}
}

clusterProceedWithSlotMigration();
addReply(c, shared.ok);
} else {
return 0;
}
@@ -6838,6 +7043,8 @@ const char **clusterCommandExtendedHelp(void) {
"LINKS",
" Return information about all network links between this node and its peers.",
" Output format is an array where each array element is a map containing attributes of a link",
"MIGRATE SLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...] SHARD <shard-id>",
" Initiate server driven slot migration of all slot ranges to the designated shard.",
NULL};

return help;
22 changes: 22 additions & 0 deletions src/cluster_legacy.h
@@ -10,6 +10,7 @@
#define CLUSTER_MF_TIMEOUT 5000 /* Milliseconds to do a manual failover. */
#define CLUSTER_MF_PAUSE_MULT 2 /* Primary pause manual failover mult. */
#define CLUSTER_REPLICA_MIGRATION_DELAY 5000 /* Delay for replica migration. */
#define CLUSTER_SLOT_MIGRATION_TIMEOUT 30000 /* Milliseconds to do a slot migration. */

/* Reasons why a replica is not able to failover. */
#define CLUSTER_CANT_FAILOVER_NONE 0
@@ -364,6 +365,26 @@ typedef struct slotStat {
uint64_t network_bytes_out;
} slotStat;

typedef enum slotMigrationState {
SLOT_MIGRATION_QUEUED, /* Queued behind some other slot migration. */
SLOT_MIGRATION_SYNCING, /* Syncing contents from current owner. */
SLOT_MIGRATION_GATHERING_VOTES, /* Gathering votes necessary for slot-level takeover. */
SLOT_MIGRATION_FAILED, /* Migration failed; entry will be dequeued and freed. */
} slotMigrationState;
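/* A slot migration is enqueued by CLUSTER MIGRATE and advanced through these states
 * by clusterProceedWithSlotMigration(); FAILED entries are dequeued and freed. */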

typedef struct slotMigration {
short slot; /* Hash slot being migrated. */
slotMigrationState state; /* Current state in the migration state machine. */
clusterNode *source_node; /* Current owner of the slot, which we sync from. */
mstime_t end_time; /* Slot migration time limit (ms unixtime).
If not yet in progress (e.g. queued), will be zero. */
// connection *conn;
// sds output_buff;
// int output_buff_cursor;
// sds response_buff;
replicationLink *link; /* Replication link to the source node used to sync the slot contents. */
} slotMigration;

struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch;
@@ -413,6 +434,7 @@ struct clusterState {
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
/* Struct used for storing slot statistics, for all slots owned by the current shard. */
slotStat slot_stats[CLUSTER_SLOTS];
list *slot_migrations; /* Queue of ongoing slot migrations. */
};

#endif // CLUSTER_LEGACY_H
20 changes: 20 additions & 0 deletions src/commands.def
@@ -685,6 +685,25 @@ struct COMMAND_ARG CLUSTER_MEET_Args[] = {
{MAKE_ARG("cluster-bus-port",ARG_TYPE_INTEGER,-1,NULL,NULL,"4.0.0",CMD_ARG_OPTIONAL,0,NULL)},
};

/********** CLUSTER MIGRATE ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER MIGRATE history */
#define CLUSTER_MIGRATE_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLUSTER MIGRATE tips */
const char *CLUSTER_MIGRATE_Tips[] = {
"nondeterministic_output",
};
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLUSTER MIGRATE key specs */
#define CLUSTER_MIGRATE_Keyspecs NULL
#endif

/********** CLUSTER MYID ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
@@ -1020,6 +1039,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("keyslot","Returns the hash slot for a key.","O(N) where N is the number of bytes in the key","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_KEYSLOT_History,0,CLUSTER_KEYSLOT_Tips,0,clusterCommand,3,CMD_STALE,0,CLUSTER_KEYSLOT_Keyspecs,0,NULL,1),.args=CLUSTER_KEYSLOT_Args},
{MAKE_CMD("links","Returns a list of all TCP links to and from peer nodes.","O(N) where N is the total number of Cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_LINKS_History,0,CLUSTER_LINKS_Tips,1,clusterCommand,2,CMD_STALE,0,CLUSTER_LINKS_Keyspecs,0,NULL,0)},
{MAKE_CMD("meet","Forces a node to handshake with another node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_MEET_History,1,CLUSTER_MEET_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_MEET_Keyspecs,0,NULL,3),.args=CLUSTER_MEET_Args},
{MAKE_CMD("migrate","Initiates server driven hash slot migration from this shard to a designated shard.","O(N) where N is the total number of hash slot arguments","8.1.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_MIGRATE_History,0,CLUSTER_MIGRATE_Tips,1,clusterCommand,-2,CMD_ADMIN|CMD_STALE,0,CLUSTER_MIGRATE_Keyspecs,0,NULL,0)},
{MAKE_CMD("myid","Returns the ID of a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_MYID_History,0,CLUSTER_MYID_Tips,0,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_MYID_Keyspecs,0,NULL,0)},
{MAKE_CMD("myshardid","Returns the shard ID of a node.","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_MYSHARDID_History,0,CLUSTER_MYSHARDID_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_MYSHARDID_Keyspecs,0,NULL,0)},
{MAKE_CMD("nodes","Returns the cluster configuration for a node.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_NODES_History,0,CLUSTER_NODES_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_NODES_Keyspecs,0,NULL,0)},