Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,12 @@ public enum CassandraRelevantProperties
*/
SLOW_QUERY_LOG_MONITORING_MAX_OPERATIONS("cassandra.monitoring_max_operations", "50"),

/**
* Whether to log detailed execution info when logging slow non-SAI queries.
* For SAI queries, see {@link #SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED}.
*/
SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED("cassandra.monitoring_execution_info_enabled", "true"),

/** The current version of the SAI on-disk index format. */
SAI_CURRENT_VERSION("cassandra.sai.latest.version", "ec"),

Expand Down Expand Up @@ -419,6 +425,7 @@ public enum CassandraRelevantProperties
/**
* Whether to log SAI-specific detailed execution info when logging slow SAI queries.
* This execution info includes the query metrics and the query plan of the slow queries.
* For non-SAI queries, see {@link #SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED}.
*/
SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED("cassandra.sai.slow_query_log.execution_info_enabled", "true"),

Expand Down
43 changes: 38 additions & 5 deletions src/java/org/apache/cassandra/db/ReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.db.monitoring.Monitorable;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PurgeFunction;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
Expand Down Expand Up @@ -112,7 +114,7 @@ public abstract class ReadCommand extends AbstractReadQuery
@Nullable
protected final Index.QueryPlan indexQueryPlan;

private volatile Supplier<ExecutionInfo> executionInfoSupplier = ExecutionInfo.EMPTY_SUPPLIER;
private Supplier<ExecutionInfo> executionInfoSupplier = ExecutionInfo.EMPTY_SUPPLIER;

protected static abstract class SelectionDeserializer
{
Expand Down Expand Up @@ -445,8 +447,8 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
var storageTarget = (null == searcher) ? queryStorage(cfs, executionController)
: searchStorage(searcher, executionController);

if (searcher != null)
executionInfoSupplier = searcher.monitorableExecutionInfo();
// Prepare the monitorable execution info, which will be null if it's deferred to the index
ReadCommandExecutionInfo executionInfo = setupExecutionInfo(searcher);

UnfilteredPartitionIterator iterator = Transformation.apply(storageTarget, new TrackingRowIterator(context));
iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
Expand All @@ -457,6 +459,8 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
iterator = withReadObserver(iterator);
iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
if (executionInfo != null)
iterator = executionInfo.countFetched(iterator, nowInSec());

// If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
// no point in checking it again.
Expand Down Expand Up @@ -489,8 +493,13 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
}

// because of the above, we need to append an aritifical end bound if the source iterator was stopped short by a counter.
return RTBoundCloser.close(iterator);
// because of the above, we need to append an artificial end bound if the source iterator was stopped short by a counter.
iterator = RTBoundCloser.close(iterator);

if (executionInfo != null)
iterator = executionInfo.countReturned(iterator, nowInSec());

return iterator;
}
catch (RuntimeException | Error e)
{
Expand Down Expand Up @@ -1112,4 +1121,28 @@ public long serializedSize(ReadCommand command, int version)
+ command.indexSerializedSize(version);
}
}

/**
 * Selects the execution info that will be reported if this command ends up in the slow query log, and installs
 * the matching supplier in {@code executionInfoSupplier}.
 *
 * @param searcher the index searcher used by this command, or {@code null} if no index is involved
 * @return the generic per-command execution info whose counters must be fed by the caller, or {@code null} when
 * either the searcher supplies its own execution info or generic execution info is disabled
 */
@Nullable
private ReadCommandExecutionInfo setupExecutionInfo(Index.Searcher searcher)
{
    // an index searcher may bring its own custom execution info, which takes precedence over the generic one
    Supplier<Monitorable.ExecutionInfo> customSupplier = searcher == null ? null : searcher.monitorableExecutionInfo();
    if (customSupplier != null)
    {
        executionInfoSupplier = customSupplier;
        return null;
    }

    // generic execution info is enabled: create it and expose it through the supplier
    if (CassandraRelevantProperties.SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED.getBoolean())
    {
        ReadCommandExecutionInfo genericInfo = new ReadCommandExecutionInfo();
        executionInfoSupplier = () -> genericInfo;
        return genericInfo;
    }

    // generic execution info is disabled: leave the supplier that is already in place untouched
    return null;
}
}
146 changes: 146 additions & 0 deletions src/java/org/apache/cassandra/db/ReadCommandExecutionInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.cassandra.db.monitoring.Monitorable;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.index.Index;

/**
 * A custom {@link Monitorable.ExecutionInfo} implementation for {@link ReadCommand}, to be used unless there is an
 * {@link Index.Searcher} with its own custom implementation.
 * <p>
 * It holds and prints the number of partitions, rows and tombstones fetched and returned by the command.
 * <p>
 * Partitions carrying a partition-level deletion are considered as a partition tombstone.
 * Rows without live data and range tombstone markers are considered as row tombstones.
 * <p>
 * The counters are plain fields that are updated without any synchronization.
 */
@NotThreadSafe
class ReadCommandExecutionInfo implements Monitorable.ExecutionInfo
{
    private long partitionsFetched = 0;
    private long partitionsReturned = 0;
    private long partitionTombstones = 0;
    private long rowsFetched = 0;
    private long rowsReturned = 0;
    private long rowTombstones = 0;

    /**
     * Counts the number of fetched partitions and rows in the specified iterator.
     * <p>
     * Only partitions without a partition-level deletion and rows with live data are counted. The counting happens
     * as the returned iterator is consumed.
     *
     * @param partitions the iterator of fetched partitions to count
     * @param nowInSec the command's time in seconds, used to evaluate whether a row is alive
     * @return a transformed view of the specified iterator that updates the counters
     */
    UnfilteredPartitionIterator countFetched(UnfilteredPartitionIterator partitions, int nowInSec)
    {
        Transformation<UnfilteredRowIterator> rowCounter = new Transformation<>()
        {
            @Override
            protected Row applyToRow(Row row)
            {
                if (row.hasLiveData(nowInSec, false))
                    rowsFetched++;
                return row;
            }
        };

        return Transformation.apply(partitions, new Transformation<>()
        {
            @Override
            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
            {
                // DeletionTime#deletes(long) compares against a write timestamp, not against the query time in
                // seconds, so the presence of a partition-level deletion is checked with isLive() instead.
                if (partition.partitionLevelDeletion().isLive())
                    partitionsFetched++;
                return Transformation.apply(partition, rowCounter);
            }
        });
    }

    /**
     * Counts the number of returned partitions, rows and tombstones in the specified iterator.
     * <p>
     * Partitions with a partition-level deletion are counted as partition tombstones, and rows without live data
     * and range tombstone markers are counted as row tombstones. The counting happens as the returned iterator is
     * consumed.
     *
     * @param partitions the iterator of returned partitions to count
     * @param nowInSec the command's time in seconds, used to evaluate whether a row is alive
     * @return a transformed view of the specified iterator that updates the counters
     */
    UnfilteredPartitionIterator countReturned(UnfilteredPartitionIterator partitions, int nowInSec)
    {
        Transformation<UnfilteredRowIterator> rowCounter = new Transformation<>()
        {
            @Override
            protected Row applyToRow(Row row)
            {
                if (row.hasLiveData(nowInSec, false))
                    rowsReturned++;
                else
                    rowTombstones++;
                return row;
            }

            @Override
            protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
            {
                rowTombstones++;
                return marker;
            }
        };

        return Transformation.apply(partitions, new Transformation<>()
        {
            @Override
            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
            {
                // see countFetched: isLive() tells whether there is a partition-level deletion at all, whereas
                // deletes(long) expects a write timestamp rather than the query time in seconds.
                if (partition.partitionLevelDeletion().isLive())
                    partitionsReturned++;
                else
                    partitionTombstones++;
                return Transformation.apply(partition, rowCounter);
            }
        });
    }

    /**
     * Renders the counters as a multi-line block, with one line for partitions and one line for rows, each in the
     * form {@code fetched/returned/tombstones}.
     *
     * @param unique selects which of the two header lines is printed
     * @return the formatted counters, starting with a line break
     */
    @Override
    public String toLogString(boolean unique)
    {
        StringBuilder sb = new StringBuilder("\n");
        sb.append(INDENT);
        sb.append(unique ? "Fetched/returned/tombstones:" : "Slowest fetched/returned/tombstones:");
        append(sb, "partitions",
               partitionsFetched,
               partitionsReturned,
               partitionTombstones);
        append(sb, "rows",
               rowsFetched,
               rowsReturned,
               rowTombstones);
        return sb.toString();
    }

    /**
     * Appends a doubly-indented {@code name: fetched/returned/tombstones} line to the specified builder.
     */
    private static void append(StringBuilder sb, String name, long fetched, long returned, long tombstones)
    {
        sb.append('\n')
          .append(DOUBLE_INDENT)
          .append(name)
          .append(": ")
          .append(fetched)
          .append('/')
          .append(returned)
          .append('/')
          .append(tombstones);
    }
}
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/db/monitoring/Monitorable.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ default ExecutionInfo executionInfo()
*/
interface ExecutionInfo
{
String INDENT = " ";
String DOUBLE_INDENT = INDENT + INDENT;

/**
* An empty no-op implementation.
*/
Expand Down
7 changes: 4 additions & 3 deletions src/java/org/apache/cassandra/index/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -754,15 +754,16 @@ interface Searcher
UnfilteredPartitionIterator search(ReadExecutionController executionController);

/**
* Returns a supplier for the {@link Monitorable.ExecutionInfo} for this query, to be used by
* Returns a supplier for the custom {@link Monitorable.ExecutionInfo} for this query, to be used by
* {@link ReadCommand#executionInfo()} at the end of the query to collect details about the query execution in
* case it is considered too slow.
*
* @return a supplier for the execution info for this query
* @return a supplier for the execution info for this query, or {@code null} if no custom execution info is available
*/
@Nullable
default Supplier<Monitorable.ExecutionInfo> monitorableExecutionInfo()
{
return Monitorable.ExecutionInfo.EMPTY_SUPPLIER;
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
*/
public class QueryMonitorableExecutionInfo implements Monitorable.ExecutionInfo
{
private static final String INDENT = " ";
private static final String DOUBLE_INDENT = INDENT + INDENT;

private final QueryContext.Snapshot metrics;
private final String plan;

Expand Down
Loading