diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 2a73824317..70ca2ab252 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -90,6 +90,7 @@ slotMigration *clusterGetCurrentSlotMigration(void); void clusterSendMigrateSlotStart(clusterNode *node, int slot_num); void clusterRequestMigrateSlot(int slot_num); void clusterSendMigrateSlotAck(clusterNode *node, int slot_num); +mstime_t clusterGetVoteTimeout(); void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, @@ -4314,21 +4315,22 @@ slotMigration *clusterGetCurrentSlotMigration(void) { return (slotMigration *) listFirst(server.cluster->slot_migrations)->value; } -/* This is the main state machine for the slot migration workflow. This function will do as much - * work as possible synchronously, processing the current enqueued slot migrations, only returning - * once we are waiting on some IO. */ +/* This is the main state machine for the slot migration workflow. Slot + * migration is driven by the new owner of the slot. This function will do as + * much work as possible synchronously, processing the enqueued slot migrations + * and only returning once we are waiting on some IO. */ void clusterProceedWithSlotMigration(void) { - mstime_t now = mstime(); server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_SLOTMIGRATION; + while (clusterGetCurrentSlotMigration() != NULL) { listNode *curr_node = listFirst(server.cluster->slot_migrations); slotMigration *curr_migration = (slotMigration *) curr_node->value; - if (curr_migration->state != SLOT_MIGRATION_QUEUED && curr_migration->end_time < now) { + if (curr_migration->state != SLOT_MIGRATION_QUEUED && curr_migration->end_time < mstime()) { serverLog(LL_WARNING, "Timed out for slot migration from source node %.40s for slot %d", curr_migration->source_node->name, curr_migration->slot); curr_migration->state = SLOT_MIGRATION_FAILED; } - if (curr_migration->state >= SLOT_MIGRATION_PAUSE_OWNER && curr_migration->pause_end < now) { + if (curr_migration->state >= SLOT_MIGRATION_PAUSE_OWNER && curr_migration->pause_end < mstime()) { /* If the owner ever unpauses, we have to move back in the state machine and retry. */ serverLog(LL_WARNING, "Timed out waiting to sync to slot owner's paused time. Going to reinitiate pause and retry."); curr_migration->state = SLOT_MIGRATION_PAUSE_OWNER; @@ -4388,7 +4390,7 @@ void clusterProceedWithSlotMigration(void) { /* Need to wait for the sync to progress further */ return; case SLOT_MIGRATION_STARTING_VOTE: - if (curr_migration->vote_retry_time < now) { + if (curr_migration->vote_retry_time < mstime()) { /* Compute the failover timeout (the max time we have to send votes * and wait for replies), and the failover retry time (the time to wait * before trying to get voted again). @@ -4397,18 +4399,16 @@ void clusterProceedWithSlotMigration(void) { * Retry is two times the Timeout. */ // TODO(murphyjacob4) lets make this a function and share with repl - mstime_t timeout = server.cluster_node_timeout * 2; - if (timeout < CLUSTER_OPERATION_TIMEOUT) timeout = CLUSTER_OPERATION_TIMEOUT; - curr_migration->vote_retry_time = now + timeout * 2; - curr_migration->vote_end_time = now + timeout; + mstime_t timeout = clusterGetVoteTimeout(); + curr_migration->vote_retry_time = mstime() + timeout * 2; + curr_migration->vote_end_time = mstime() + timeout; server.cluster->currentEpoch++; curr_migration->vote_epoch = server.cluster->currentEpoch; curr_migration->auth_count = 0; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); serverLog(LL_NOTICE, "Starting slot migration vote for slot %d on epoch %llu", curr_migration->slot, (unsigned long long) server.cluster->currentEpoch); clusterRequestMigrateSlot(curr_migration->slot); curr_migration->state = SLOT_MIGRATION_GATHERING_VOTES; - // TODO(murphyjacob4) does this actually do anything? Should we add something to the nodes.conf - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); continue; } @@ -4416,7 +4416,7 @@ void clusterProceedWithSlotMigration(void) { return; case SLOT_MIGRATION_GATHERING_VOTES: { int needed_quorum = (server.cluster->size / 2) + 1; - if (curr_migration->vote_end_time < now) { + if (curr_migration->vote_end_time < mstime()) { curr_migration->state = SLOT_MIGRATION_STARTING_VOTE; continue; } @@ -4777,6 +4777,18 @@ void clusterFailoverReplaceYourPrimary(void) { resetManualFailover(); } +/* Compute the failover timeout (the max time we have to send votes + * and wait for replies). + * + * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds. + * Retry is two times the Timeout. + */ +mstime_t clusterGetVoteTimeout() { + mstime_t timeout = server.cluster_node_timeout * 2; + if (timeout < CLUSTER_OPERATION_TIMEOUT) timeout = CLUSTER_OPERATION_TIMEOUT; + return timeout; +} + /* This function is called if we are a replica node and our primary serving * a non-zero amount of hash slots is in FAIL state. * @@ -4794,15 +4806,8 @@ void clusterHandleReplicaFailover(void) { server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER; - /* Compute the failover timeout (the max time we have to send votes - * and wait for replies), and the failover retry time (the time to wait - * before trying to get voted again). - * - * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds. - * Retry is two times the Timeout. - */ - auth_timeout = server.cluster_node_timeout * 2; - if (auth_timeout < CLUSTER_OPERATION_TIMEOUT) auth_timeout = CLUSTER_OPERATION_TIMEOUT; + + auth_timeout = clusterGetVoteTimeout(); auth_retry_time = auth_timeout * 2; /* Pre conditions to run the function, that must be met both in case