Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-10907: Implement overlap diagnostics #1453

Merged
merged 5 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;

import org.slf4j.Logger;
Expand All @@ -48,6 +49,7 @@
import org.apache.cassandra.io.sstable.ScannerList;
import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Overlaps;

abstract class AbstractCompactionStrategy implements CompactionStrategy
{
Expand Down Expand Up @@ -397,4 +399,14 @@ public void periodicReport()
logger.statistics(this, "periodic", backgroundCompactions.getStatistics(this));
}
}

@Override
public Map<String, String> getMaxOverlapsMap()
{
final Set<? extends CompactionSSTable> liveSSTables = getSSTables();
return ImmutableMap.of("all", Integer.toString(Overlaps.maxOverlap(liveSSTables,
CompactionSSTable.startsAfter,
CompactionSSTable.firstKeyComparator,
CompactionSSTable.lastKeyComparator)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

Expand Down Expand Up @@ -201,4 +202,10 @@ default boolean supportsCursorCompaction()
}

void periodicReport();

/**
* Returns a map of sstable regions (e.g. repaired, unrepaired, possibly combined with level information) to the
* maximum overlap between the sstables in the region.
*/
Map<String, String> getMaxOverlapsMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -1154,4 +1156,16 @@ public void onCompleted(UUID id, boolean isSuccess)
{

}

@Override
public Map<String, String> getMaxOverlapsMap()
{
Map<String, String> result = new LinkedHashMap<>();

for (AbstractStrategyHolder holder : holders)
for (LegacyAbstractCompactionStrategy strategy : holder.allStrategies())
result.putAll(strategy.getMaxOverlapsMap());

return result;
}
}
105 changes: 63 additions & 42 deletions src/java/org/apache/cassandra/db/compaction/ShardManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.ObjIntConsumer;

import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.PartitionPosition;
Expand Down Expand Up @@ -100,9 +102,7 @@ else if (partitioner.splitter().isPresent())

/// Construct a boundary/shard iterator for the given number of shards.
///
/// Note: This does not offer a method of listing the shard boundaries it generates, just to advance to the
/// corresponding one for a given token. The only usage for listing is currently in tests. Should a need for this
/// arise, see `CompactionSimulationTest` for a possible implementation.
/// If a list of the ranges for each shard is required instead, use [#getShardRanges].
ShardTracker boundaries(int shardCount);

static Range<Token> coveringRange(CompactionSSTable sstable)
Expand Down Expand Up @@ -166,16 +166,24 @@ default double density(long onDiskLength, PartitionPosition min, PartitionPositi
return onDiskLength / adjustSmallSpans(span, approximatePartitionCount);
}


/// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call
/// the given function on the combination of each shard range and the intersecting sstable set.
default <T, R extends CompactionSSTable> List<T> splitSSTablesInShards(Collection<R> sstables,
int numShardsForDensity,
BiFunction<Collection<R>, Range<Token>, T> maker)
/// the given function on the intersecting sstable set, with access to the shard tracker from which information
/// about the shard can be recovered.
///
/// If an operationRange is given, this method restricts the collection to the given range and assumes all sstables
/// cover at least some portion of that range.
private <R extends CompactionSSTable> void assignSSTablesInShards(Collection<R> sstables,
Range<Token> operationRange,
int numShardsForDensity,
BiConsumer<Collection<R>, ShardTracker> consumer)
{
var boundaries = boundaries(numShardsForDensity);
List<T> tasks = new ArrayList<>();
SortingIterator<R> items = SortingIterator.create(CompactionSSTable.firstKeyComparator, sstables);
PriorityQueue<R> active = new PriorityQueue<>(CompactionSSTable.lastKeyComparator);
// Advance inside the range. This will add all sstables that start before the end of the covering shard.
if (operationRange != null)
boundaries.advanceTo(operationRange.left.nextValidToken());
while (items.hasNext() || !active.isEmpty())
{
if (active.isEmpty())
Expand All @@ -184,21 +192,46 @@ default <T, R extends CompactionSSTable> List<T> splitSSTablesInShards(Collectio
active.add(items.next());
}
Token shardEnd = boundaries.shardEnd();
if (operationRange != null &&
!operationRange.right.isMinimum() &&
shardEnd != null &&
shardEnd.compareTo(operationRange.right) >= 0)
shardEnd = null; // Take all remaining sstables.

while (items.hasNext() && (shardEnd == null || items.peek().getFirst().getToken().compareTo(shardEnd) <= 0))
active.add(items.next());

final T result = maker.apply(active, boundaries.shardSpan());
if (result != null)
tasks.add(result);
consumer.accept(active, boundaries);

while (!active.isEmpty() && (shardEnd == null || active.peek().getLast().getToken().compareTo(shardEnd) <= 0))
active.poll();

if (!active.isEmpty()) // shardEnd must be non-null (otherwise the line above exhausts all)
boundaries.advanceTo(shardEnd.nextValidToken());
}
return tasks;
}

/// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call
/// the given function on the combination of each shard index and the intersecting sstable set.
///
/// If an operationRange is given, this method restricts the collection to the given range and assumes all sstables
/// cover at least some portion of that range.
default <R extends CompactionSSTable> void assignSSTablesToShardIndexes(Collection<R> sstables,
Range<Token> operationRange,
int numShardsForDensity,
ObjIntConsumer<Collection<R>> consumer)
{
assignSSTablesInShards(sstables, operationRange, numShardsForDensity,
(rangeSSTables, boundaries) -> consumer.accept(rangeSSTables, boundaries.shardIndex()));
}

/// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call
/// the given function on the combination of each shard range and the intersecting sstable set.
default <T, R extends CompactionSSTable> List<T> splitSSTablesInShards(Collection<R> sstables,
int numShardsForDensity,
BiFunction<Collection<R>, Range<Token>, T> maker)
{
return splitSSTablesInShards(sstables, null, numShardsForDensity, maker);
}

/// Seggregate the given sstables into the shard ranges that intersect sstables from the collection, and call
Expand All @@ -211,39 +244,12 @@ default <T, R extends CompactionSSTable> List<T> splitSSTablesInShards(Collectio
int numShardsForDensity,
BiFunction<Collection<R>, Range<Token>, T> maker)
{
if (operationRange == null)
return splitSSTablesInShards(sstables, numShardsForDensity, maker);

var boundaries = boundaries(numShardsForDensity);
List<T> tasks = new ArrayList<>();
SortingIterator<R> items = SortingIterator.create(CompactionSSTable.firstKeyComparator, sstables);
PriorityQueue<R> active = new PriorityQueue<>(CompactionSSTable.lastKeyComparator);
// Advance inside the range. This will add all sstables that start before the end of the covering shard.
boundaries.advanceTo(operationRange.left.nextValidToken());
while (items.hasNext() || !active.isEmpty())
{
if (active.isEmpty())
{
boundaries.advanceTo(items.peek().getFirst().getToken());
active.add(items.next());
}
Token shardEnd = boundaries.shardEnd();
if (!operationRange.right.isMinimum() && shardEnd != null && shardEnd.compareTo(operationRange.right) >= 0)
shardEnd = null; // Take all remaining sstables.

while (items.hasNext() && (shardEnd == null || items.peek().getFirst().getToken().compareTo(shardEnd) <= 0))
active.add(items.next());

final T result = maker.apply(active, boundaries.shardSpan());
assignSSTablesInShards(sstables, operationRange, numShardsForDensity, (rangeSSTables, boundaries) -> {
final T result = maker.apply(rangeSSTables, boundaries.shardSpan());
if (result != null)
tasks.add(result);

while (!active.isEmpty() && (shardEnd == null || active.peek().getLast().getToken().compareTo(shardEnd) <= 0))
active.poll();

if (!active.isEmpty()) // shardEnd must be non-null (otherwise the line above exhausts all)
boundaries.advanceTo(shardEnd.nextValidToken());
}
});
return tasks;
}

Expand Down Expand Up @@ -336,4 +342,19 @@ default int coveredShardCount(PartitionPosition first, PartitionPosition last, i
int lastShard = boundaries.shardIndex();
return lastShard - firstShard + 1;
}

/// Get the list of shard ranges for the given shard count. Useful for diagnostics and debugging.
default List<Range<Token>> getShardRanges(int shardCount)
{
var boundaries = boundaries(shardCount);
var result = new ArrayList<Range<Token>>(shardCount);
while (true)
{
result.add(boundaries.shardSpan());
if (boundaries.shardEnd() == null)
break;
boundaries.advanceTo(boundaries.shardEnd().nextValidToken());
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,31 @@ private Token getEndToken(double toPos)

public Token shardStart()
{
ensureInitialized();
return currentStart;
}

public Token shardEnd()
{
ensureInitialized();
return currentEnd;
}

public Range<Token> shardSpan()
{
ensureInitialized();
return new Range<>(currentStart, currentEnd != null ? currentEnd : currentStart.minValue());
}

private void ensureInitialized()
{
if (diskIndex < 0)
{
enterDisk(0);
setEndToken();
}
}

public double shardSpanSize()
{
return shardStep;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -358,6 +359,12 @@ public void periodicReport()
strategy.periodicReport();
}

@Override
public Map<String, String> getMaxOverlapsMap()
{
return strategy.getMaxOverlapsMap();
}

BackgroundCompactions getBackgroundCompactions()
{
return strategy.backgroundCompactions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ private void maybeUpdateSelector()
}
}

// used by CNDB
/// Get the current shard manager. Used internally, in tests and by CNDB.
public ShardManager getShardManager()
{
maybeUpdateSelector();
Expand Down Expand Up @@ -1250,6 +1250,63 @@ public Map<Arena, List<Level>> getLevels(Collection<? extends CompactionSSTable>
return ret;
}

/**
* Creates a map of maximum overlap, organized as a map from arena:level to the maximum number of sstables that
* overlap in that level, as well as a list showing the per-shard maximum overlap.
*
* The number of shards to list is calculated based on the maximum density of the sstables in the realm.
*/
@Override
public Map<String, String> getMaxOverlapsMap()
{
final Set<? extends CompactionSSTable> liveSSTables = realm.getLiveSSTables();
Map<UnifiedCompactionStrategy.Arena, List<UnifiedCompactionStrategy.Level>> arenas =
getLevels(liveSSTables, (i1, i2) -> true); // take all sstables

ShardManager shardManager = getShardManager();
Map<String, String> map = new LinkedHashMap<>();

// max general overlap (max # of sstables per query)
map.put("all", getMaxOverlapsPerShardString(liveSSTables, shardManager));

for (var arena : arenas.entrySet())
{
final String arenaName = arena.getKey().name();
for (var level : arena.getValue())
map.put(arenaName + "-L" + level.getIndex(), getMaxOverlapsPerShardString(level.getSSTables(), shardManager));
}
return map;
}

private String getMaxOverlapsPerShardString(Collection<? extends CompactionSSTable> sstables, ShardManager shardManager)
{
// Find the sstable with the biggest density to define the shard count.
// This is better than using a level's max bound as that will show more shards than there actually are.
double maxDensity = 0;
for (CompactionSSTable liveSSTable : sstables)
maxDensity = Math.max(maxDensity, shardManager.density(liveSSTable));
int shardCount = controller.getNumShards(maxDensity);

int[] overlapsMap = getMaxOverlapsPerShard(sstables, shardManager, shardCount);
int max = 0;
for (int i : overlapsMap)
max = Math.max(max, i);
return max + " (per shard: " + Arrays.toString(overlapsMap) + ")";
}

public static int[] getMaxOverlapsPerShard(Collection<? extends CompactionSSTable> sstables, ShardManager shardManager, int shardCount)
{
int[] overlapsMap = new int[shardCount];
shardManager.assignSSTablesToShardIndexes(sstables, null, shardCount,
(shardSSTables, shard) ->
overlapsMap[shard] = Overlaps.maxOverlap(shardSSTables,
CompactionSSTable.startsAfter,
CompactionSSTable.firstKeyComparator,
CompactionSSTable.lastKeyComparator));
// Indexes that do not have sstables are left with 0 overlaps.
return overlapsMap;
}

private static int levelOf(CompactionPick pick)
{
return (int) pick.parent();
Expand Down
Loading