hcd-130 diag patch #1743

Draft
wants to merge 12 commits into base: hcd-1.1.1-rel
39 changes: 28 additions & 11 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,6 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@@ -102,7 +103,6 @@
import org.apache.cassandra.db.partitions.CachedPartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.repair.CassandraTableRepairManager;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.streaming.CassandraStreamManager;
import org.apache.cassandra.db.view.TableViews;
import org.apache.cassandra.dht.AbstractBounds;
@@ -140,7 +140,6 @@
import org.apache.cassandra.nodes.Nodes;
import org.apache.cassandra.repair.TableRepairManager;
import org.apache.cassandra.repair.consistent.admin.PendingStat;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.schema.CompressionParams;
@@ -373,6 +372,8 @@ public boolean isInvalidAndShouldDropData()

private final RequestTracker requestTracker = RequestTracker.instance;

private final ReentrantLock longRunningSerializedOperationsLock = new ReentrantLock();

public static void shutdownPostFlushExecutor() throws InterruptedException
{
postFlushExecutor.shutdown();
@@ -2792,7 +2793,8 @@ public <V> V runWithCompactionsDisabled(Callable<V> callable,
{
// synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
// and so we only run one major compaction at a time
synchronized (this)
longRunningSerializedOperationsLock.lock();
try
{
logger.trace("Cancelling in-progress compactions for {}", metadata.name);
Iterable<ColumnFamilyStore> toInterruptFor = concatWith(interruptIndexes, interruptViews);
@@ -2804,14 +2806,9 @@ public <V> V runWithCompactionsDisabled(Callable<V> callable,
CompactionManager.instance.waitForCessation(toInterruptFor, sstablesPredicate);

// double-check that we finished, instead of timing out
for (ColumnFamilyStore cfs : toInterruptFor)
{
if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate))
{
logger.warn("Unable to cancel in-progress compactions for {}. Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.name);
return null;
}
}
if (!allCompactionsFinished(toInterruptFor, sstablesPredicate))
return null;

logger.trace("Compactions successfully cancelled");

// run our task
@@ -2825,6 +2822,26 @@ public <V> V runWithCompactionsDisabled(Callable<V> callable,
}
}
}
finally
{
longRunningSerializedOperationsLock.unlock();
}
}

private boolean allCompactionsFinished(Iterable<ColumnFamilyStore> cfss, Predicate<SSTableReader> sstablesPredicate)
{
for (ColumnFamilyStore cfs : cfss)
{
if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate))
{
logger.warn("Unable to cancel in-progress compactions for {}.{}. Perhaps there is an unusually " +
"large row in progress somewhere, or the system is simply overloaded.", metadata.keyspace, metadata.name);
logger.debug("In-flight compactions: {}", Arrays.toString(cfs.getTracker().getCompacting().toArray()));
return false;
}
}

return true;
}

private static CompactionManager.CompactionPauser pauseCompactionStrategies(Iterable<ColumnFamilyStore> toPause)
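The ColumnFamilyStore change above swaps `synchronized (this)` for a dedicated `ReentrantLock`, so the serialized long-running section no longer shares a monitor with every other code path that might synchronize on the store instance. A minimal standalone sketch of the lock-and-finally idiom, with illustrative names that are not part of the patch:

import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantLock;

public class SerializedRunner
{
    // A dedicated lock keeps the critical section explicit and avoids
    // contending with unrelated synchronized(this) blocks.
    private final ReentrantLock operationLock = new ReentrantLock();

    public <V> V runSerialized(Callable<V> task) throws Exception
    {
        operationLock.lock();
        try
        {
            return task.call();
        }
        finally
        {
            operationLock.unlock(); // released even when task.call() throws
        }
    }
}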
@@ -171,7 +171,7 @@ void setSubmitted(CompactionStrategy strategy, UUID id, CompactionAggregate aggr
if (id == null || aggregate == null)
throw new IllegalArgumentException("arguments cannot be null");

logger.debug("Submitting background compaction {}", id);
logger.debug("Submitting background compaction {} for {}.{}", id, metadata.keyspace, metadata.name);
CompactionPick compaction = aggregate.getSelected();

CompactionPick prev = compactions.put(id, compaction);
@@ -121,6 +121,7 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.NonThrowingCloseable;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.UUIDGen;
@@ -2421,6 +2422,10 @@ public void waitForCessation(Iterable<ColumnFamilyStore> cfss, Predicate<SSTable

while (System.nanoTime() - start < delay)
{
for (ColumnFamilyStore cfs : cfss)
{
NoSpamLogger.getLogger(logger, 30, TimeUnit.SECONDS).debug("waitForCessation waiting on: " + Arrays.toString(cfs.getTracker().getCompacting().toArray()));
}
if (CompactionManager.instance.isCompacting(cfss, sstablePredicate))
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
else
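The wait loop above polls every millisecond, so an unthrottled log line would fire roughly a thousand times per second; NoSpamLogger caps it at one line per 30 seconds. One caveat: the message is built with string concatenation, so the `Arrays.toString(...)` argument is evaluated on every iteration even while the logger suppresses the line. A sketch of a cheaper variant, assuming the same `NoSpamLogger.getLogger(Logger, long, TimeUnit)` factory used in the patch:

// Hoist the rate-limited logger out of the loop; the {} placeholder defers
// string formatting until NoSpamLogger decides the line is actually due.
NoSpamLogger waitLogger = NoSpamLogger.getLogger(logger, 30, TimeUnit.SECONDS);
while (System.nanoTime() - start < delay)
{
    for (ColumnFamilyStore cfs : cfss)
        waitLogger.debug("waitForCessation waiting on: {}", cfs.getTracker().getCompacting());

    if (CompactionManager.instance.isCompacting(cfss, sstablePredicate))
        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
    else
        return; // as in the elided tail of the loop, presumably
}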
93 changes: 88 additions & 5 deletions src/java/org/apache/cassandra/db/compaction/ShardManager.java
@@ -19,13 +19,18 @@
package org.apache.cassandra.db.compaction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.BiFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.SortedLocalRanges;
@@ -38,6 +43,8 @@

public interface ShardManager
{
static final Logger shardManagerLogger = LoggerFactory.getLogger(ShardManager.class);

/// Single-partition sstables, and generally sstables with very few partitions, can cover very small sections of the token
/// space, resulting in very high densities.
///
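A quick worked illustration of the density concern, with made-up numbers:

// Hypothetical figures: density is on-disk size divided by the fraction of
// the token ring the sstable covers, so a one-partition sstable with a
// near-zero covered span produces an enormous density value.
double sizeBytes   = 100 * 1024;              // 100 KiB sstable
double coveredSpan = 1e-12;                   // single partition's share of the ring
double density     = sizeBytes / coveredSpan; // ~1e17, wildly inflated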
@@ -259,21 +266,90 @@ default <T, R extends CompactionSSTable> List<T> splitSSTablesInShardsLimited(Co
int maxParallelism,
BiFunction<Collection<R>, Range<Token>, T> maker)
{
shardManagerLogger.debug("splitSSTablesInShardsLimited: numShardsForDensity {} coveredShards: {} maxParallelism: {} sstables {} operationRange {}", numShardsForDensity, coveredShards, maxParallelism, sstables, operationRange);
if (coveredShards <= maxParallelism)
{
shardManagerLogger.debug("coveredShards <= maxParallelism");
return splitSSTablesInShards(sstables, operationRange, numShardsForDensity, maker);
// We may be in a simple case where we can reduce the number of shards by some power of 2.
int multiple = Integer.highestOneBit(coveredShards / maxParallelism);
if (maxParallelism * multiple == coveredShards)
return splitSSTablesInShards(sstables, operationRange, numShardsForDensity / multiple, maker);
}
// // We may be in a simple case where we can reduce the number of shards by some power of 2.
// int multiple = Integer.highestOneBit(coveredShards / maxParallelism);
// if (maxParallelism * multiple == coveredShards)
// {
// shardManagerLogger.debug("maxParallelism * multiple == coveredShards");
// return splitSSTablesInShards(sstables, operationRange, numShardsForDensity / multiple, maker);
// }

var shards = splitSSTablesInShards(sstables,
operationRange,
numShardsForDensity,
(rangeSSTables, range) -> Pair.create(Set.copyOf(rangeSSTables), range));
shardManagerLogger.debug("applyMaxParallelism: {} {}", maxParallelism, shards);
return applyMaxParallelism(maxParallelism, maker, shards);
}

private static <T, R extends CompactionSSTable> List<T> applyMaxParallelism(int maxParallelism, BiFunction<Collection<R>, Range<Token>, T> maker, List<Pair<Set<R>, Range<Token>>> shards)
private static <T, R extends CompactionSSTable> List<T> applyMaxParallelism(int maxParallelism,
BiFunction<Collection<R>, Range<Token>, T> maker,
List<Pair<Set<R>, Range<Token>>> shards)
{
if (maxParallelism >= shards.size())
{
// We can fit within the parallelism limit without grouping, because some ranges are empty.
// This is not expected to happen often, but if it does, take advantage.
List<T> tasks = new ArrayList<>();
for (Pair<Set<R>, Range<Token>> pair : shards)
tasks.add(maker.apply(pair.left, pair.right));
return tasks;
}

double totalSpan = shards.stream().map(Pair::right).mapToDouble(r -> r.left.size(r.right)).sum();
double spanPerTask = totalSpan / maxParallelism;

List<T> tasks = new ArrayList<>();
Set<R> currentSSTables = new HashSet<>();
Token rangeStart = null;
double currentSpan = 0;
int shardsRemaining = shards.size();
int tasksRemaining = maxParallelism;

for (Pair<Set<R>, Range<Token>> pair : shards)
{
Token currentStart = pair.right.left;
Token currentEnd = pair.right.right;
double span = currentStart.size(currentEnd);

if (rangeStart == null)
rangeStart = currentStart;

currentSSTables.addAll(pair.left);
currentSpan += span;
shardsRemaining--;

boolean isLastTask = tasksRemaining == 1;
boolean shouldEmit = !isLastTask &&
(currentSpan >= spanPerTask || shardsRemaining + tasks.size() + 1 == maxParallelism);

if (shouldEmit)
{
tasks.add(maker.apply(currentSSTables, new Range<>(rangeStart, currentEnd)));
currentSSTables = new HashSet<>();
rangeStart = null;
currentSpan = 0;
tasksRemaining--;
}
}

if (!currentSSTables.isEmpty())
{
Token finalEnd = shards.get(shards.size() - 1).right.right;
tasks.add(maker.apply(currentSSTables, new Range<>(rangeStart, finalEnd)));
}

assert tasks.size() == maxParallelism : tasks.size() + " != " + maxParallelism;
return tasks;
}

private static <T, R extends CompactionSSTable> List<T> applyMaxParallelismOld(int maxParallelism, BiFunction<Collection<R>, Range<Token>, T> maker, List<Pair<Set<R>, Range<Token>>> shards)
{
int actualParallelism = shards.size();
if (maxParallelism >= actualParallelism)
@@ -289,25 +365,30 @@ private static <T, R extends CompactionSSTable> List<T> applyMaxParallelism(int
// Otherwise we have to group shards together. Define a target token span per task and greedily group
// to be as close to it as possible.
double spanPerTask = shards.stream().map(Pair::right).mapToDouble(t -> t.left.size(t.right)).sum() / maxParallelism;
shardManagerLogger.debug("Applying max parallelism {}, span per task: {} to {} shards: {}", maxParallelism, spanPerTask, shards.size(), shards);
double currentSpan = 0;
Set<R> currentSSTables = new HashSet<>();
Token rangeStart = null;
Token prevEnd = null;
List<T> tasks = new ArrayList<>(maxParallelism);
for (var pair : shards)
{
shardManagerLogger.debug("Start of loop for Pair: {}", pair);
final Token currentEnd = pair.right.right;
final Token currentStart = pair.right.left;
double span = currentStart.size(currentEnd);
shardManagerLogger.debug("span: {}", span);
if (rangeStart == null)
rangeStart = currentStart;
if (currentSpan + span >= spanPerTask - 0.001) // rounding error safety
{
boolean includeCurrent = currentSpan + span - spanPerTask <= spanPerTask - currentSpan;
if (includeCurrent)
currentSSTables.addAll(pair.left);
shardManagerLogger.debug("Emit task for sstables: {}", currentSSTables);
tasks.add(maker.apply(currentSSTables, new Range<>(rangeStart, includeCurrent ? currentEnd : prevEnd)));
currentSpan -= spanPerTask;
shardManagerLogger.debug("currentSpan: {}", currentSpan);
rangeStart = null;
currentSSTables.clear();
if (!includeCurrent)
@@ -321,6 +402,8 @@ private static <T, R extends CompactionSSTable> List<T> applyMaxParallelism(int

currentSpan += span;
prevEnd = currentEnd;

shardManagerLogger.debug("End of loop currentSpan: {} sstables: {}", currentSpan, currentSSTables);
}
assert currentSSTables.isEmpty();
return tasks;
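The rewritten applyMaxParallelism keeps the greedy idea of the version that now survives as applyMaxParallelismOld: compute a target token span per task, sweep the shards in order, and emit a task once the accumulated span reaches the target, while making sure enough shards remain to give every outstanding task at least one. A self-contained sketch of that grouping over plain numbers (all values hypothetical; it assumes at least as many shards as buckets, since the patch handles the opposite case separately):

import java.util.ArrayList;
import java.util.List;

public class GreedyGrouping
{
    // Group the weighted shards into exactly k contiguous buckets, each as
    // close as possible to the average weight; mirrors the span-per-task
    // sweep in the patched applyMaxParallelism.
    // Precondition: spans.length >= k (the patch handles fewer shards earlier).
    static List<List<Double>> group(double[] spans, int k)
    {
        double total = 0;
        for (double s : spans)
            total += s;
        double target = total / k;

        List<List<Double>> buckets = new ArrayList<>();
        List<Double> current = new ArrayList<>();
        double acc = 0;
        int shardsRemaining = spans.length;

        for (double s : spans)
        {
            current.add(s);
            acc += s;
            shardsRemaining--;

            boolean lastBucket = buckets.size() == k - 1;
            // Emit when the target span is reached, or when every remaining
            // shard is needed to fill the remaining buckets one apiece.
            if (!lastBucket && (acc >= target || shardsRemaining + buckets.size() + 1 == k))
            {
                buckets.add(current);
                current = new ArrayList<>();
                acc = 0;
            }
        }
        if (!current.isEmpty())
            buckets.add(current);
        return buckets;
    }

    public static void main(String[] args)
    {
        // Prints [[0.1, 0.4], [0.5], [0.2, 0.2, 0.1]]: exactly k buckets,
        // consistent with the patch's "tasks.size() == maxParallelism" assert.
        System.out.println(group(new double[]{ 0.1, 0.4, 0.5, 0.2, 0.2, 0.1 }, 3));
    }
}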
@@ -41,7 +41,9 @@

import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
@@ -58,6 +60,7 @@
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -259,12 +262,13 @@ public synchronized CompactionTasks getMaximalTasks(int gcBefore, boolean splitO
permittedParallelism = Integer.MAX_VALUE;

List<AbstractCompactionTask> tasks = new ArrayList<>();
LifecycleTransaction txn = null;
try
{
// Split the space into independently compactable groups.
for (var aggregate : getMaximalAggregates())
{
LifecycleTransaction txn = realm.tryModify(aggregate.getSelected().sstables(),
txn = realm.tryModify(aggregate.getSelected().sstables(),
OperationType.COMPACTION,
aggregate.getSelected().id());

@@ -284,6 +288,8 @@
}
catch (Throwable t)
{
if (txn != null)
txn.close();
throw rejectTasks(tasks, t);
}
}
@@ -414,9 +420,17 @@ public void createAndAddTasks(int gcBefore, CompactionAggregate.UnifiedAggregate
selected.id());
if (transaction != null)
{
// This will ignore the range of the operation, which is fine.
backgroundCompactions.setSubmitted(this, transaction.opId(), aggregate);
createAndAddTasks(gcBefore, transaction, aggregate.operationRange(), aggregate.keepOriginals(), getShardingStats(aggregate), parallelism, tasks);
try
{
// This will ignore the range of the operation, which is fine.
backgroundCompactions.setSubmitted(this, transaction.opId(), aggregate);
createAndAddTasks(gcBefore, transaction, aggregate.operationRange(), aggregate.keepOriginals(), getShardingStats(aggregate), parallelism, tasks);
}
catch (Throwable e)
{
transaction.close();
throw e;
}
}
else
{
@@ -687,8 +701,8 @@ private Collection<UnifiedCompactionTask> createParallelCompactionTasks(Lifecycl
sharedObserver)
);
compositeTransaction.completeInitialization();
assert tasks.size() <= parallelism;
assert tasks.size() <= coveredShardCount;
assert tasks.size() <= parallelism : "Task size: " + tasks.size() + " vs parallelism of: " + parallelism;
assert tasks.size() <= coveredShardCount : "Task size: " + tasks.size() + " vs covered shard count: " + coveredShardCount;

if (tasks.isEmpty())
transaction.close(); // this should not be reachable normally, close the transaction for safety
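Both UnifiedCompactionStrategy hunks above fix the same leak shape: once `tryModify` returns a live transaction, any exception thrown before a task takes ownership must close the transaction, or its sstables stay marked compacting indefinitely. A minimal sketch of the idiom, where `Txn` and `buildTask` are placeholders rather than the patch's API:

// Placeholder types: Txn stands in for the real transaction class and
// buildTask for whatever step assumes ownership of it.
Txn txn = tryModify();               // may be null if the sstables are busy
if (txn != null)
{
    try
    {
        tasks.add(buildTask(txn));   // ownership passes to the task here
        txn = null;                  // hypothetical: drop our reference once owned
    }
    catch (Throwable t)
    {
        if (txn != null)
            txn.close();             // still ours: release the marked sstables
        throw t;                     // rethrow (enclosing method declares throws Throwable in this sketch)
    }
}

Nulling the reference once ownership transfers also keeps a loop-scoped version of this pattern, like the one in getMaximalTasks, from closing a transaction that an earlier iteration already handed off.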
@@ -26,6 +26,8 @@
import com.google.common.collect.*;
import com.google.common.util.concurrent.Runnables;

import org.apache.commons.lang3.exception.ExceptionUtils;

import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
@@ -571,6 +573,9 @@ private void checkUnused()

private Throwable unmarkCompacting(Set<SSTableReader> unmark, Throwable accumulate)
{
if (!unmark.isEmpty())
logger.debug("unmarkCompacting unmark: {} \n {}", unmark, ExceptionUtils.getStackTrace(new Exception()));

accumulate = tracker.apply(updateCompacting(unmark, emptySet()), accumulate);
// when the CFS is invalidated, it will call unreferenceSSTables(). However, unreferenceSSTables only deals
// with sstables that aren't currently being compacted. If there are ongoing compactions that finish or are
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -27,6 +27,7 @@
import com.google.common.base.Predicate;
import com.google.common.collect.*;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -37,6 +38,7 @@
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Interval;
import org.apache.cassandra.utils.Throwables;

import static com.google.common.base.Predicates.equalTo;
import static com.google.common.base.Predicates.not;
@@ -280,6 +282,7 @@ public static Function<View, View> updateCompacting(final Set<? extends SSTableR
{
if (unmark.isEmpty() && Iterables.isEmpty(mark))
return Functions.identity();
logger.debug("updateCompacting mark: {} unmark: {} \n {}", mark, unmark, ExceptionUtils.getStackTrace(new Exception()));
return new Function<View, View>()
{
public View apply(View view)