From 636e53321d3c44d40078e911858bd47578933bb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Szymon=20Mi=C4=99=C5=BCa=C5=82?= Date: Tue, 11 Mar 2025 16:11:01 +0100 Subject: [PATCH] Fix replacing node stuck in hibernation state When a node is replaced, it announces itself as hibernated (one of the silent shutdown states). If the replacement fails, other nodes continue to see the replacing node in this state. As a result, the replacing node does not receive gossip messages from the seed upon subsequent startup, leading to an exception. This patch adds an explicit shutdown announcement via gossip to let other nodes know that the node was explicitly shutdown - as it was due to the exception. This allows other nodes (seeds in particular) to contact the replacing node at its next startup, allowing it to retry the replacement. --- .../org/apache/cassandra/gms/Gossiper.java | 27 +++-- .../cassandra/service/StorageService.java | 4 + .../cassandra/distributed/impl/Instance.java | 1 + .../hostreplacement/HostReplacementTest.java | 102 ++++++++++++++++++ 4 files changed, 126 insertions(+), 8 deletions(-) diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 9a7a9935b8e7..db891966fb6e 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -1391,11 +1391,13 @@ private void silentlyMarkDead(InetAddressAndPort addr, EndpointState localState) /** * This method is called whenever there is a "big" change in ep state (a generation change for a known node). + * It is public as the state change simulation is needed in testing, otherwise should not be used directly. 
* * @param ep endpoint * @param epState EndpointState for the endpoint */ - private void handleMajorStateChange(InetAddressAndPort ep, EndpointState epState) + @VisibleForTesting + public void handleMajorStateChange(InetAddressAndPort ep, EndpointState epState) { checkProperThreadForStateMutation(); EndpointState localEpState = endpointStateMap.get(ep); @@ -2063,13 +2065,7 @@ public void stop() EndpointState mystate = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()); if (mystate != null && !isSilentShutdownState(mystate) && StorageService.instance.isJoined()) { - logger.info("Announcing shutdown"); - addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.shutdown(true)); - addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true)); - Message message = Message.out(Verb.GOSSIP_SHUTDOWN, noPayload); - for (InetAddressAndPort ep : liveEndpoints) - MessagingService.instance().send(message, ep); - Uninterruptibles.sleepUninterruptibly(Integer.getInteger("cassandra.shutdown_announce_in_ms", 2000), TimeUnit.MILLISECONDS); + announceShutdown(); } else logger.warn("No local state, state is in silent shutdown, or node hasn't joined, not announcing shutdown"); @@ -2077,6 +2073,21 @@ public void stop() scheduledGossipTask.cancel(false); } + /** + * This method sends the node shutdown status to all live endpoints. + * It does not close the gossiper itself. 
+ */ + public void announceShutdown() + { + logger.info("Announcing shutdown"); + addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.shutdown(true)); + addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true)); + Message message = Message.out(Verb.GOSSIP_SHUTDOWN, noPayload); + for (InetAddressAndPort ep : liveEndpoints) + MessagingService.instance().send(message, ep); + Uninterruptibles.sleepUninterruptibly(Integer.getInteger("cassandra.shutdown_announce_in_ms", 2000), TimeUnit.MILLISECONDS); + } + public boolean isEnabled() { ScheduledFuture scheduledGossipTask = this.scheduledGossipTask; diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d3dab125e4d9..91d923a0dc87 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1897,7 +1897,11 @@ public boolean bootstrap(final Collection tokens, long bootstrapTimeoutMi SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress()); } if (!Gossiper.instance.seenAnySeed()) + { + logger.info("Announcing shutdown to get out of the hibernation deadlock"); + Gossiper.instance.announceShutdown(); throw new IllegalStateException("Unable to contact any seeds: " + Gossiper.instance.getSeeds()); + } if (Boolean.getBoolean("cassandra.reset_bootstrap_progress")) { diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index a8c948c25d73..6c2a0221a5ea 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -632,6 +632,7 @@ public void startup(ICluster cluster) } catch (Throwable t) { + startedAt.set(0); if (t instanceof RuntimeException) throw (RuntimeException) t; throw new 
RuntimeException(t); diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java index 3de0bf51d5e8..f3e3beb2d847 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java @@ -19,8 +19,10 @@ package org.apache.cassandra.distributed.test.hostreplacement; import java.io.IOException; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; +import java.util.UUID; import org.junit.Test; import org.slf4j.Logger; @@ -32,10 +34,21 @@ import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.ICoordinator; import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.impl.InstanceConfig; import org.apache.cassandra.distributed.shared.AssertUtils; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.WithProperties; import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.StorageService; import org.assertj.core.api.Assertions; import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK; @@ -44,9 +57,13 @@ import static 
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs; import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingHealthy; import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin; +import static org.apache.cassandra.distributed.shared.ClusterUtils.getDirectories; import static org.apache.cassandra.distributed.shared.ClusterUtils.getTokenMetadataTokens; import static org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart; import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked; +import static org.apache.cassandra.gms.Gossiper.Props.DISABLE_THREAD_VALIDATION; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.junit.Assert.assertFalse; public class HostReplacementTest extends TestBaseImpl { @@ -205,6 +222,91 @@ public void seedGoesDownBeforeDownHost() throws IOException } } + /** + * Make sure that a node stuck in hibernate state due to failed replacement can retry the replacement procedure and succeed. + */ + @Test + public void retryingFailedReplaceWithNodeInHibernateState() throws IOException + { + try (WithProperties properties = new WithProperties()) + { + properties.setProperty(DISABLE_THREAD_VALIDATION, "true"); + + // given a two node cluster with one seed + TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2); + try (Cluster cluster = Cluster.build(2) + .withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL) + .set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, true)) + .withTokenSupplier(node -> even.token(node == 3 ? 
2 : node)) + .start() ) + { + IInvokableInstance seed = cluster.get(1); + IInvokableInstance nodeToReplace = cluster.get(2); + + setupCluster(cluster); + SimpleQueryResult expectedState = nodeToReplace.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL); + + // when + // stop the node to replace + stopUnchecked(nodeToReplace); + // wipe the node to replace + getDirectories(nodeToReplace).forEach(FileUtils::deleteRecursive); + + String toReplaceAddress = nodeToReplace.config().broadcastAddress().getAddress().getHostAddress(); + // set hibernate status for the node to replace on seed + seed.runOnInstance(putInHibernation(toReplaceAddress)); + + // we need to fake a new host id + ((InstanceConfig) nodeToReplace.config()).setHostId(UUID.randomUUID()); + // enable autobootstrap + nodeToReplace.config().set("auto_bootstrap", true); + + // first replacement will fail as the node was announced as hibernated and no-one can contact it at startup + assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> { + ClusterUtils.start(nodeToReplace, props -> { + // set the replacement address + props.setProperty("cassandra.replace_address", toReplaceAddress); + }); + }).withMessageContaining("Unable to contact any seeds"); + + // then + // retrying replacement will succeed as the node announced itself as shutdown before killing itself + ClusterUtils.start(nodeToReplace, props -> { + // set the replacement address + props.setProperty("cassandra.replace_address", toReplaceAddress); + }); + assertFalse("replaces node should be up", nodeToReplace.isShutdown()); + + // the data after replacement should be consistent + awaitRingJoin(seed, nodeToReplace); + awaitRingJoin(nodeToReplace, seed); + + validateRows(seed.coordinator(), expectedState); + validateRows(nodeToReplace.coordinator(), expectedState); + } + } + } + + private static IIsolatedExecutor.SerializableRunnable putInHibernation(String address) + { + return () -> { + 
InetAddressAndPort endpoint; + try + { + endpoint = InetAddressAndPort.getByName(address); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + VersionedValue newStatus = StorageService.instance.valueFactory.hibernate(true); + epState.addApplicationState(ApplicationState.STATUS, newStatus); + epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, newStatus); + Gossiper.instance.handleMajorStateChange(endpoint, epState); + }; + } + static void setupCluster(Cluster cluster) { fixDistributedSchemas(cluster);