Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-19580: Fix replacing node stuck in hibernation state (4.0) #3972

Open
wants to merge 1 commit into
base: cassandra-4.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Original file line number Diff line number Diff line change
Expand Up @@ -1391,11 +1391,13 @@ private void silentlyMarkDead(InetAddressAndPort addr, EndpointState localState)

/**
* This method is called whenever there is a "big" change in ep state (a generation change for a known node).
* It is public as the state change simulation is needed in testing, otherwise should not be used directly.
*
* @param ep endpoint
* @param epState EndpointState for the endpoint
*/
private void handleMajorStateChange(InetAddressAndPort ep, EndpointState epState)
@VisibleForTesting
public void handleMajorStateChange(InetAddressAndPort ep, EndpointState epState)
{
checkProperThreadForStateMutation();
EndpointState localEpState = endpointStateMap.get(ep);
Expand Down Expand Up @@ -2063,20 +2065,29 @@ public void stop()
EndpointState mystate = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
if (mystate != null && !isSilentShutdownState(mystate) && StorageService.instance.isJoined())
{
logger.info("Announcing shutdown");
addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.shutdown(true));
addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
Message message = Message.out(Verb.GOSSIP_SHUTDOWN, noPayload);
for (InetAddressAndPort ep : liveEndpoints)
MessagingService.instance().send(message, ep);
Uninterruptibles.sleepUninterruptibly(Integer.getInteger("cassandra.shutdown_announce_in_ms", 2000), TimeUnit.MILLISECONDS);
announceShutdown();
}
else
logger.warn("No local state, state is in silent shutdown, or node hasn't joined, not announcing shutdown");
if (scheduledGossipTask != null)
scheduledGossipTask.cancel(false);
}

/**
 * Announces this node's shutdown to every endpoint in {@code liveEndpoints}.
 * <p>
 * The local STATUS_WITH_PORT and STATUS application states are set to a shutdown value first,
 * then a GOSSIP_SHUTDOWN message is sent to each live endpoint, and finally the calling thread
 * sleeps for the announce period (system property {@code cassandra.shutdown_announce_in_ms},
 * 2000 ms when unset). The gossiper itself is not closed by this method.
 */
public void announceShutdown()
{
    logger.info("Announcing shutdown");

    // valueFactory.shutdown(true) is invoked once per application state, in this order
    addLocalApplicationState(ApplicationState.STATUS_WITH_PORT,
                             StorageService.instance.valueFactory.shutdown(true));
    addLocalApplicationState(ApplicationState.STATUS,
                             StorageService.instance.valueFactory.shutdown(true));

    // the same message instance is sent to every live endpoint
    Message shutdownNotice = Message.out(Verb.GOSSIP_SHUTDOWN, noPayload);
    for (InetAddressAndPort peer : liveEndpoints)
    {
        MessagingService.instance().send(shutdownNotice, peer);
    }

    // block the caller for the announce period before returning
    int announceMillis = Integer.getInteger("cassandra.shutdown_announce_in_ms", 2000);
    Uninterruptibles.sleepUninterruptibly(announceMillis, TimeUnit.MILLISECONDS);
}

public boolean isEnabled()
{
ScheduledFuture<?> scheduledGossipTask = this.scheduledGossipTask;
Expand Down
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1897,7 +1897,11 @@ public boolean bootstrap(final Collection<Token> tokens, long bootstrapTimeoutMi
SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress());
}
if (!Gossiper.instance.seenAnySeed())
{
logger.info("Announcing shutdown to get out of the hibernation deadlock");
Gossiper.instance.announceShutdown();
throw new IllegalStateException("Unable to contact any seeds: " + Gossiper.instance.getSeeds());
}

if (Boolean.getBoolean("cassandra.reset_bootstrap_progress"))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ public void startup(ICluster cluster)
}
catch (Throwable t)
{
startedAt.set(0);
if (t instanceof RuntimeException)
throw (RuntimeException) t;
throw new RuntimeException(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.cassandra.distributed.test.hostreplacement;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

import org.junit.Test;
import org.slf4j.Logger;
Expand All @@ -32,10 +34,21 @@
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.impl.InstanceConfig;
import org.apache.cassandra.distributed.shared.AssertUtils;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.shared.WithProperties;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.StorageService;
import org.assertj.core.api.Assertions;

import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
Expand All @@ -44,9 +57,13 @@
import static org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingHealthy;
import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
import static org.apache.cassandra.distributed.shared.ClusterUtils.getDirectories;
import static org.apache.cassandra.distributed.shared.ClusterUtils.getTokenMetadataTokens;
import static org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
import static org.apache.cassandra.gms.Gossiper.Props.DISABLE_THREAD_VALIDATION;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.Assert.assertFalse;

public class HostReplacementTest extends TestBaseImpl
{
Expand Down Expand Up @@ -205,6 +222,91 @@ public void seedGoesDownBeforeDownHost() throws IOException
}
}

/**
* Verifies that a node left in the hibernate state by a failed host replacement can retry the
* replacement procedure and succeed.
* <p>
* Scenario: node 2 of a two-node cluster is stopped and wiped, node 1 is told that node 2 is
* hibernating, and node 2 is then started twice with {@code cassandra.replace_address} pointing
* at its own address. The first start is expected to fail with "Unable to contact any seeds";
* the second is expected to succeed and serve the same rows as before the replacement.
*
* @throws IOException declared for the cluster set-up/tear-down performed by this test
*/
@Test
public void retryingFailedReplaceWithNodeInHibernateState() throws IOException
{
try (WithProperties properties = new WithProperties())
{
// NOTE(review): presumably needed because putInHibernation() invokes
// Gossiper.handleMajorStateChange() directly from the test's runOnInstance call — confirm
properties.setProperty(DISABLE_THREAD_VALIDATION, "true");

// given a two node cluster with one seed
TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
// a node with id 3 would be handed node 2's token; ids 1 and 2 keep their own
try (Cluster cluster = Cluster.build(2)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
.set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, true))
.withTokenSupplier(node -> even.token(node == 3 ? 2 : node))
.start() )
{
IInvokableInstance seed = cluster.get(1);
IInvokableInstance nodeToReplace = cluster.get(2);

setupCluster(cluster);
// capture the table contents before the replacement so they can be compared afterwards
SimpleQueryResult expectedState = nodeToReplace.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);

// when
// stop the node to replace
stopUnchecked(nodeToReplace);
// wipe the node to replace
getDirectories(nodeToReplace).forEach(FileUtils::deleteRecursive);

String toReplaceAddress = nodeToReplace.config().broadcastAddress().getAddress().getHostAddress();
// set hibernate status for the node to replace on seed
seed.runOnInstance(putInHibernation(toReplaceAddress));

// we need to fake a new host id
((InstanceConfig) nodeToReplace.config()).setHostId(UUID.randomUUID());
// enable auto_bootstrap
nodeToReplace.config().set("auto_bootstrap", true);

// first replacement will fail as the node was announced as hibernated and no-one can contact it at startup
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> {
ClusterUtils.start(nodeToReplace, props -> {
// set the replacement address
props.setProperty("cassandra.replace_address", toReplaceAddress);
});
}).withMessageContaining("Unable to contact any seeds");

// then
// retrying replacement will succeed as the node announced itself as shutdown before killing itself
ClusterUtils.start(nodeToReplace, props -> {
// set the replacement address
props.setProperty("cassandra.replace_address", toReplaceAddress);
});
assertFalse("replaces node should be up", nodeToReplace.isShutdown());

// the data after replacement should be consistent
// wait, in both directions, until each node sees the other as joined
awaitRingJoin(seed, nodeToReplace);
awaitRingJoin(nodeToReplace, seed);

validateRows(seed.coordinator(), expectedState);
validateRows(nodeToReplace.coordinator(), expectedState);
}
}
}

/**
 * Builds a task that marks the given endpoint as hibernating in the gossip state of whichever
 * instance runs it.
 * <p>
 * The task resolves {@code address}, fetches that endpoint's current {@link EndpointState} from
 * the Gossiper, writes one hibernate value into both STATUS and STATUS_WITH_PORT, and then feeds
 * the modified state through {@code Gossiper.handleMajorStateChange}.
 *
 * @param address host address of the endpoint to mark as hibernating
 * @return a serializable runnable; it throws a {@link RuntimeException} wrapping the
 *         {@link UnknownHostException} if the address cannot be resolved
 */
private static IIsolatedExecutor.SerializableRunnable putInHibernation(String address)
{
    return () -> {
        try
        {
            InetAddressAndPort target = InetAddressAndPort.getByName(address);
            EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(target);

            // a single hibernate value is shared by both status keys
            VersionedValue hibernating = StorageService.instance.valueFactory.hibernate(true);
            state.addApplicationState(ApplicationState.STATUS, hibernating);
            state.addApplicationState(ApplicationState.STATUS_WITH_PORT, hibernating);

            Gossiper.instance.handleMajorStateChange(target, state);
        }
        catch (UnknownHostException unresolved)
        {
            throw new RuntimeException(unresolved);
        }
    };
}

static void setupCluster(Cluster cluster)
{
fixDistributedSchemas(cluster);
Expand Down