Skip to content

Commit

Permalink
Code cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Murphy <[email protected]>
  • Loading branch information
murphyjacob4 committed Nov 4, 2024
1 parent b8e451d commit 421b8f1
Showing 1 changed file with 28 additions and 23 deletions.
51 changes: 28 additions & 23 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ slotMigration *clusterGetCurrentSlotMigration(void);
void clusterSendMigrateSlotStart(clusterNode *node, int slot_num);
void clusterRequestMigrateSlot(int slot_num);
void clusterSendMigrateSlotAck(clusterNode *node, int slot_num);
/* Vote timeout in milliseconds; see the definition for how it is computed. */
mstime_t clusterGetVoteTimeout(void);
void moduleCallClusterReceivers(const char *sender_id,
uint64_t module_id,
uint8_t type,
Expand Down Expand Up @@ -4314,21 +4315,22 @@ slotMigration *clusterGetCurrentSlotMigration(void) {
return (slotMigration *) listFirst(server.cluster->slot_migrations)->value;
}

/* This is the main state machine for the slot migration workflow. This function will do as much
* work as possible synchronously, processing the current enqueued slot migrations, only returning
* once we are waiting on some IO. */
/* This is the main state machine for the slot migration workflow. Slot
* migration is driven by the new owner of the slot. This function will do as
* much work as possible synchronously, processing the enqueued slot migrations
* and only returning once we are waiting on some IO. */
void clusterProceedWithSlotMigration(void) {
mstime_t now = mstime();
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_SLOTMIGRATION;

while (clusterGetCurrentSlotMigration() != NULL) {
listNode *curr_node = listFirst(server.cluster->slot_migrations);
slotMigration *curr_migration = (slotMigration *) curr_node->value;
if (curr_migration->state != SLOT_MIGRATION_QUEUED && curr_migration->end_time < now) {
if (curr_migration->state != SLOT_MIGRATION_QUEUED && curr_migration->end_time < mstime()) {
serverLog(LL_WARNING,
"Timed out for slot migration from source node %.40s for slot %d", curr_migration->source_node->name, curr_migration->slot);
curr_migration->state = SLOT_MIGRATION_FAILED;
}
if (curr_migration->state >= SLOT_MIGRATION_PAUSE_OWNER && curr_migration->pause_end < now) {
if (curr_migration->state >= SLOT_MIGRATION_PAUSE_OWNER && curr_migration->pause_end < mstime()) {
/* If the owner ever unpauses, we have to move back in the state machine and retry. */
serverLog(LL_WARNING, "Timed out waiting to sync to slot owner's paused time. Going to reinitiate pause and retry.");
curr_migration->state = SLOT_MIGRATION_PAUSE_OWNER;
Expand Down Expand Up @@ -4388,7 +4390,7 @@ void clusterProceedWithSlotMigration(void) {
/* Need to wait for the sync to progress further */
return;
case SLOT_MIGRATION_STARTING_VOTE:
if (curr_migration->vote_retry_time < now) {
if (curr_migration->vote_retry_time < mstime()) {
/* Compute the failover timeout (the max time we have to send votes
* and wait for replies), and the failover retry time (the time to wait
* before trying to get voted again).
Expand All @@ -4397,26 +4399,24 @@ void clusterProceedWithSlotMigration(void) {
* Retry is two times the Timeout.
*/
// TODO(murphyjacob4) let's make this a function and share with repl
mstime_t timeout = server.cluster_node_timeout * 2;
if (timeout < CLUSTER_OPERATION_TIMEOUT) timeout = CLUSTER_OPERATION_TIMEOUT;
curr_migration->vote_retry_time = now + timeout * 2;
curr_migration->vote_end_time = now + timeout;
mstime_t timeout = clusterGetVoteTimeout();
curr_migration->vote_retry_time = mstime() + timeout * 2;
curr_migration->vote_end_time = mstime() + timeout;
server.cluster->currentEpoch++;
curr_migration->vote_epoch = server.cluster->currentEpoch;
curr_migration->auth_count = 0;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
serverLog(LL_NOTICE, "Starting slot migration vote for slot %d on epoch %llu", curr_migration->slot, (unsigned long long) server.cluster->currentEpoch);
clusterRequestMigrateSlot(curr_migration->slot);
curr_migration->state = SLOT_MIGRATION_GATHERING_VOTES;
// TODO(murphyjacob4) does this actually do anything? Should we add something to the nodes.conf
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
continue;
}

/* Must wait to start another vote. */
return;
case SLOT_MIGRATION_GATHERING_VOTES: {
int needed_quorum = (server.cluster->size / 2) + 1;
if (curr_migration->vote_end_time < now) {
if (curr_migration->vote_end_time < mstime()) {
curr_migration->state = SLOT_MIGRATION_STARTING_VOTE;
continue;
}
Expand Down Expand Up @@ -4777,6 +4777,18 @@ void clusterFailoverReplaceYourPrimary(void) {
resetManualFailover();
}

/* Return the vote timeout in milliseconds: the max time a node has to send
 * vote requests and wait for the replies.
 *
 * The result is MAX(server.cluster_node_timeout * 2, CLUSTER_OPERATION_TIMEOUT).
 *
 * This function only computes the timeout itself. Callers that also need a
 * retry interval derive it on their own as two times the returned value.
 *
 * The parameter list is spelled (void) so that this is a real prototype and
 * the compiler rejects calls that pass arguments. */
mstime_t clusterGetVoteTimeout(void) {
    mstime_t timeout = server.cluster_node_timeout * 2;
    /* Never go below the minimum cluster operation timeout. */
    if (timeout < CLUSTER_OPERATION_TIMEOUT) timeout = CLUSTER_OPERATION_TIMEOUT;
    return timeout;
}

/* This function is called if we are a replica node and our primary serving
* a non-zero amount of hash slots is in FAIL state.
*
Expand All @@ -4794,15 +4806,8 @@ void clusterHandleReplicaFailover(void) {

server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

/* Compute the failover timeout (the max time we have to send votes
* and wait for replies), and the failover retry time (the time to wait
* before trying to get voted again).
*
* Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
* Retry is two times the Timeout.
*/
auth_timeout = server.cluster_node_timeout * 2;
if (auth_timeout < CLUSTER_OPERATION_TIMEOUT) auth_timeout = CLUSTER_OPERATION_TIMEOUT;

auth_timeout = clusterGetVoteTimeout();
auth_retry_time = auth_timeout * 2;

/* Pre conditions to run the function, that must be met both in case
Expand Down

0 comments on commit 421b8f1

Please sign in to comment.