Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-20442: Accord: fix repairs retry logic after the migration to WaitStrategy #3984

Open
wants to merge 6 commits into
base: cep-15-accord
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions src/java/org/apache/cassandra/config/RetrySpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,23 @@
package org.apache.cassandra.config;

import java.util.Objects;
import java.util.Random;

import javax.annotation.Nullable;

import accord.utils.DefaultRandom;
import org.apache.cassandra.config.DurationSpec.LongMillisecondsBound;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.service.RetryStrategy;
import org.apache.cassandra.service.TimeoutStrategy.LatencySourceFactory;
import org.apache.cassandra.service.TimeoutStrategy;
import org.apache.cassandra.service.TimeoutStrategy.LatencySupplier.Constant;
import org.apache.cassandra.service.TimeoutStrategy.Wait.Modifying;
import org.apache.cassandra.service.WaitStrategy;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.cassandra.service.RetryStrategy.randomizers;
import static org.apache.cassandra.service.TimeoutStrategy.modifiers;

public class RetrySpec
{
public static class MaxAttempt
Expand Down Expand Up @@ -83,7 +91,7 @@ public Partial()

public RetrySpec withDefaults(RetrySpec defaultValues)
{
MaxAttempt maxAttempts = nonNull(this.maxAttempts, defaultValues.getMaxAttempts(), DEFAULT_MAX_ATTEMPTS);
MaxAttempt maxAttempts = nonNull(this.maxAttempts, defaultValues.getMaxAttempts(), DEFAULT_MAX_RETRIES);
LongMillisecondsBound baseSleepTime = nonNull(this.baseSleepTime, defaultValues.getBaseSleepTime(), DEFAULT_BASE_SLEEP);
LongMillisecondsBound maxSleepTime = nonNull(this.maxSleepTime, defaultValues.getMaxSleepTime(), DEFAULT_MAX_SLEEP);
return new RetrySpec(maxAttempts, baseSleepTime, maxSleepTime);
Expand All @@ -99,7 +107,7 @@ private static <T> T nonNull(@Nullable T left, @Nullable T right, T defaultValue
}
}

public static final MaxAttempt DEFAULT_MAX_ATTEMPTS = MaxAttempt.DISABLED;
public static final MaxAttempt DEFAULT_MAX_RETRIES = MaxAttempt.DISABLED;
public static final LongMillisecondsBound DEFAULT_BASE_SLEEP = new LongMillisecondsBound("200ms");
public static final LongMillisecondsBound DEFAULT_MAX_SLEEP = new LongMillisecondsBound("1s");

Expand All @@ -108,7 +116,7 @@ private static <T> T nonNull(@Nullable T left, @Nullable T right, T defaultValue
* <p/>
* To disable, set to 0.
*/
public MaxAttempt maxAttempts = DEFAULT_MAX_ATTEMPTS; // 2 retries, 1 original request; so 3 total
public MaxAttempt maxAttempts = DEFAULT_MAX_RETRIES; // 2 retries, 1 original request; so 3 total
public LongMillisecondsBound baseSleepTime = DEFAULT_BASE_SLEEP;
public LongMillisecondsBound maxSleepTime = DEFAULT_MAX_SLEEP;

Expand Down Expand Up @@ -161,7 +169,7 @@ public static WaitStrategy toStrategy(SharedContext ctx, RetrySpec spec)
{
if (!spec.isEnabled())
return WaitStrategy.None.INSTANCE;
return RetryStrategy.parse(spec.baseSleepTime.toMilliseconds() + "ms * 2^attempts <= " + spec.maxSleepTime.toMilliseconds() + "ms,retries=" + (spec.maxAttempts.value - 1), LatencySourceFactory.none());
return doublingWaitStrategy(spec.maxAttempts.value, spec.baseSleepTime.to(MICROSECONDS), spec.maxSleepTime.to(MICROSECONDS), ctx.random().get());
}

@Override
Expand All @@ -173,4 +181,19 @@ public String toString()
", maxSleepTime=" + maxSleepTime +
'}';
}

// note: maxAttempts here excludes the initial attempt, so we are permitted this many retries
private static WaitStrategy doublingWaitStrategy(int maxRetries, long baseSleepTimeMicros, long maxSleepMicros, Random random)
{
return new RetryStrategy(randomizers(new DefaultRandom(random)).uniform(),
0,
doublingWait(baseSleepTimeMicros / 2),
doublingWait(baseSleepTimeMicros + (baseSleepTimeMicros / 2)),
maxSleepMicros, maxRetries);
}

private static TimeoutStrategy.Wait doublingWait(long baseSleepTimeMicros)
{
return new Modifying(new Constant(baseSleepTimeMicros), modifiers.doubleByRetries());
}
}
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/net/MessageDelivery.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void onResponse(Message<RSP> msg)
@Override
public void onFailure(InetAddressAndPort from, RequestFailure failure)
{
long retryDelay = backoff.computeWait(attempt + 1, NANOSECONDS);
long retryDelay = backoff.computeWait(attempt, NANOSECONDS);
// TODO (required): we already have a separate retry predicate, retries should not be taken into consideration when retrying
if (retryDelay < 0)
{
Expand Down
13 changes: 2 additions & 11 deletions src/java/org/apache/cassandra/service/RetryStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import static java.util.concurrent.TimeUnit.*;
import static org.apache.cassandra.service.TimeoutStrategy.parseInMicros;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;

/**
* <p>A strategy for making retry timing decisions for operations.
Expand Down Expand Up @@ -203,7 +202,7 @@ public long wait(long min, long max, int attempts)
public final @Nonnull Wait max;
public final int maxAttempts;

protected RetryStrategy(WaitRandomizer waitRandomizer, long minMinMicros, Wait min, Wait max, long maxMaxMicros, int retries)
public RetryStrategy(WaitRandomizer waitRandomizer, long minMinMicros, Wait min, Wait max, long maxMaxMicros, int retries)
{
this.waitRandomizer = waitRandomizer;
this.minMinMicros = minMinMicros;
Expand All @@ -214,17 +213,9 @@ protected RetryStrategy(WaitRandomizer waitRandomizer, long minMinMicros, Wait m
Invariants.require(maxAttempts >= 1);
}

public long computeWaitUntil(int attempts)
{
long wait = computeWait(attempts, NANOSECONDS);
if (wait < 0)
return -1;
return nanoTime() + wait;
}

public long computeWait(int attempt, TimeUnit units)
{
if (attempt > maxAttempts)
if (attempt >= maxAttempts)
return -1;

long result;
Expand Down
41 changes: 27 additions & 14 deletions src/java/org/apache/cassandra/service/TimeoutStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static java.lang.Math.max;
import static java.lang.Math.pow;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.config.DatabaseDescriptor.getCasContentionTimeout;
import static org.apache.cassandra.config.DatabaseDescriptor.getReadRpcTimeout;
Expand Down Expand Up @@ -85,19 +84,39 @@ public class TimeoutStrategy implements WaitStrategy
static final Pattern WAIT = Pattern.compile(
"\\s*(?<const>0|[0-9]+[mu]?s)" +
"|\\s*((p(?<perc>[0-9]+)(\\((?<rw>r|w|rw|wr)\\))?)?|(?<constbase>0|[0-9]+[mu]?s))" +
"\\s*(([*]\\s*(?<mod>[0-9.]+))?\\s*(?<modkind>[*^]\\s*attempts)?)?\\s*");
"\\s*(([*]\\s*(?<mod>[0-9.]+))?\\s*(?<modkind>[*^]\\s*(<?count>attempts|retries))?)?\\s*");
static final Pattern TIME = Pattern.compile(
"0|[0-9]+[mu]?s");

// Factories can be useful for testing purposes, to supply custom implementations of selectors and modifiers.
final static LatencyModifierFactory modifiers = new LatencyModifierFactory(){};
public static final LatencyModifierFactory modifiers = new LatencyModifierFactory(){};

interface LatencyModifierFactory
public interface LatencyModifierFactory
{
default LatencyModifier identity() { return (l, a) -> l; }
default LatencyModifier multiply(double constant) { return (l, a) -> saturatedCast(l * constant); }
default LatencyModifier multiplyByAttempts(double multiply) { return (l, a) -> saturatedCast(l * multiply * a); }
default LatencyModifier multiplyByAttemptsExp(double base) { return (l, a) -> saturatedCast(l * pow(base, a)); }
default LatencyModifier multiplyByRetries(double multiply) { return (l, a) -> saturatedCast(l * multiply * (a - 1)); }
default LatencyModifier multiplyByAttemptsExp(double base)
{
if (base == 2) return doubleByAttempts();
return (l, a) -> saturatedCast(l * pow(base, a));
}
default LatencyModifier multiplyByRetriesExp(double base)
{
if (base == 2) return doubleByRetries();
return (l, a) -> saturatedCast(l * pow(base, a - 1));
}
default LatencyModifier doubleByAttempts() { return doubleByCount(1); }
default LatencyModifier doubleByRetries() { return doubleByCount(0); }

private static LatencyModifier doubleByCount(int shiftOffset)
{
return (l, a) -> {
long result = l << (a + shiftOffset);
return result <= 0 ? Long.MAX_VALUE : result;
};
}
}

public interface Wait
Expand All @@ -116,7 +135,7 @@ class Modifying implements Wait
final LatencySupplier supplier;
final LatencyModifier modifier;

Modifying(LatencySupplier supplier, LatencyModifier modifier)
public Modifying(LatencySupplier supplier, LatencyModifier modifier)
{
this.supplier = supplier;
this.modifier = modifier;
Expand Down Expand Up @@ -282,12 +301,6 @@ public long computeWait(int attempts, TimeUnit units)
return units.convert(wait, MICROSECONDS);
}

public long computeWaitUntil(int attempts)
{
long nanos = computeWait(attempts, NANOSECONDS);
return nanoTime() + nanos;
}

private static LatencySupplier parseLatencySupplier(Matcher m, LatencySourceFactory latenciesFactory)
{
String perc = m.group("perc");
Expand Down Expand Up @@ -315,9 +328,9 @@ else if (!modkind.startsWith("*"))
return modifiers.multiply(modifier);

if (modkind.startsWith("*"))
return modifiers.multiplyByAttempts(modifier);
return m.group("count").equals("attempts") ? modifiers.multiplyByAttempts(modifier) : modifiers.multiplyByRetries(modifier);
else if (modkind.startsWith("^"))
return modifiers.multiplyByAttemptsExp(modifier);
return m.group("count").equals("attempts") ? modifiers.multiplyByAttemptsExp(modifier) : modifiers.multiplyByRetriesExp(modifier);
else
throw new IllegalArgumentException("Unrecognised attempt modifier: " + modkind);
}
Expand Down
21 changes: 18 additions & 3 deletions src/java/org/apache/cassandra/service/WaitStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,26 @@

import java.util.concurrent.TimeUnit;

import org.apache.cassandra.utils.Clock;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

public interface WaitStrategy
{
default long computeWaitUntil(int attempts)
{
return computeWaitUntil(Clock.Global.clock(), attempts);
}

// a value of below 0 means give up
long computeWaitUntil(int attempts);
default long computeWaitUntil(Clock clock, int attempts)
{
long wait = computeWait(attempts, NANOSECONDS);
if (wait < 0)
return -1;
return clock.nanoTime() + wait;
}

// a value of below 0 means give up
long computeWait(int attempts, TimeUnit units);

Expand All @@ -38,10 +54,9 @@ public long computeWait(int attempts, TimeUnit units)
}

@Override
public long computeWaitUntil(int attempts)
public long computeWaitUntil(Clock clock, int attempts)
{
return -1;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public ContentionStrategy(WaitRandomizer waitRandomizer, LegacyWait min, LegacyW

public long computeWait(int attempt, TimeUnit units)
{
if (attempt > maxAttempts)
if (attempt >= maxAttempts)
return -1;

long minWaitMicros = min.getMicros(attempt);
Expand Down
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/tcm/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.cassandra.service.WaitStrategy;
import org.apache.cassandra.tcm.log.Entry;
import org.apache.cassandra.tcm.log.LogState;
import org.apache.cassandra.utils.Clock;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.config.DatabaseDescriptor.getCmsAwaitTimeout;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;

public interface Processor
{
Expand Down Expand Up @@ -66,9 +66,9 @@ private static Retry unsafeRetryIndefinitely()
return Retry.withNoTimeLimit(retryMeter, new WaitStrategy()
{
@Override
public long computeWaitUntil(int attempts)
public long computeWaitUntil(Clock clock, int attempts)
{
return nanoTime() + timeoutNanos;
return clock.nanoTime() + timeoutNanos;
}

@Override
Expand All @@ -94,7 +94,7 @@ default ClusterMetadata fetchLogAndWait()
{
return fetchLogAndWait(null); // wait for the highest possible epoch
}
;

default ClusterMetadata fetchLogAndWait(Epoch waitFor)
{
return fetchLogAndWait(waitFor,
Expand Down
5 changes: 3 additions & 2 deletions src/java/org/apache/cassandra/tcm/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.cassandra.service.RetryStrategy;
import org.apache.cassandra.service.TimeoutStrategy.LatencySourceFactory;
import org.apache.cassandra.service.WaitStrategy;
import org.apache.cassandra.utils.Clock;

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
Expand Down Expand Up @@ -85,12 +86,12 @@ public boolean maybeSleep()
}

@Override
public long computeWaitUntil(int attempts)
public long computeWaitUntil(Clock clock, int attempts)
{
long wait = computeWaitInternal(attempts, TimeUnit.NANOSECONDS);
if (wait < 0)
return -1;
long now = nanoTime();
long now = clock.nanoTime();
if (now >= deadlineNanos)
return -1;
return Math.min(deadlineNanos, wait + now);
Expand Down