diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index c56626c9863c..c2afe13abf10 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -384,19 +384,19 @@ public enum CassandraRelevantProperties
CUSTOM_SSTABLE_WATCHER("cassandra.custom_sstable_watcher"),
/** Defines the interval for reporting any operations that have timed out. */
- SLOW_QUERY_LOG_MONITORING_REPORT_INTERVAL_IN_MS("cassandra.monitoring_report_interval_ms", "5000"),
+ MONITORING_REPORT_INTERVAL_MS("cassandra.monitoring_report_interval_ms", "5000"),
/**
* Defines the maximum number of unique timed out queries that will be reported in the logs.
* Use a negative number to remove any limit.
*/
- SLOW_QUERY_LOG_MONITORING_MAX_OPERATIONS("cassandra.monitoring_max_operations", "50"),
+ MONITORING_MAX_OPERATIONS("cassandra.monitoring_max_operations", "50"),
/**
* Whether to log detailed execution info when logging slow non-SAI queries.
- * For SAI queries, see {@link #SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED}.
+ * For SAI queries, see {@link #SAI_MONITORING_EXECUTION_INFO_ENABLED}.
*/
- SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED("cassandra.monitoring_execution_info_enabled", "true"),
+ MONITORING_EXECUTION_INFO_ENABLED("cassandra.monitoring_execution_info_enabled", "true"),
/** The current version of the SAI on-disk index format. */
SAI_CURRENT_VERSION("cassandra.sai.latest.version", "ec"),
@@ -425,9 +425,9 @@ public enum CassandraRelevantProperties
/**
* Whether to log SAI-specific detailed execution info when logging slow SAI queries.
* This execution info includes the query metrics and the query plan of the slow queries.
- * For non-SAI queries, see {@link #SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED}.
+ * For non-SAI queries, see {@link #MONITORING_EXECUTION_INFO_ENABLED}.
*/
- SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED("cassandra.sai.slow_query_log.execution_info_enabled", "true"),
+ SAI_MONITORING_EXECUTION_INFO_ENABLED("cassandra.sai.monitoring_execution_info_enabled", "true"),
/** Whether vector type only allows float vectors. True by default. **/
VECTOR_FLOAT_ONLY("cassandra.float_only_vectors", "true"),
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 1da1580699cc..db4e9cf7de72 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -1137,7 +1137,7 @@ private ReadCommandExecutionInfo setupExecutionInfo(Index.Searcher searcher)
}
// if execution info is disabled, return null so we will keep using the default empty supplier
- if (!CassandraRelevantProperties.SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED.getBoolean())
+ if (!CassandraRelevantProperties.MONITORING_EXECUTION_INFO_ENABLED.getBoolean())
return null;
// otherwise, create and use the generic execution info
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
index 45c6d6447ce9..f21e1da7f4b5 100644
--- a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
@@ -55,8 +55,8 @@ public class MonitoringTask
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5L, TimeUnit.MINUTES);
@VisibleForTesting
- public static MonitoringTask instance = make(Math.max(0, CassandraRelevantProperties.SLOW_QUERY_LOG_MONITORING_REPORT_INTERVAL_IN_MS.getInt()),
- CassandraRelevantProperties.SLOW_QUERY_LOG_MONITORING_MAX_OPERATIONS.getInt());
+ public static MonitoringTask instance = make(Math.max(0, CassandraRelevantProperties.MONITORING_REPORT_INTERVAL_MS.getInt()),
+ CassandraRelevantProperties.MONITORING_MAX_OPERATIONS.getInt());
private final ScheduledFuture> reportingTask;
private final OperationsQueue failedOperationsQueue;
@@ -318,6 +318,9 @@ protected abstract static class Operation
* this is set lazily as it takes time to build the query CQL */
private String name;
+ /** Any specific execution info of the slowest operation among the aggregated operations. */
+ protected Monitorable.ExecutionInfo slowestOperationExecutionInfo;
+
Operation(Monitorable operation, long nowNanos)
{
this.operation = operation;
@@ -325,6 +328,7 @@ protected abstract static class Operation
totalTimeNanos = nowNanos - operation.creationTimeNanos();
minTime = totalTimeNanos;
maxTime = totalTimeNanos;
+ slowestOperationExecutionInfo = operation.executionInfo();
}
public String name()
@@ -334,17 +338,24 @@ public String name()
return name;
}
- void add(Operation operation)
+ private void add(Operation operation)
{
numTimesReported++;
totalTimeNanos += operation.totalTimeNanos;
+
+ if (operation.maxTime > maxTime)
+ slowestOperationExecutionInfo = operation.executionInfo();
+
maxTime = Math.max(maxTime, operation.maxTime);
minTime = Math.min(minTime, operation.minTime);
}
public abstract String getLogMessage();
- protected abstract Monitorable.ExecutionInfo executionInfo();
+ protected Monitorable.ExecutionInfo executionInfo()
+ {
+ return slowestOperationExecutionInfo;
+ }
}
/**
@@ -360,26 +371,22 @@ private final static class FailedOperation extends Operation
public String getLogMessage()
{
if (numTimesReported == 1)
- return String.format("<%s>, total time %d msec, timeout %d %s",
+ return String.format("<%s>, total time %d msec, timeout %d %s%s",
name(),
NANOSECONDS.toMillis(totalTimeNanos),
NANOSECONDS.toMillis(operation.timeoutNanos()),
- operation.isCrossNode() ? "msec/cross-node" : "msec");
+ operation.isCrossNode() ? "msec/cross-node" : "msec",
+ slowestOperationExecutionInfo.toLogString(true));
else
- return String.format("<%s> timed out %d times, avg/min/max %d/%d/%d msec, timeout %d %s",
+ return String.format("<%s> timed out %d times, avg/min/max %d/%d/%d msec, timeout %d %s%s",
name(),
numTimesReported,
NANOSECONDS.toMillis(totalTimeNanos / numTimesReported),
NANOSECONDS.toMillis(minTime),
NANOSECONDS.toMillis(maxTime),
NANOSECONDS.toMillis(operation.timeoutNanos()),
- operation.isCrossNode() ? "msec/cross-node" : "msec");
- }
-
- @Override
- protected Monitorable.ExecutionInfo executionInfo()
- {
- return Monitorable.ExecutionInfo.EMPTY;
+ operation.isCrossNode() ? "msec/cross-node" : "msec",
+ slowestOperationExecutionInfo.toLogString(false));
}
}
@@ -389,14 +396,10 @@ protected Monitorable.ExecutionInfo executionInfo()
@VisibleForTesting
public final static class SlowOperation extends Operation
{
- /** Any specific execution info of the slowest operation among the aggregated operations. */
- private Monitorable.ExecutionInfo slowestOperationExecutionInfo;
-
@VisibleForTesting
public SlowOperation(Monitorable operation, long slowAtNanos)
{
super(operation, slowAtNanos);
- slowestOperationExecutionInfo = operation.executionInfo();
}
public String getLogMessage()
@@ -419,20 +422,5 @@ public String getLogMessage()
operation.isCrossNode() ? "msec/cross-node" : "msec",
slowestOperationExecutionInfo.toLogString(false));
}
-
- @Override
- protected Monitorable.ExecutionInfo executionInfo()
- {
- return slowestOperationExecutionInfo;
- }
-
- @Override
- void add(Operation operation)
- {
- if (operation.maxTime > maxTime)
- slowestOperationExecutionInfo = operation.executionInfo();
-
- super.add(operation);
- }
}
}
diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java b/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java
index 4a5ec11bfc7a..17bde737d730 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java
@@ -52,7 +52,7 @@ private QueryMonitorableExecutionInfo(QueryContext.Snapshot metrics, String plan
*/
public static Supplier supplier(QueryContext context, Plan plan)
{
- if (!CassandraRelevantProperties.SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED.getBoolean())
+ if (!CassandraRelevantProperties.SAI_MONITORING_EXECUTION_INFO_ENABLED.getBoolean())
return Monitorable.ExecutionInfo.EMPTY_SUPPLIER;
String planAsString = toLogString(plan);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbortedQueryLoggerTest.java b/test/distributed/org/apache/cassandra/distributed/test/AbortedQueryLoggerTest.java
new file mode 100644
index 000000000000..4db146ca87f4
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/AbortedQueryLoggerTest.java
@@ -0,0 +1,414 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.monitoring.Monitorable;
+import org.apache.cassandra.db.monitoring.MonitoringTask;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.sai.SlowSAIQueryLoggerTest;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throwables;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+
+/**
+ * Tests the logging of aborted queries, as done by {@link MonitoringTask},
+ * ensuring that the right {@link Monitorable.ExecutionInfo} is used.
+ *
+ * More detailed tests for the {@link Monitorable.ExecutionInfo} specifics can be found in the tests for slow but not
+ * aborted queries, {@link SlowQueryLoggerTest} and {@link SlowSAIQueryLoggerTest}.
+ */
+public class AbortedQueryLoggerTest extends TestBaseImpl
+{
+ private static final int QUERY_TIMEOUT_MS = 100;
+ private static final AtomicInteger SEQ = new AtomicInteger();
+
+ private static Cluster cluster;
+ private static String table;
+ private static ICoordinator coordinator;
+ private static IInvokableInstance node;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception
+ {
+ cluster = init(Cluster.build(2).withInstanceInitializer(AbortedQueryLoggerTest.BBHelper::install).start());
+
+ // Set a short read timeout in the coordinator. The replica will use the same timeout since it's sent as part of
+ // the message in Verb.expiration.
+ cluster.get(1).runOnInstance(() -> {
+ DatabaseDescriptor.setReadRpcTimeout(QUERY_TIMEOUT_MS);
+ DatabaseDescriptor.setRangeRpcTimeout(QUERY_TIMEOUT_MS);
+ });
+
+ coordinator = cluster.coordinator(1);
+ node = cluster.get(2);
+ }
+
+ @AfterClass
+ public static void closeCluster()
+ {
+ if (cluster != null)
+ cluster.close();
+ }
+
+ @Before
+ public void before()
+ {
+ CassandraRelevantProperties.MONITORING_EXECUTION_INFO_ENABLED.setBoolean(true);
+ table = "t_" + SEQ.getAndIncrement();
+
+ // trigger the monitoring task to flush any pending slow operations before the test starts
+ node.runOnInstance(() -> MonitoringTask.instance.logOperations(approxTime.now()));
+ }
+
+ @After
+ public void after()
+ {
+ cluster.schemaChange(format("DROP TABLE IF EXISTS %s.%s"));
+ }
+
+ /**
+ * Test that log reports about aborted queries do not log sensitive data.
+ */
+ @Test
+ public void testDoesNotLogSensitiveData()
+ {
+ setReadRowsBeforeDelay(0);
+
+ cluster.schemaChange(format("CREATE TABLE %s.%s (k text, c text, v text, b blob, PRIMARY KEY (k, c))"));
+ coordinator.execute(format("INSERT INTO %s.%s (k, c, v) VALUES ('secret_k', 'secret_c', 'secret_v')"), ALL);
+
+ // verify that slow queries are logged with redacted values
+ long mark = node.logs().mark();
+ assertTimesOut("SELECT * FROM %s.%s WHERE k = 'secret_k' AND c = 'secret_c' AND v = 'secret_v' ALLOW FILTERING");
+ assertLogsContain(mark, node, "operations timed out", format("