diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index c56626c9863c..c2afe13abf10 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -384,19 +384,19 @@ public enum CassandraRelevantProperties
     CUSTOM_SSTABLE_WATCHER("cassandra.custom_sstable_watcher"),

     /** Defines the interval for reporting any operations that have timed out. */
-    SLOW_QUERY_LOG_MONITORING_REPORT_INTERVAL_IN_MS("cassandra.monitoring_report_interval_ms", "5000"),
+    MONITORING_REPORT_INTERVAL_MS("cassandra.monitoring_report_interval_ms", "5000"),

     /**
      * Defines the maximum number of unique timed out queries that will be reported in the logs.
      * Use a negative number to remove any limit.
      */
-    SLOW_QUERY_LOG_MONITORING_MAX_OPERATIONS("cassandra.monitoring_max_operations", "50"),
+    MONITORING_MAX_OPERATIONS("cassandra.monitoring_max_operations", "50"),

     /**
      * Whether to log detailed execution info when logging slow non-SAI queries.
-     * For SAI queries, see {@link #SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED}.
+     * For SAI queries, see {@link #SAI_MONITORING_EXECUTION_INFO_ENABLED}.
      */
-    SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED("cassandra.monitoring_execution_info_enabled", "true"),
+    MONITORING_EXECUTION_INFO_ENABLED("cassandra.monitoring_execution_info_enabled", "true"),

     /** The current version of the SAI on-disk index format. */
     SAI_CURRENT_VERSION("cassandra.sai.latest.version", "ec"),
@@ -425,9 +425,9 @@ public enum CassandraRelevantProperties
     /**
      * Whether to log SAI-specific detailed execution info when logging slow SAI queries.
      * This execution info includes the query metrics and the query plan of the slow queries.
-     * For non-SAI queries, see {@link #SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED}.
+     * For non-SAI queries, see {@link #MONITORING_EXECUTION_INFO_ENABLED}.
      */
-    SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED("cassandra.sai.slow_query_log.execution_info_enabled", "true"),
+    SAI_MONITORING_EXECUTION_INFO_ENABLED("cassandra.sai.monitoring_execution_info_enabled", "true"),

     /** Whether vector type only allows float vectors. True by default. **/
     VECTOR_FLOAT_ONLY("cassandra.float_only_vectors", "true"),
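Three of the renamed constants keep their underlying system property name strings; only the SAI constant also renames its property string (as shown above). For reference, here is a standalone sketch, not part of the patch, of how the renamed constants are read and overridden, using only accessors that already appear in this diff; the class name is hypothetical:

```java
import org.apache.cassandra.config.CassandraRelevantProperties;

public class MonitoringPropertiesSketch
{
    public static void main(String[] args)
    {
        // Defaults from the enum above: 5000 ms report interval, 50 aggregated operations, execution info enabled.
        int reportIntervalMs = CassandraRelevantProperties.MONITORING_REPORT_INTERVAL_MS.getInt();
        int maxOperations = CassandraRelevantProperties.MONITORING_MAX_OPERATIONS.getInt();
        boolean executionInfo = CassandraRelevantProperties.MONITORING_EXECUTION_INFO_ENABLED.getBoolean();
        System.out.println(reportIntervalMs + " " + maxOperations + " " + executionInfo);

        // Tests override values programmatically, as the test changes below do; external overrides such as
        // -Dcassandra.monitoring_execution_info_enabled=false resolve to the same property.
        CassandraRelevantProperties.MONITORING_EXECUTION_INFO_ENABLED.setBoolean(false);
    }
}
```
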
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 1da1580699cc..db4e9cf7de72 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -1137,7 +1137,7 @@ private ReadCommandExecutionInfo setupExecutionInfo(Index.Searcher searcher)
         }

         // if execution info is disabled, return null so we will keep using the default empty supplier
-        if (!CassandraRelevantProperties.SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED.getBoolean())
+        if (!CassandraRelevantProperties.MONITORING_EXECUTION_INFO_ENABLED.getBoolean())
             return null;

         // otherwise, create and use the generic execution info
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
index 45c6d6447ce9..f21e1da7f4b5 100644
--- a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
@@ -55,8 +55,8 @@ public class MonitoringTask
     private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5L, TimeUnit.MINUTES);

     @VisibleForTesting
-    public static MonitoringTask instance = make(Math.max(0, CassandraRelevantProperties.SLOW_QUERY_LOG_MONITORING_REPORT_INTERVAL_IN_MS.getInt()),
-                                                 CassandraRelevantProperties.SLOW_QUERY_LOG_MONITORING_MAX_OPERATIONS.getInt());
+    public static MonitoringTask instance = make(Math.max(0, CassandraRelevantProperties.MONITORING_REPORT_INTERVAL_MS.getInt()),
+                                                 CassandraRelevantProperties.MONITORING_MAX_OPERATIONS.getInt());

     private final ScheduledFuture reportingTask;
     private final OperationsQueue failedOperationsQueue;
@@ -318,6 +318,9 @@ protected abstract static class Operation
          * this is set lazily as it takes time to build the query CQL
          */
         private String name;

+        /** Any specific execution info of the slowest operation among the aggregated operations. */
+        protected Monitorable.ExecutionInfo slowestOperationExecutionInfo;
+
         Operation(Monitorable operation, long nowNanos)
         {
             this.operation = operation;
@@ -325,6 +328,7 @@ protected abstract static class Operation
             totalTimeNanos = nowNanos - operation.creationTimeNanos();
             minTime = totalTimeNanos;
             maxTime = totalTimeNanos;
+            slowestOperationExecutionInfo = operation.executionInfo();
         }

         public String name()
@@ -334,17 +338,24 @@ public String name()
             return name;
         }

-        void add(Operation operation)
+        private void add(Operation operation)
         {
             numTimesReported++;
             totalTimeNanos += operation.totalTimeNanos;
+
+            if (operation.maxTime > maxTime)
+                slowestOperationExecutionInfo = operation.executionInfo();
+
             maxTime = Math.max(maxTime, operation.maxTime);
             minTime = Math.min(minTime, operation.minTime);
         }

         public abstract String getLogMessage();

-        protected abstract Monitorable.ExecutionInfo executionInfo();
+        protected Monitorable.ExecutionInfo executionInfo()
+        {
+            return slowestOperationExecutionInfo;
+        }
     }

     /**
@@ -360,26 +371,22 @@ private final static class FailedOperation extends Operation
         public String getLogMessage()
         {
             if (numTimesReported == 1)
-                return String.format("<%s>, total time %d msec, timeout %d %s",
+                return String.format("<%s>, total time %d msec, timeout %d %s%s",
                                      name(),
                                      NANOSECONDS.toMillis(totalTimeNanos),
                                      NANOSECONDS.toMillis(operation.timeoutNanos()),
-                                     operation.isCrossNode() ? "msec/cross-node" : "msec");
+                                     operation.isCrossNode() ? "msec/cross-node" : "msec",
+                                     slowestOperationExecutionInfo.toLogString(true));
             else
-                return String.format("<%s> timed out %d times, avg/min/max %d/%d/%d msec, timeout %d %s",
+                return String.format("<%s> timed out %d times, avg/min/max %d/%d/%d msec, timeout %d %s%s",
                                      name(),
                                      numTimesReported,
                                      NANOSECONDS.toMillis(totalTimeNanos / numTimesReported),
                                      NANOSECONDS.toMillis(minTime),
                                      NANOSECONDS.toMillis(maxTime),
                                      NANOSECONDS.toMillis(operation.timeoutNanos()),
-                                     operation.isCrossNode() ? "msec/cross-node" : "msec");
-        }
-
-        @Override
-        protected Monitorable.ExecutionInfo executionInfo()
-        {
-            return Monitorable.ExecutionInfo.EMPTY;
+                                     operation.isCrossNode() ? "msec/cross-node" : "msec",
+                                     slowestOperationExecutionInfo.toLogString(false));
         }
     }

@@ -389,14 +396,10 @@ protected Monitorable.ExecutionInfo executionInfo()
     @VisibleForTesting
     public final static class SlowOperation extends Operation
     {
-        /** Any specific execution info of the slowest operation among the aggregated operations. */
-        private Monitorable.ExecutionInfo slowestOperationExecutionInfo;
-
         @VisibleForTesting
         public SlowOperation(Monitorable operation, long slowAtNanos)
         {
             super(operation, slowAtNanos);
-            slowestOperationExecutionInfo = operation.executionInfo();
         }

         public String getLogMessage()
@@ -419,20 +422,5 @@ public String getLogMessage()
                                      operation.isCrossNode() ? "msec/cross-node" : "msec",
                                      slowestOperationExecutionInfo.toLogString(false));
         }
-
-        @Override
-        protected Monitorable.ExecutionInfo executionInfo()
-        {
-            return slowestOperationExecutionInfo;
-        }
-
-        @Override
-        void add(Operation operation)
-        {
-            if (operation.maxTime > maxTime)
-                slowestOperationExecutionInfo = operation.executionInfo();
-
-            super.add(operation);
-        }
     }
 }
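The change above hoists the "remember the slowest occurrence's execution info" bookkeeping from SlowOperation into the shared Operation base class, so timed-out (FailedOperation) reports now carry the same details. A condensed, standalone sketch of that aggregation pattern, with illustrative stand-in names rather than the real MonitoringTask types:

```java
// Illustrative stand-ins for MonitoringTask.Operation and Monitorable.ExecutionInfo.
final class AggregatedOperation
{
    interface ExecutionInfo
    {
        String toLogString(boolean singleOperation);
    }

    private long maxTimeNanos;
    private ExecutionInfo slowestExecutionInfo;

    AggregatedOperation(long elapsedNanos, ExecutionInfo info)
    {
        maxTimeNanos = elapsedNanos;
        slowestExecutionInfo = info;
    }

    /** Merge another occurrence of the same query, keeping the execution info of the slowest one. */
    void add(long elapsedNanos, ExecutionInfo info)
    {
        // Compare against the current maximum before updating it, as Operation.add() does above;
        // updating maxTimeNanos first would make the strict comparison miss the new maximum.
        if (elapsedNanos > maxTimeNanos)
            slowestExecutionInfo = info;

        maxTimeNanos = Math.max(maxTimeNanos, elapsedNanos);
    }

    String logSuffix()
    {
        return slowestExecutionInfo.toLogString(false);
    }
}
```
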
"msec/cross-node" : "msec", + slowestOperationExecutionInfo.toLogString(true)); else - return String.format("<%s> timed out %d times, avg/min/max %d/%d/%d msec, timeout %d %s", + return String.format("<%s> timed out %d times, avg/min/max %d/%d/%d msec, timeout %d %s%s", name(), numTimesReported, NANOSECONDS.toMillis(totalTimeNanos / numTimesReported), NANOSECONDS.toMillis(minTime), NANOSECONDS.toMillis(maxTime), NANOSECONDS.toMillis(operation.timeoutNanos()), - operation.isCrossNode() ? "msec/cross-node" : "msec"); - } - - @Override - protected Monitorable.ExecutionInfo executionInfo() - { - return Monitorable.ExecutionInfo.EMPTY; + operation.isCrossNode() ? "msec/cross-node" : "msec", + slowestOperationExecutionInfo.toLogString(false)); } } @@ -389,14 +396,10 @@ protected Monitorable.ExecutionInfo executionInfo() @VisibleForTesting public final static class SlowOperation extends Operation { - /** Any specific execution info of the slowest operation among the aggregated operations. */ - private Monitorable.ExecutionInfo slowestOperationExecutionInfo; - @VisibleForTesting public SlowOperation(Monitorable operation, long slowAtNanos) { super(operation, slowAtNanos); - slowestOperationExecutionInfo = operation.executionInfo(); } public String getLogMessage() @@ -419,20 +422,5 @@ public String getLogMessage() operation.isCrossNode() ? "msec/cross-node" : "msec", slowestOperationExecutionInfo.toLogString(false)); } - - @Override - protected Monitorable.ExecutionInfo executionInfo() - { - return slowestOperationExecutionInfo; - } - - @Override - void add(Operation operation) - { - if (operation.maxTime > maxTime) - slowestOperationExecutionInfo = operation.executionInfo(); - - super.add(operation); - } } } diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java b/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java index 4a5ec11bfc7a..17bde737d730 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java @@ -52,7 +52,7 @@ private QueryMonitorableExecutionInfo(QueryContext.Snapshot metrics, String plan */ public static Supplier supplier(QueryContext context, Plan plan) { - if (!CassandraRelevantProperties.SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED.getBoolean()) + if (!CassandraRelevantProperties.SAI_MONITORING_EXECUTION_INFO_ENABLED.getBoolean()) return Monitorable.ExecutionInfo.EMPTY_SUPPLIER; String planAsString = toLogString(plan); diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbortedQueryLoggerTest.java b/test/distributed/org/apache/cassandra/distributed/test/AbortedQueryLoggerTest.java new file mode 100644 index 000000000000..4db146ca87f4 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/AbortedQueryLoggerTest.java @@ -0,0 +1,414 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbortedQueryLoggerTest.java b/test/distributed/org/apache/cassandra/distributed/test/AbortedQueryLoggerTest.java
new file mode 100644
index 000000000000..4db146ca87f4
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/AbortedQueryLoggerTest.java
@@ -0,0 +1,414 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.monitoring.Monitorable;
+import org.apache.cassandra.db.monitoring.MonitoringTask;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.sai.SlowSAIQueryLoggerTest;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throwables;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+
+/**
+ * Tests the logging of aborted queries, as done by {@link MonitoringTask},
+ * ensuring that the right {@link Monitorable.ExecutionInfo} is used.
+ *
+ * More detailed tests for the {@link Monitorable.ExecutionInfo} specifics can be found in the tests for slow but not
+ * aborted queries, {@link SlowQueryLoggerTest} and {@link SlowSAIQueryLoggerTest}.
+ */
+public class AbortedQueryLoggerTest extends TestBaseImpl
+{
+    private static final int QUERY_TIMEOUT_MS = 100;
+    private static final AtomicInteger SEQ = new AtomicInteger();
+
+    private static Cluster cluster;
+    private static String table;
+    private static ICoordinator coordinator;
+    private static IInvokableInstance node;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception
+    {
+        cluster = init(Cluster.build(2).withInstanceInitializer(AbortedQueryLoggerTest.BBHelper::install).start());
+
+        // Set a short read timeout on the coordinator. The replica will use the same timeout, since it is sent as
+        // part of the message (see Verb.expiration).
+        cluster.get(1).runOnInstance(() -> {
+            DatabaseDescriptor.setReadRpcTimeout(QUERY_TIMEOUT_MS);
+            DatabaseDescriptor.setRangeRpcTimeout(QUERY_TIMEOUT_MS);
+        });
+
+        coordinator = cluster.coordinator(1);
+        node = cluster.get(2);
+    }
+
+    @AfterClass
+    public static void closeCluster()
+    {
+        if (cluster != null)
+            cluster.close();
+    }
+
+    @Before
+    public void before()
+    {
+        CassandraRelevantProperties.MONITORING_EXECUTION_INFO_ENABLED.setBoolean(true);
+        table = "t_" + SEQ.getAndIncrement();
+
+        // trigger the monitoring task to flush any pending slow operations before the test starts
+        node.runOnInstance(() -> MonitoringTask.instance.logOperations(approxTime.now()));
+    }
+
+    @After
+    public void after()
+    {
+        cluster.schemaChange(format("DROP TABLE IF EXISTS %s.%s"));
+    }
+
+    /**
+     * Test that log reports about aborted queries do not log sensitive data.
+     */
+    @Test
+    public void testDoesNotLogSensitiveData()
+    {
+        setReadRowsBeforeDelay(0);
+
+        cluster.schemaChange(format("CREATE TABLE %s.%s (k text, c text, v text, b blob, PRIMARY KEY (k, c))"));
+        coordinator.execute(format("INSERT INTO %s.%s (k, c, v) VALUES ('secret_k', 'secret_c', 'secret_v')"), ALL);
+
+        // verify that timed-out queries are logged with redacted values
+        long mark = node.logs().mark();
+        assertTimesOut("SELECT * FROM %s.%s WHERE k = 'secret_k' AND c = 'secret_c' AND v = 'secret_v' ALLOW FILTERING");
+        assertLogsContain(mark, node, "operations timed out", format("100B\\] ALLOW FILTERING>"),
+                          format("10KiB\\] ALLOW FILTERING>"));
+    }
+
+    /**
+     * Test that aborted query log reports include the correct metrics for the number of returned partitions, rows, etc.
+     */
+    @Test
+    public void testLogsReadMetrics()
+    {
+        setReadRowsBeforeDelay(2);
+
+        cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v int, l int, s int, PRIMARY KEY (k, c))"));
+        cluster.schemaChange(format("CREATE INDEX legacy_idx ON %s.%s (l)"));
+        cluster.schemaChange(format("CREATE CUSTOM INDEX sai_idx ON %s.%s (s) USING 'StorageAttachedIndex'"));
+        int numPartitions = 10;
+        int numClusterings = 10;
+        int numRows = 0;
+        for (int k = 0; k < numPartitions; k++)
+            for (int c = 0; c < numClusterings; c++)
+                coordinator.execute(format("INSERT INTO %s.%s (k, c, v, l, s) VALUES (?, ?, ?, ?, ?)"),
+                                    ALL, k, c, numRows++, numRows % 2, numRows % 2);
+        cluster.forEach(i -> i.flush(KEYSPACE));
+
+        // unrestricted query
+        long mark = node.logs().mark();
+        assertTimesOut("SELECT * FROM %s.%s");
+        assertLogsContain(mark, node,
+                          format(""),
+                          " Fetched/returned/tombstones:",
+                          " partitions: 1/1/0",
+                          " rows: 2/2/0");
+
+        // clustering query
+        setReadRowsBeforeDelay(0);
+        mark = node.logs().mark();
+        assertTimesOut("SELECT * FROM %s.%s WHERE k = 2 AND c = 2");
+        assertLogsContain(mark, node,
+                          format(""),
+                          " Fetched/returned/tombstones:",
+                          " partitions: 2/1/0",
+                          " rows: 12/2/0");
+
+        // test multiple slow runs of different queries with the same redacted form; it should log the slowest one
+        mark = node.logs().mark();
+        for (int i = 0; i < numPartitions; i++)
+        {
+            assertTimesOut("SELECT * FROM %s.%s WHERE k = " + i);
+        }
+        assertLogsContain(mark, node,
+                          format(""),
+                          " Fetched/returned/tombstones:",
+                          " partitions: 1/1/1",
+                          " rows: 2/2/0");
+
+        // delete a row and query again, to see row tombstone metrics
+        coordinator.execute(format("DELETE FROM %s.%s WHERE k = 1 AND c = 1"), ALL);
+        mark = node.logs().mark();
+        assertTimesOut("SELECT * FROM %s.%s");
+        assertLogsContain(mark, node,
+                          format(""),
+                          " Fetched/returned/tombstones:",
+                          " partitions: 1/1/1",
+                          " rows: 2/2/2");
+
+        // query with a legacy index, which doesn't provide its own execution info, so generic execution info should be used
+        mark = node.logs().mark();
+        assertTimesOut("SELECT * FROM %s.%s WHERE l = 0");
+        assertLogsContain(mark, node,
+                          format(""),
+                          " SAI slow query metrics:",
+                          " sstablesHit: 3",
+                          " segmentsHit: 3",
+                          " keysFetched: 10",
+                          " partitionsFetched: 1",
+                          " partitionsReturned: 1",
+                          " partitionTombstonesFetched: 1",
+                          " rowsFetched: 3",
+                          " rowsReturned: 3",
+                          " rowTombstonesFetched: 4",
+                          " trieSegmentsHit: 0",
+                          " bkdPostingListsHit: 3",
+                          " bkdSegmentsHit: 3",
+                          " bkdPostingsSkips: 0",
+                          " bkdPostingsDecodes: 0",
+                          " triePostingsSkips: 0",
+                          " triePostingsDecodes: 0",
+                          " annGraphSearchLatencyNanos: 0",
+                          " SAI slow query plan:");
+        assertLogsDoNotContain(mark, node, " Fetched/returned/tombstones:");
+
+        // disable generic execution info logging and verify that info is not logged anymore
+        CassandraRelevantProperties.MONITORING_EXECUTION_INFO_ENABLED.setBoolean(false);
+        logOperations();
+        mark = node.logs().mark();
+        assertTimesOut("SELECT * FROM %s.%s");
+        assertLogsContain(mark, node, format(""));
+        assertLogsDoNotContain(mark, node, " Fetched/returned/tombstones:", " SAI slow query metrics:");
+        CassandraRelevantProperties.SAI_MONITORING_EXECUTION_INFO_ENABLED.setBoolean(true);
+    }
+
+    private static String format(String query)
+    {
+        return String.format(query, KEYSPACE, table);
+    }
+
+    private void setReadRowsBeforeDelay(int maxRows)
+    {
+        cluster.get(2).runOnInstance(() -> BBHelper.readRowsBeforeDelay.set(maxRows));
+    }
+
+    private void assertTimesOut(String query, Object... boundValues)
+    {
+        String formattedQuery = format(query);
+        Assertions.assertThatThrownBy(() -> coordinator.execute(formattedQuery, ALL, boundValues))
+                  .hasMessageContaining("Operation");
+    }
+
+    public static void assertLogsContain(long mark, IInvokableInstance node, String... lines)
+    {
+        Awaitility.waitAtMost(30, TimeUnit.SECONDS)
+                  .pollDelay(QUERY_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                  .untilAsserted(() -> {
+                      logOperations();
+                      for (String line : lines)
+                      {
+                          List<String> matchingLines = node.logs().grep(mark, line).getResult();
+                          Assertions.assertThat(matchingLines).isNotEmpty();
+                      }
+                  });
+    }
+
+    public static void assertLogsDoNotContain(long mark, IInvokableInstance node, String... lines)
+    {
+        for (String line : lines)
+        {
+            List<String> matchingLines = node.logs().grep(mark, line).getResult();
+            Assertions.assertThat(matchingLines).isEmpty();
+        }
+    }
+
+    private static void logOperations()
+    {
+        node.runOnInstance(() -> MonitoringTask.instance.logOperations(approxTime.now()));
+    }
+
+    /**
+     * ByteBuddy interceptor to delay reads on node 2 after having returned {@link BBHelper#readRowsBeforeDelay} rows.
+     */
+    public static class BBHelper
+    {
+        private static final AtomicInteger readRowsBeforeDelay = new AtomicInteger();
+
+        @SuppressWarnings("resource")
+        public static void install(ClassLoader classLoader, Integer node)
+        {
+            if (node != 2)
+                return;
+
+            new ByteBuddy().rebase(ReadCommand.class)
+                           .method(named("executeLocally"))
+                           .intercept(MethodDelegation.to(AbortedQueryLoggerTest.BBHelper.class))
+                           .make()
+                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        @SuppressWarnings("unused")
+        public static UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController,
+                                                                 @SuperCall Callable<UnfilteredPartitionIterator> zuperCall)
+        {
+            UnfilteredPartitionIterator result;
+            try
+            {
+                result = zuperCall.call();
+            }
+            catch (Exception e)
+            {
+                throw Throwables.unchecked(e);
+            }
+
+            if (!executionController.metadata().keyspace.equals(KEYSPACE))
+                return result;
+
+            if (readRowsBeforeDelay.get() == 0)
+            {
+                delay();
+                return result;
+            }
+
+            int nowInSec = FBUtilities.nowInSeconds();
+            return Transformation.apply(result, new Transformation<>()
+            {
+                private int liveRows = 0;
+
+                @Override
+                protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+                {
+                    return Transformation.apply(partition, new Transformation<>()
+                    {
+                        @Override
+                        protected Row applyToRow(Row row)
+                        {
+                            if (row.hasLiveData(nowInSec, false) && ++liveRows >= readRowsBeforeDelay.get())
+                                delay();
+
+                            return row;
+                        }
+                    });
+                }
+            });
+        }
+
+        private static void delay()
+        {
+            Uninterruptibles.sleepUninterruptibly(QUERY_TIMEOUT_MS * 2, TimeUnit.MILLISECONDS);
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SlowQueryLoggerTest.java b/test/distributed/org/apache/cassandra/distributed/test/SlowQueryLoggerTest.java
index c001f9760a2c..ba0657073133 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SlowQueryLoggerTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SlowQueryLoggerTest.java
@@ -19,6 +19,7 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -63,7 +64,7 @@ public class SlowQueryLoggerTest extends TestBaseImpl
     public static void setupCluster() throws Exception
     {
         // effectively disable the scheduled monitoring task so we control it manually for better test stability
-        System.setProperty("cassandra.slow_query_log_interval_in_ms", "3600000");
+        CassandraRelevantProperties.MONITORING_REPORT_INTERVAL_MS.setInt((int) TimeUnit.HOURS.toMillis(1));

         cluster = init(Cluster.build(2)
                               .withInstanceInitializer(SlowQueryLoggerTest.BBHelper::install)
@@ -83,7 +84,7 @@ public static void closeCluster()
     @Before
     public void before()
     {
-        CassandraRelevantProperties.SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED.setBoolean(true);
+        CassandraRelevantProperties.MONITORING_EXECUTION_INFO_ENABLED.setBoolean(true);
         table = "t_" + SEQ.getAndIncrement();

         // trigger the monitoring task to flush any pending slow operations before the test starts
@@ -247,7 +248,7 @@ public void testLogsReadMetrics()
         assertLogsDoNotContain(mark, node, " Fetched/returned/tombstones:");

         // disable execution info logging and verify that info is not logged anymore
-        CassandraRelevantProperties.SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED.setBoolean(false);
+        CassandraRelevantProperties.MONITORING_EXECUTION_INFO_ENABLED.setBoolean(false);
         mark = node.logs().mark();
         coordinator.execute(format("SELECT * FROM %s.%s"), ALL);
         assertLogsContain(mark, node, format("