Skip to content

Commit 1a79988

Browse files
committed
CNDB-16123: Add slow query logger execution info to non-SAI read commands
Enabled by default, can be disabled with property cassandra.monitoring_execution_info_enabled
1 parent 42ae0f3 commit 1a79988

File tree

8 files changed

+415
-76
lines changed

8 files changed

+415
-76
lines changed

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,12 @@ public enum CassandraRelevantProperties
392392
*/
393393
SLOW_QUERY_LOG_MONITORING_MAX_OPERATIONS("cassandra.monitoring_max_operations", "50"),
394394

395+
/**
396+
* Whether to log detailed execution info when logging slow non-SAI queries.
397+
* For SAI queries, see {@link #SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED}.
398+
*/
399+
SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED("cassandra.monitoring_execution_info_enabled", "true"),
400+
395401
/** The current version of the SAI on-disk index format. */
396402
SAI_CURRENT_VERSION("cassandra.sai.latest.version", "ec"),
397403

@@ -419,6 +425,7 @@ public enum CassandraRelevantProperties
419425
/**
420426
* Whether to log SAI-specific detailed execution info when logging slow SAI queries.
421427
* This execution info includes the query metrics and the query plan of the slow queries.
428+
* For non-SAI queries, see {@link #SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED}.
422429
*/
423430
SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED("cassandra.sai.slow_query_log.execution_info_enabled", "true"),
424431

src/java/org/apache/cassandra/db/ReadCommand.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,13 @@
3838
import org.slf4j.Logger;
3939
import org.slf4j.LoggerFactory;
4040

41+
import org.apache.cassandra.config.CassandraRelevantProperties;
4142
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
4243
import org.apache.cassandra.db.filter.ColumnFilter;
4344
import org.apache.cassandra.db.filter.DataLimits;
4445
import org.apache.cassandra.db.filter.RowFilter;
4546
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
47+
import org.apache.cassandra.db.monitoring.Monitorable;
4648
import org.apache.cassandra.db.partitions.PartitionIterator;
4749
import org.apache.cassandra.db.partitions.PurgeFunction;
4850
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -445,8 +447,8 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
445447
var storageTarget = (null == searcher) ? queryStorage(cfs, executionController)
446448
: searchStorage(searcher, executionController);
447449

448-
if (searcher != null)
449-
executionInfoSupplier = searcher.monitorableExecutionInfo();
450+
// Prepare the monitorable execution info, which is deferred to the index
451+
ReadCommandExecutionInfo executionInfo = setupExecutionInfo(searcher);
450452

451453
UnfilteredPartitionIterator iterator = Transformation.apply(storageTarget, new TrackingRowIterator(context));
452454
iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
@@ -457,6 +459,8 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
457459
iterator = withReadObserver(iterator);
458460
iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
459461
iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
462+
if (executionInfo != null)
463+
iterator = executionInfo.countFetched(iterator, nowInSec());
460464

461465
// If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
462466
// no point in checking it again.
@@ -489,8 +493,13 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
489493
iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
490494
}
491495

492-
// because of the above, we need to append an aritifical end bound if the source iterator was stopped short by a counter.
493-
return RTBoundCloser.close(iterator);
496+
// because of the above, we need to append an artifical end bound if the source iterator was stopped short by a counter.
497+
iterator = RTBoundCloser.close(iterator);
498+
499+
if (executionInfo != null)
500+
iterator = executionInfo.countReturned(iterator, nowInSec());
501+
502+
return iterator;
494503
}
495504
catch (RuntimeException | Error e)
496505
{
@@ -1112,4 +1121,28 @@ public long serializedSize(ReadCommand command, int version)
11121121
+ command.indexSerializedSize(version);
11131122
}
11141123
}
1124+
1125+
@Nullable
1126+
private ReadCommandExecutionInfo setupExecutionInfo(Index.Searcher searcher)
1127+
{
1128+
// if we have a searcher, it may use its own custom execution info replacing the generic one
1129+
if (searcher != null)
1130+
{
1131+
Supplier<Monitorable.ExecutionInfo> infoSupplier = searcher.monitorableExecutionInfo();
1132+
if (infoSupplier != null)
1133+
{
1134+
executionInfoSupplier = infoSupplier;
1135+
return null;
1136+
}
1137+
}
1138+
1139+
// if execution info is disabled, return null and leave the default empty supplier
1140+
if (!CassandraRelevantProperties.SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED.getBoolean())
1141+
return null;
1142+
1143+
// otherwise, create and use the generic execution info
1144+
ReadCommandExecutionInfo executionInfo = new ReadCommandExecutionInfo();
1145+
executionInfoSupplier = () -> executionInfo;
1146+
return executionInfo;
1147+
}
11151148
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.cassandra.db;
18+
19+
import javax.annotation.concurrent.NotThreadSafe;
20+
21+
import org.apache.cassandra.db.monitoring.Monitorable;
22+
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
23+
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
24+
import org.apache.cassandra.db.rows.Row;
25+
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
26+
import org.apache.cassandra.db.transform.Transformation;
27+
import org.apache.cassandra.index.Index;
28+
29+
/**
30+
* A custom {@link Monitorable.ExecutionInfo} implementation for {@link ReadCommand}, to be used unless there is an
31+
* {@link Index.Searcher} with its own custom implementation.
32+
* </p>
33+
* It holds and prints the number of partitions, rows and tombstones fetched and returned by the command.
34+
* </p>
35+
* Deleted partitions are considered as a partition tombstone.
36+
* Deleted rows and range tombstone markers are considered as row tombstones.
37+
* </p>
38+
* Instances of this class are meant to be updated by a single thread (the read command itself) and read by any number
39+
* of threads (the read command's monitoring task).
40+
*/
41+
@NotThreadSafe
42+
class ReadCommandExecutionInfo implements Monitorable.ExecutionInfo
43+
{
44+
private volatile long partitionsFetched = 0;
45+
private volatile long partitionsReturned = 0;
46+
private volatile long partitionTombstones = 0;
47+
private volatile long rowsFetched = 0;
48+
private volatile long rowsReturned = 0;
49+
private volatile long rowTombstones = 0;
50+
51+
/**
52+
* Counts the number of fetched partitions and rows in the specified iterator.
53+
*
54+
* @param partitions the iterator of fetched partitions to count
55+
* @param nowInSec the command's time in seconds, used to evaluate whether a partition/row is alive
56+
* @return the same iterator
57+
*/
58+
UnfilteredPartitionIterator countFetched(UnfilteredPartitionIterator partitions, int nowInSec)
59+
{
60+
Transformation<UnfilteredRowIterator> rowCounter = new Transformation<>() {
61+
@Override
62+
protected Row applyToRow(Row row)
63+
{
64+
if (row.hasLiveData(nowInSec, false))
65+
// noinspection NonAtomicOperationOnVolatileField (single writer thread)
66+
rowsFetched++;
67+
return row;
68+
}
69+
};
70+
return Transformation.apply(partitions, new Transformation<>() {
71+
@Override
72+
protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
73+
{
74+
if (!partition.partitionLevelDeletion().deletes(nowInSec))
75+
// noinspection NonAtomicOperationOnVolatileField (single writer thread)
76+
partitionsFetched++;
77+
return Transformation.apply(partition, rowCounter);
78+
}
79+
});
80+
}
81+
82+
/**
83+
* Counts the number of fetched partitions, rows and tombstones in the specified iterator.
84+
*
85+
* @param partitions the iterator of returned partitions to count
86+
* @param nowInSec the command's time in seconds, used to evaluate whether a partition/row is alive
87+
* @return the same iterator
88+
*/
89+
UnfilteredPartitionIterator countReturned(UnfilteredPartitionIterator partitions, int nowInSec)
90+
{
91+
Transformation<UnfilteredRowIterator> rowCounter = new Transformation<>() {
92+
@Override
93+
protected Row applyToRow(Row row)
94+
{
95+
if (row.hasLiveData(nowInSec, false))
96+
// noinspection NonAtomicOperationOnVolatileField (single writer thread)
97+
rowsReturned++;
98+
else
99+
// noinspection NonAtomicOperationOnVolatileField (single writer thread)
100+
rowTombstones++;
101+
return row;
102+
}
103+
104+
@Override
105+
protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
106+
{
107+
// noinspection NonAtomicOperationOnVolatileField (single writer thread)
108+
rowTombstones++;
109+
return marker;
110+
}
111+
};
112+
return Transformation.apply(partitions, new Transformation<>() {
113+
@Override
114+
protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
115+
{
116+
if (partition.partitionLevelDeletion().deletes(nowInSec))
117+
// noinspection NonAtomicOperationOnVolatileField (single writer thread)
118+
partitionTombstones++;
119+
else
120+
// noinspection NonAtomicOperationOnVolatileField (single writer thread)
121+
partitionsReturned++;
122+
return Transformation.apply(partition, rowCounter);
123+
}
124+
});
125+
}
126+
127+
@Override
128+
public String toLogString(boolean unique)
129+
{
130+
StringBuilder sb = new StringBuilder("\n");
131+
sb.append(INDENT);
132+
sb.append(unique ? "Fetched/returned/tombstones:" : "Slowest fetched/returned/tombstones:");
133+
append(sb, "partitions",
134+
partitionsFetched,
135+
partitionsReturned,
136+
partitionTombstones);
137+
append(sb, "rows",
138+
rowsFetched,
139+
rowsReturned,
140+
rowTombstones);
141+
return sb.toString();
142+
}
143+
144+
private static void append(StringBuilder sb, String name, long fetched, long returned, long tombstones)
145+
{
146+
sb.append('\n')
147+
.append(DOUBLE_INDENT)
148+
.append(name)
149+
.append(": ")
150+
.append(fetched)
151+
.append('/')
152+
.append(returned)
153+
.append('/')
154+
.append(tombstones);
155+
}
156+
}

src/java/org/apache/cassandra/db/monitoring/Monitorable.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ default ExecutionInfo executionInfo()
5454
*/
5555
interface ExecutionInfo
5656
{
57+
String INDENT = " ";
58+
String DOUBLE_INDENT = INDENT + INDENT;
59+
5760
/**
5861
* An empty no-op implementation.
5962
*/

src/java/org/apache/cassandra/index/Index.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -754,15 +754,16 @@ interface Searcher
754754
UnfilteredPartitionIterator search(ReadExecutionController executionController);
755755

756756
/**
757-
* Returns a supplier for the {@link Monitorable.ExecutionInfo} for this query, to be used by
757+
* Returns a supplier for the custom {@link Monitorable.ExecutionInfo} for this query, to be used by
758758
* {@link ReadCommand#executionInfo()} at the end of the query to collect details about the query execution in
759759
* case it is considered too slow.
760760
*
761-
* @return a supplier for the execution info for this query
761+
* @return a supplier for the execution info for this query, or {@code null} if no custom execution info is available
762762
*/
763+
@Nullable
763764
default Supplier<Monitorable.ExecutionInfo> monitorableExecutionInfo()
764765
{
765-
return Monitorable.ExecutionInfo.EMPTY_SUPPLIER;
766+
return null;
766767
}
767768
}
768769

src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828
*/
2929
public class QueryMonitorableExecutionInfo implements Monitorable.ExecutionInfo
3030
{
31-
private static final String INDENT = " ";
32-
private static final String DOUBLE_INDENT = INDENT + INDENT;
33-
3431
private final QueryContext.Snapshot metrics;
3532
private final String plan;
3633

0 commit comments

Comments
 (0)