@@ -788,6 +788,13 @@ public enum CassandraRelevantProperties

SAI_QUERY_OPT_LEVEL("cassandra.sai.query.optimization.level", "1"),
SAI_REDUCE_TOPK_ACROSS_SSTABLES("cassandra.sai.reduce_topk_across_sstables", "true"),

/**
* Whether to log SAI-specific detailed execution info when logging slow SAI queries.
* This execution info includes the query metrics and the query plan of the slow queries.
*/
SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED("cassandra.sai.slow_query_log.execution_info_enabled", "true"),

SAI_TEST_DISABLE_TIMEOUT("cassandra.sai.test.timeout_disabled", "false"),
SAI_TEST_LAST_VALID_SEGMENTS("cassandra.sai.test_last_valid_segments", "-1"),
SAI_TEST_SEGMENT_BUILD_MEMORY_LIMIT("cassandra.test.sai.segment_build_memory_limit"),
@@ -848,6 +855,7 @@ public enum CassandraRelevantProperties
SKIP_PAXOS_STATE_REBUILD("cassandra.skip_paxos_state_rebuild"),
/** Whether to skip rewriting hints when original host id left the cluster */
SKIP_REWRITING_HINTS_ON_HOST_LEFT("cassandra.hinted_handoff.skip_rewriting_hints_on_host_left"),

/** snapshots ttl cleanup initial delay in seconds */
SNAPSHOT_CLEANUP_INITIAL_DELAY_SECONDS("cassandra.snapshot.ttl_cleanup_initial_delay_seconds", "5"),
/** snapshots ttl cleanup period in seconds */
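The new SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED property defaults to "true". A minimal sketch of how such a flag might be consulted at a logging call site, assuming the enum's standard getBoolean() accessor; the wrapper class below is purely illustrative and not part of this change:

import org.apache.cassandra.config.CassandraRelevantProperties;

// Illustrative helper, not part of this change: centralizes the check for whether SAI
// execution info should be appended to slow-query log messages.
public final class SlowQueryLogSettings
{
    private SlowQueryLogSettings() {}

    public static boolean includeSaiExecutionInfo()
    {
        // getBoolean() falls back to the default declared in the enum ("true") when the
        // system property is not set.
        return CassandraRelevantProperties.SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED.getBoolean();
    }
}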
16 changes: 13 additions & 3 deletions src/java/org/apache/cassandra/db/ReadCommand.java
@@ -28,7 +28,7 @@
import java.util.function.LongPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;

import java.util.function.Supplier;
import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
@@ -138,6 +138,8 @@ public abstract class ReadCommand extends AbstractReadQuery
@Nullable
protected final Index.QueryPlan indexQueryPlan;

private volatile Supplier<ExecutionInfo> executionInfoSupplier = ExecutionInfo.EMPTY_SUPPLIER;

protected static abstract class SelectionDeserializer
{
public abstract ReadCommand deserialize(DataInputPlus in,
@@ -323,6 +325,12 @@ public Index.Searcher indexSearcher()
return indexQueryPlan == null ? null : indexQueryPlan.searcherFor(this);
}

@Override
public ExecutionInfo executionInfo()
{
return executionInfoSupplier.get();
}

/**
* The clustering index filter this command to use for the provided key.
* <p>
@@ -488,9 +496,8 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
try
{
ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
Index.QueryPlan indexQueryPlan = indexQueryPlan();

Index.Searcher searcher = null;

if (indexQueryPlan != null)
{
cfs.indexManager.checkQueryability(indexQueryPlan);
@@ -509,6 +516,9 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
Context context = Context.from(this);
var storageTarget = (null == searcher) ? queryStorage(cfs, executionController)
: searchStorage(searcher, executionController);
if (searcher != null)
executionInfoSupplier = searcher.monitorableExecutionInfo();

UnfilteredPartitionIterator iterator = Transformation.apply(storageTarget, new TrackingRowIterator(context));
iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);

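The key pattern above is that ReadCommand keeps a volatile Supplier<ExecutionInfo> that starts as EMPTY_SUPPLIER and is swapped for the searcher's supplier during executeLocally(), so execution info is only computed if executionInfo() is actually called for a slow query. A stand-alone sketch of that deferral pattern (all names below are illustrative, only the pattern mirrors the ReadCommand change):

import java.util.function.Supplier;
import org.apache.cassandra.db.monitoring.Monitorable;

// Illustrative only: mirrors the deferred-supplier wiring used by ReadCommand above.
class DeferredExecutionInfoHolder
{
    // Starts as the no-op supplier; replaced once an index searcher has been selected.
    private volatile Supplier<Monitorable.ExecutionInfo> supplier = Monitorable.ExecutionInfo.EMPTY_SUPPLIER;

    void onSearcherSelected(Supplier<Monitorable.ExecutionInfo> searcherInfo)
    {
        supplier = searcherInfo;
    }

    Monitorable.ExecutionInfo executionInfo()
    {
        // Evaluated lazily, so building metrics/plan text only happens for queries that are
        // actually reported as slow.
        return supplier.get();
    }
}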
39 changes: 39 additions & 0 deletions src/java/org/apache/cassandra/db/monitoring/Monitorable.java
@@ -18,6 +18,8 @@

package org.apache.cassandra.db.monitoring;

import java.util.function.Supplier;

public interface Monitorable
{
String name();
@@ -33,4 +35,41 @@ public interface Monitorable

boolean abort();
boolean complete();

/**
* Returns the specific {@link ExecutionInfo} for this monitorable operation.
*
* @return the execution info for this operation
*/
default ExecutionInfo executionInfo()
{
return ExecutionInfo.EMPTY;
}

/**
* Specific execution details for a monitorable operation.
* <p>
* {@link Monitorable} implementations should use this interface to hold and provide additional information about
* the execution of the operation. This information will be logged when the operation is reported as slow.
*/
interface ExecutionInfo
{
/**
* An empty no-op implementation.
*/
ExecutionInfo EMPTY = unique -> "";

/**
* A supplier for the empty implementation.
*/
Supplier<ExecutionInfo> EMPTY_SUPPLIER = () -> EMPTY;

/**
* Returns a string representation of this execution info, suitable for logging.
*
* @param unique whether the execution info is for a single operation or an aggregation of operations
* @return a log-suitable string representation of this execution info
*/
String toLogString(boolean unique);
}
}
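A minimal sketch of what an ExecutionInfo implementation might look like; the class below is hypothetical and only follows the toLogString(boolean) contract defined above:

import org.apache.cassandra.db.monitoring.Monitorable;

// Hypothetical implementation: wraps pre-rendered details so that producing the log text
// is cheap at reporting time.
public class PlanExecutionInfo implements Monitorable.ExecutionInfo
{
    private final String details;

    public PlanExecutionInfo(String details)
    {
        this.details = details;
    }

    @Override
    public String toLogString(boolean unique)
    {
        // 'unique' distinguishes a single slow operation from an aggregation of operations;
        // here it only changes the prefix, the wrapped details are the same either way.
        if (details.isEmpty())
            return "";
        return (unique ? ", execution info: " : ", slowest execution info: ") + details;
    }
}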
51 changes: 41 additions & 10 deletions src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
@@ -47,6 +47,7 @@
* We also log timed out operations, see CASSANDRA-7392.
* Since CASSANDRA-12403 we also log queries that were slow.
*/
@VisibleForTesting
public class MonitoringTask
{
private static final String LINE_SEPARATOR = CassandraRelevantProperties.LINE_SEPARATOR.getString();
@@ -72,7 +73,6 @@ public class MonitoringTask
private final OperationsQueue slowOperationsQueue;
private long approxLastLogTimeNanos;


@VisibleForTesting
static MonitoringTask make(int reportIntervalMillis, int maxTimedoutOperations)
{
@@ -328,11 +328,11 @@ protected abstract static class Operation
* this is set lazily as it takes time to build the query CQL */
private String name;

Operation(Monitorable operation, long failedAtNanos)
Operation(Monitorable operation, long nowNanos)
{
this.operation = operation;
numTimesReported = 1;
totalTimeNanos = failedAtNanos - operation.creationTimeNanos();
totalTimeNanos = nowNanos - operation.creationTimeNanos();
minTime = totalTimeNanos;
maxTime = totalTimeNanos;
}
@@ -353,6 +353,8 @@ void add(Operation operation)
}

public abstract String getLogMessage();

protected abstract Monitorable.ExecutionInfo executionInfo();
}

/**
@@ -383,35 +385,64 @@ public String getLogMessage()
NANOSECONDS.toMillis(operation.timeoutNanos()),
operation.isCrossNode() ? "msec/cross-node" : "msec");
}

@Override
protected Monitorable.ExecutionInfo executionInfo()
{
return Monitorable.ExecutionInfo.EMPTY;
}
}

/**
* An operation (query) that was reported as slow.
*/
private final static class SlowOperation extends Operation
@VisibleForTesting
public final static class SlowOperation extends Operation
{
SlowOperation(Monitorable operation, long failedAt)
/** Any specific execution info of the slowest operation among the aggregated operations. */
private Monitorable.ExecutionInfo slowestOperationExecutionInfo;

@VisibleForTesting
public SlowOperation(Monitorable operation, long slowAtNanos)
{
super(operation, failedAt);
super(operation, slowAtNanos);
slowestOperationExecutionInfo = operation.executionInfo();
}

public String getLogMessage()
{
if (numTimesReported == 1)
return String.format("<%s>, time %d msec - slow timeout %d %s",
return String.format("<%s>, time %d msec - slow timeout %d %s%s",
name(),
NANOSECONDS.toMillis(totalTimeNanos),
NANOSECONDS.toMillis(operation.slowTimeoutNanos()),
operation.isCrossNode() ? "msec/cross-node" : "msec");
operation.isCrossNode() ? "msec/cross-node" : "msec",
slowestOperationExecutionInfo.toLogString(true));
else
return String.format("<%s>, was slow %d times: avg/min/max %d/%d/%d msec - slow timeout %d %s",
return String.format("<%s>, was slow %d times: avg/min/max %d/%d/%d msec - slow timeout %d %s%s",
name(),
numTimesReported,
NANOSECONDS.toMillis(totalTimeNanos/ numTimesReported),
NANOSECONDS.toMillis(minTime),
NANOSECONDS.toMillis(maxTime),
NANOSECONDS.toMillis(operation.slowTimeoutNanos()),
operation.isCrossNode() ? "msec/cross-node" : "msec");
operation.isCrossNode() ? "msec/cross-node" : "msec",
slowestOperationExecutionInfo.toLogString(false));
}

@Override
protected Monitorable.ExecutionInfo executionInfo()
{
return slowestOperationExecutionInfo;
}

@Override
void add(Operation operation)
{
if (operation.maxTime > maxTime)
slowestOperationExecutionInfo = operation.executionInfo();

super.add(operation);
}
}
}
13 changes: 13 additions & 0 deletions src/java/org/apache/cassandra/index/Index.java
@@ -49,6 +49,7 @@
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.db.monitoring.Monitorable;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -801,6 +802,18 @@ interface Searcher
* @return partitions from the base table matching the criteria of the search.
*/
UnfilteredPartitionIterator search(ReadExecutionController executionController);

/**
* Returns a supplier for the {@link Monitorable.ExecutionInfo} for this query, to be used by
* {@link ReadCommand#executionInfo()} at the end of the query to collect details about the query execution in
* case it is considered too slow.
*
* @return a supplier for the execution info for this query
*/
default Supplier<Monitorable.ExecutionInfo> monitorableExecutionInfo()
{
return Monitorable.ExecutionInfo.EMPTY_SUPPLIER;
}
}
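A sketch of how a searcher might override this hook; the class name, the queryContext field and the rendered message are assumptions for illustration, only the Searcher and ExecutionInfo contracts come from this change:

import java.util.function.Supplier;
import org.apache.cassandra.db.monitoring.Monitorable;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.sai.QueryContext;

// Illustrative searcher sketch, not the actual SAI implementation.
abstract class SketchSearcher implements Index.Searcher
{
    private final QueryContext queryContext;

    SketchSearcher(QueryContext queryContext)
    {
        this.queryContext = queryContext;
    }

    @Override
    public Supplier<Monitorable.ExecutionInfo> monitorableExecutionInfo()
    {
        // Snapshot inside the supplier, so the metrics cover the whole execution by the time
        // ReadCommand#executionInfo() is invoked for a slow query.
        return () -> {
            QueryContext.Snapshot s = queryContext.snapshot();
            return unique -> String.format(", SAI sstables hit %d, segments hit %d, partitions read %d, rows filtered %d",
                                           s.sstablesHit, s.segmentsHit, s.partitionsRead, s.rowsFiltered);
        };
    }
}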

/**
62 changes: 62 additions & 0 deletions src/java/org/apache/cassandra/index/sai/QueryContext.java
@@ -270,4 +270,66 @@ public enum FilterSortOrder
/** First get the candidates in ANN order from the vector index, then fetch the rows and filter them until we find K matching the predicates */
SCAN_THEN_FILTER
}

public Snapshot snapshot()
{
return new Snapshot(this);
}

/**
* A snapshot of all relevant metrics in a {@link QueryContext} at a specific point in time.
* This class memoizes the values of those metrics so that they can be reused by multiple metrics instances
* without recalculating the same values.
* This class is also more lightweight than the full {@link QueryContext}, which matters when it needs to be
* retained for longer periods of time, as in the slow query logger, which tracks the metrics of the slowest
* queries over a fixed period of time.
*/
public static class Snapshot
{
public final long totalQueryTimeNs;
public final long sstablesHit;
public final long segmentsHit;
public final long partitionsRead;
public final long rowsFiltered;
public final long rowsPreFiltered;
public final long trieSegmentsHit;
public final long bkdPostingListsHit;
public final long bkdSegmentsHit;
public final long bkdPostingsSkips;
public final long bkdPostingsDecodes;
public final long triePostingsSkips;
public final long triePostingsDecodes;
public final long queryTimeouts;
public final long annGraphSearchLatency;
public final long shadowedPrimaryKeyCount;
public final long postFilteringReadLatency;
public final FilterSortOrder filterSortOrder;

/**
* Creates a snapshot of all the metrics in the given {@link QueryContext}.
*
* @param context the query context to snapshot
*/
private Snapshot(QueryContext context)
{
totalQueryTimeNs = context.totalQueryTimeNs();
sstablesHit = context.sstablesHit();
segmentsHit = context.segmentsHit();
partitionsRead = context.partitionsRead();
rowsFiltered = context.rowsFiltered();
rowsPreFiltered = context.rowsPreFiltered();
trieSegmentsHit = context.trieSegmentsHit();
bkdPostingListsHit = context.bkdPostingListsHit();
bkdSegmentsHit = context.bkdSegmentsHit();
bkdPostingsSkips = context.bkdPostingsSkips();
bkdPostingsDecodes = context.bkdPostingsDecodes();
triePostingsSkips = context.triePostingsSkips();
triePostingsDecodes = context.triePostingsDecodes();
queryTimeouts = context.queryTimeouts();
annGraphSearchLatency = context.annGraphSearchLatency();
shadowedPrimaryKeyCount = context.getShadowedPrimaryKeyCount();
postFilteringReadLatency = context.getPostFilteringReadLatency();
filterSortOrder = context.filterSortOrder();
}
}
}
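A hedged example of how a Snapshot might be rendered into the compact text that ExecutionInfo#toLogString returns; the helper below and its output format are assumptions for illustration, using only fields declared above:

import java.util.concurrent.TimeUnit;
import org.apache.cassandra.index.sai.QueryContext;

// Illustrative formatter, not part of this change.
final class SnapshotLogFormatter
{
    private SnapshotLogFormatter() {}

    static String toLogString(QueryContext.Snapshot s)
    {
        return String.format("query time %d ms, sstables hit %d, segments hit %d, partitions read %d, " +
                             "rows filtered %d, query timeouts %d",
                             TimeUnit.NANOSECONDS.toMillis(s.totalQueryTimeNs),
                             s.sstablesHit, s.segmentsHit, s.partitionsRead,
                             s.rowsFiltered, s.queryTimeouts);
    }
}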