diff --git a/src/java/org/apache/cassandra/index/sai/QueryContext.java b/src/java/org/apache/cassandra/index/sai/QueryContext.java index b008e3dcd2db..ec539b99fa1d 100644 --- a/src/java/org/apache/cassandra/index/sai/QueryContext.java +++ b/src/java/org/apache/cassandra/index/sai/QueryContext.java @@ -45,9 +45,31 @@ public class QueryContext private final LongAdder sstablesHit = new LongAdder(); private final LongAdder segmentsHit = new LongAdder(); - private final LongAdder partitionsRead = new LongAdder(); - private final LongAdder rowsPreFiltered = new LongAdder(); - private final LongAdder rowsFiltered = new LongAdder(); + + /** + * The partition/row keys that will be used to fetch rows from the base table. + * They will be either partition keys in AA, or row keys in the later row-aware disk formats. + */ + private final LongAdder keysFetched = new LongAdder(); + + /** The number of live partitions fetched from the storage engine, before post-filtering. */ + private final LongAdder partitionsFetched = new LongAdder(); + + /** The number of live partitions returned to the coordinator, after post-filtering. */ + private final LongAdder partitionsReturned = new LongAdder(); + + /** The number of deleted partitions that are fetched. */ + private final LongAdder partitionTombstonesFetched = new LongAdder(); + + /** The number of live rows fetched from the storage engine, before post-filtering. */ + private final LongAdder rowsFetched = new LongAdder(); + + /** The number of live rows returned to the coordinator, after post-filtering. */ + private final LongAdder rowsReturned = new LongAdder(); + + /** The number of deleted individual rows or ranges of rows that are fetched. */ + private final LongAdder rowTombstonesFetched = new LongAdder(); + private final LongAdder trieSegmentsHit = new LongAdder(); private final LongAdder bkdPostingListsHit = new LongAdder(); @@ -65,7 +87,6 @@ public class QueryContext private float annRerankFloor = 0.0f; // only called from single-threaded setup code - private final LongAdder shadowedPrimaryKeyCount = new LongAdder(); private final LongAdder postFilteringReadLatency = new LongAdder(); // Determines the order of using indexes for filtering and sorting. @@ -94,49 +115,81 @@ public void addSstablesHit(long val) { sstablesHit.add(val); } + public void addSegmentsHit(long val) { segmentsHit.add(val); } - public void addPartitionsRead(long val) + + public void addKeysFetched(long val) + { + keysFetched.add(val); + } + + public void addPartitionsFetched(long val) + { + partitionsFetched.add(val); + } + + public void addPartitionsReturned(long val) { - partitionsRead.add(val); + partitionsReturned.add(val); } - public void addRowsFiltered(long val) + + public void addPartitionTombstonesFetched(long val) + { + partitionTombstonesFetched.add(val); + } + + public void addRowsFetched(long val) { - rowsFiltered.add(val); + rowsFetched.add(val); } - public void addRowsPreFiltered(long val) + + public void addRowsReturned(long val) + { + rowsReturned.add(val); + } + + public void addRowTombstonesFetched(long val) { - rowsPreFiltered.add(val); + rowTombstonesFetched.add(val); } + public void addTrieSegmentsHit(long val) { trieSegmentsHit.add(val); } + public void addBkdPostingListsHit(long val) { bkdPostingListsHit.add(val); } + public void addBkdSegmentsHit(long val) { bkdSegmentsHit.add(val); } + public void addBkdPostingsSkips(long val) { bkdPostingsSkips.add(val); } + public void addBkdPostingsDecodes(long val) { bkdPostingsDecodes.add(val); } + public void addTriePostingsSkips(long val) { triePostingsSkips.add(val); } + public void addTriePostingsDecodes(long val) { triePostingsDecodes.add(val); } + public void addQueryTimeouts(long val) { queryTimeouts.add(val); @@ -163,53 +216,86 @@ public long sstablesHit() { return sstablesHit.longValue(); } + public long segmentsHit() { return segmentsHit.longValue(); } - public long partitionsRead() + + public long keysFetched() { - return partitionsRead.longValue(); + return keysFetched.longValue(); } - public long rowsFiltered() + + public long partitionsFetched() { - return rowsFiltered.longValue(); + return partitionsFetched.longValue(); } - public long rowsPreFiltered() + + public long partitionsReturned() { - return rowsPreFiltered.longValue(); + return partitionsReturned.longValue(); } + + public long partitionTombstonesFetched() + { + return partitionTombstonesFetched.longValue(); + } + + public long rowsFetched() + { + return rowsFetched.longValue(); + } + + public long rowsReturned() + { + return rowsReturned.longValue(); + } + + public long rowTombstonesFetched() + { + return rowTombstonesFetched.longValue(); + } + public long trieSegmentsHit() { return trieSegmentsHit.longValue(); } + public long bkdPostingListsHit() { return bkdPostingListsHit.longValue(); } + public long bkdSegmentsHit() { return bkdSegmentsHit.longValue(); } + public long bkdPostingsSkips() { return bkdPostingsSkips.longValue(); } + public long bkdPostingsDecodes() { return bkdPostingsDecodes.longValue(); } + public long triePostingsSkips() { return triePostingsSkips.longValue(); } + public long triePostingsDecodes() { return triePostingsDecodes.longValue(); } + public long queryTimeouts() { return queryTimeouts.longValue(); } + public long annGraphSearchLatency() { return annGraphSearchLatency.longValue(); @@ -229,19 +315,6 @@ public void checkpoint() } } - public void addShadowed(long count) - { - shadowedPrimaryKeyCount.add(count); - } - - /** - * @return shadowed primary keys, in ascending order - */ - public long getShadowedPrimaryKeyCount() - { - return shadowedPrimaryKeyCount.longValue(); - } - public float getAnnRerankFloor() { return annRerankFloor; @@ -289,9 +362,13 @@ public static class Snapshot public final long totalQueryTimeNs; public final long sstablesHit; public final long segmentsHit; - public final long partitionsRead; - public final long rowsFiltered; - public final long rowsPreFiltered; + public final long keysFetched; + public final long partitionsFetched; + public final long partitionsReturned; + public final long partitionTombstonesFetched; + public final long rowsFetched; + public final long rowsReturned; + public final long rowTombstonesFetched; public final long trieSegmentsHit; public final long bkdPostingListsHit; public final long bkdSegmentsHit; @@ -301,7 +378,6 @@ public static class Snapshot public final long triePostingsDecodes; public final long queryTimeouts; public final long annGraphSearchLatency; - public final long shadowedPrimaryKeyCount; public final long postFilteringReadLatency; public final FilterSortOrder filterSortOrder; @@ -315,9 +391,13 @@ private Snapshot(QueryContext context) totalQueryTimeNs = context.totalQueryTimeNs(); sstablesHit = context.sstablesHit(); segmentsHit = context.segmentsHit(); - partitionsRead = context.partitionsRead(); - rowsFiltered = context.rowsFiltered(); - rowsPreFiltered = context.rowsPreFiltered(); + keysFetched = context.keysFetched(); + partitionsFetched = context.partitionsFetched(); + partitionsReturned = context.partitionsReturned(); + partitionTombstonesFetched = context.partitionTombstonesFetched(); + rowsFetched = context.rowsFetched(); + rowsReturned = context.rowsReturned(); + rowTombstonesFetched = context.rowTombstonesFetched(); trieSegmentsHit = context.trieSegmentsHit(); bkdPostingListsHit = context.bkdPostingListsHit(); bkdSegmentsHit = context.bkdSegmentsHit(); @@ -327,7 +407,6 @@ private Snapshot(QueryContext context) triePostingsDecodes = context.triePostingsDecodes(); queryTimeouts = context.queryTimeouts(); annGraphSearchLatency = context.annGraphSearchLatency(); - shadowedPrimaryKeyCount = context.getShadowedPrimaryKeyCount(); postFilteringReadLatency = context.getPostFilteringReadLatency(); filterSortOrder = context.filterSortOrder(); } diff --git a/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java b/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java index 8efe1412f980..44b0fd4b1f87 100644 --- a/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java +++ b/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java @@ -122,9 +122,9 @@ public void record(QueryContext context, ReadCommand command) "post-filtered {} in {}, and took {} microseconds.", pluralize(snapshot.sstablesHit, "SSTable index", "es"), pluralize(snapshot.segmentsHit, "segment", "s"), - pluralize(snapshot.rowsPreFiltered, "row", "s"), - pluralize(snapshot.rowsFiltered, "row", "s"), - pluralize(snapshot.partitionsRead, "partition", "s"), + pluralize(snapshot.rowsFetched, "row", "s"), + pluralize(snapshot.rowsReturned, "row", "s"), + pluralize(snapshot.partitionsReturned, "partition", "s"), queryLatencyMicros); } else @@ -133,8 +133,8 @@ public void record(QueryContext context, ReadCommand command) "and took {} microseconds.", pluralize(snapshot.sstablesHit, "SSTable index", "es"), pluralize(snapshot.segmentsHit, "segment", "s"), - pluralize(snapshot.rowsFiltered, "row", "s"), - pluralize(snapshot.partitionsRead, "partition", "s"), + pluralize(snapshot.rowsReturned, "row", "s"), + pluralize(snapshot.partitionsReturned, "partition", "s"), queryLatencyMicros); } } @@ -191,8 +191,13 @@ public static class PerTable extends AbstractQueryMetrics public static final String METRIC_TYPE = "TableQueryMetrics"; public final Counter totalQueryTimeouts; - public final Counter totalPartitionReads; - public final Counter totalRowsFiltered; + public final Counter totalKeysFetched; + public final Counter totalPartitionsFetched; + public final Counter totalPartitionsReturned; + public final Counter totalPartitionTombstonesFetched; + public final Counter totalRowsFetched; + public final Counter totalRowsReturned; + public final Counter totalRowTombstonesFetched; public final Counter totalQueriesCompleted; public final Counter sortThenFilterQueriesCompleted; @@ -207,8 +212,13 @@ public PerTable(TableMetadata table, QueryKind queryKind, Predicate { super(table.keyspace, table.name, METRIC_TYPE, queryKind, filter); - totalPartitionReads = Metrics.counter(createMetricName("TotalPartitionReads")); - totalRowsFiltered = Metrics.counter(createMetricName("TotalRowsFiltered")); + totalKeysFetched = Metrics.counter(createMetricName("TotalKeysFetched")); + totalPartitionsFetched = Metrics.counter(createMetricName("TotalPartitionsFetched")); + totalPartitionsReturned = Metrics.counter(createMetricName("TotalPartitionsReturned")); + totalPartitionTombstonesFetched = Metrics.counter(createMetricName("TotalPartitionTombstonesFetched")); + totalRowsFetched = Metrics.counter(createMetricName("TotalRowsFetched")); + totalRowsReturned = Metrics.counter(createMetricName("TotalRowsReturned")); + totalRowTombstonesFetched = Metrics.counter(createMetricName("TotalRowTombstonesFetched")); totalQueriesCompleted = Metrics.counter(createMetricName("TotalQueriesCompleted")); totalQueryTimeouts = Metrics.counter(createMetricName("TotalQueryTimeouts")); @@ -226,8 +236,13 @@ public void record(QueryContext.Snapshot snapshot) } totalQueriesCompleted.inc(); - totalPartitionReads.inc(snapshot.partitionsRead); - totalRowsFiltered.inc(snapshot.rowsFiltered); + totalKeysFetched.inc(snapshot.keysFetched); + totalPartitionsFetched.inc(snapshot.partitionsFetched); + totalPartitionsReturned.inc(snapshot.partitionsReturned); + totalPartitionTombstonesFetched.inc(snapshot.partitionTombstonesFetched); + totalRowsFetched.inc(snapshot.rowsFetched); + totalRowsReturned.inc(snapshot.rowsReturned); + totalRowTombstonesFetched.inc(snapshot.rowTombstonesFetched); if (snapshot.filterSortOrder == QueryContext.FilterSortOrder.SCAN_THEN_FILTER) sortThenFilterQueriesCompleted.inc(); @@ -250,8 +265,13 @@ public static class PerQuery extends AbstractQueryMetrics */ public final Histogram sstablesHit; public final Histogram segmentsHit; - public final Histogram partitionReads; - public final Histogram rowsFiltered; + public final Histogram keysFetched; + public final Histogram partitionsFetched; + public final Histogram partitionsReturned; + public final Histogram partitionTombstonesFetched; + public final Histogram rowsFetched; + public final Histogram rowsReturned; + public final Histogram rowTombstonesFetched; /** * BKD index metrics. @@ -263,9 +283,6 @@ public static class PerQuery extends AbstractQueryMetrics public final Histogram kdTreePostingsSkips; public final Histogram kdTreePostingsDecodes; - /** Shadowed keys scan metrics **/ - public final Histogram shadowedKeysScannedHistogram; - /** * Trie index posting lists metrics. */ @@ -292,6 +309,13 @@ public PerQuery(TableMetadata table, QueryKind queryKind, Predicate sstablesHit = Metrics.histogram(createMetricName("SSTableIndexesHit"), false); segmentsHit = Metrics.histogram(createMetricName("IndexSegmentsHit"), false); + keysFetched = Metrics.histogram(createMetricName("KeysFetched"), false); + partitionsFetched = Metrics.histogram(createMetricName("PartitionsFetched"), false); + partitionsReturned = Metrics.histogram(createMetricName("PartitionsReturned"), false); + partitionTombstonesFetched = Metrics.histogram(createMetricName("PartitionTombstonesFetched"), false); + rowsFetched = Metrics.histogram(createMetricName("RowsFetched"), false); + rowsReturned = Metrics.histogram(createMetricName("RowsReturned"), false); + rowTombstonesFetched = Metrics.histogram(createMetricName("RowTombstonesFetched"), false); kdTreePostingsSkips = Metrics.histogram(createMetricName("KDTreePostingsSkips"), true); kdTreePostingsNumPostings = Metrics.histogram(createMetricName("KDTreePostingsNumPostings"), false); @@ -300,11 +324,6 @@ public PerQuery(TableMetadata table, QueryKind queryKind, Predicate postingsSkips = Metrics.histogram(createMetricName("PostingsSkips"), true); postingsDecodes = Metrics.histogram(createMetricName("PostingsDecodes"), false); - partitionReads = Metrics.histogram(createMetricName("PartitionReads"), false); - rowsFiltered = Metrics.histogram(createMetricName("RowsFiltered"), false); - - shadowedKeysScannedHistogram = Metrics.histogram(createMetricName("ShadowedKeysScannedHistogram"), false); - // Key vector metrics that translate to performance annGraphSearchLatency = Metrics.timer(createMetricName("ANNGraphSearchLatency")); postFilteringReadLatency = Metrics.timer(createMetricName("PostFilteringReadLatency")); @@ -316,8 +335,13 @@ public void record(QueryContext.Snapshot snapshot) queryLatency.update(snapshot.totalQueryTimeNs, TimeUnit.NANOSECONDS); sstablesHit.update(snapshot.sstablesHit); segmentsHit.update(snapshot.segmentsHit); - partitionReads.update(snapshot.partitionsRead); - rowsFiltered.update(snapshot.rowsFiltered); + keysFetched.update(snapshot.keysFetched); + partitionsFetched.update(snapshot.partitionsFetched); + partitionsReturned.update(snapshot.partitionsReturned); + partitionTombstonesFetched.update(snapshot.partitionTombstonesFetched); + rowsFetched.update(snapshot.rowsFetched); + rowsReturned.update(snapshot.rowsReturned); + rowTombstonesFetched.update(snapshot.rowTombstonesFetched); // Record string index cache metrics. if (snapshot.trieSegmentsHit > 0) @@ -342,8 +366,6 @@ public void record(QueryContext.Snapshot snapshot) { annGraphSearchLatency.update(snapshot.annGraphSearchLatency, TimeUnit.NANOSECONDS); } - - shadowedKeysScannedHistogram.update(snapshot.shadowedPrimaryKeyCount); postFilteringReadLatency.update(snapshot.postFilteringReadLatency, TimeUnit.NANOSECONDS); } } diff --git a/src/java/org/apache/cassandra/index/sai/plan/CountFetchedTransformation.java b/src/java/org/apache/cassandra/index/sai/plan/CountFetchedTransformation.java new file mode 100644 index 000000000000..443462d5a077 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sai/plan/CountFetchedTransformation.java @@ -0,0 +1,81 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sai.plan; + +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.index.sai.QueryContext; + +/** + * Counts the number of partitions, rows and tombstones fetched by an index query, before post-filtering and sorting. + */ +class CountFetchedTransformation extends Transformation +{ + private final QueryContext queryContext; + private final long nowInSec; + + CountFetchedTransformation(QueryContext queryContext, long nowInSec) + { + this.queryContext = queryContext; + this.nowInSec = nowInSec; + } + + /** + * Updates the query context metrics about the number of fetched partitions, rows and tombstones + * with the contents of the provided partition iterator. + * + * @param partition the results of querying the base table with the indexed keys, before applying post-filtering and sorting + * @return a copy of the provided row iterator, which will populate the query context as it is consumed + */ + UnfilteredRowIterator apply(UnfilteredRowIterator partition) + { + return Transformation.apply(partition, this); + } + + @Override + protected DeletionTime applyToDeletion(DeletionTime deletionTime) + { + queryContext.checkpoint(); + if (deletionTime.deletes(nowInSec)) + queryContext.addPartitionTombstonesFetched(1); + else + queryContext.addPartitionsFetched(1); + return deletionTime; + } + + @Override + protected Row applyToRow(Row row) + { + queryContext.checkpoint(); + if (row.hasLiveData(nowInSec, false)) + queryContext.addRowsFetched(1); + else + queryContext.addRowTombstonesFetched(1); + return row; + } + + @Override + protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + queryContext.checkpoint(); + queryContext.addRowTombstonesFetched(1); + return marker; + } +} diff --git a/src/java/org/apache/cassandra/index/sai/plan/CountReturnedTransformation.java b/src/java/org/apache/cassandra/index/sai/plan/CountReturnedTransformation.java new file mode 100644 index 000000000000..d442e93a4b84 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sai/plan/CountReturnedTransformation.java @@ -0,0 +1,77 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sai.plan; + +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.index.sai.QueryContext; + +/** + * Counts the final number of partitions and rows returned by a query to the coordinator, after post-filtering and sorting. + * Tombstones are not counted because they are not returned to the coordinator. + */ +class CountReturnedTransformation extends Transformation +{ + private final QueryContext queryContext; + private final Runnable onClose; + private final Transformation rowCounter; + + private CountReturnedTransformation(QueryContext queryContext, Runnable onClose) + { + this.queryContext = queryContext; + this.onClose = onClose; + rowCounter = new Transformation<>() { + @Override + protected Row applyToRow(Row row) + { + queryContext.checkpoint(); + queryContext.addRowsReturned(1); + return row; + } + }; + } + + /** + * Updates the query context metrics about the number of partitions and rows returned to the coordinator + * with the contents of the provided partition iterator. + * + * @param partition the partition iterator containing the final results to return to the coordinator + * @param queryContext the query context to update with the metrics + * @param onClose a callback to run when the transformation is closed + * @return a copy of the provided partition iterator, which will populate the query context as it is consumed + */ + static UnfilteredPartitionIterator apply(UnfilteredPartitionIterator partition, QueryContext queryContext, Runnable onClose) + { + return Transformation.apply(partition, new CountReturnedTransformation(queryContext, onClose)); + } + + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + queryContext.checkpoint(); + queryContext.addPartitionsReturned(1); + return Transformation.apply(partition, rowCounter); + } + + @Override + protected void onClose() + { + onClose.run(); + } +} diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java b/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java index b072825286ee..3a7d87113d46 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java @@ -72,9 +72,13 @@ public String toLogString(boolean unique) sb.append(sectionNamePrefix).append("metrics:\n"); appendMetric(sb, "sstablesHit", metrics.sstablesHit); appendMetric(sb, "segmentsHit", metrics.segmentsHit); - appendMetric(sb, "partitionsRead", metrics.partitionsRead); - appendMetric(sb, "rowsFiltered", metrics.rowsFiltered); - appendMetric(sb, "rowsPreFiltered", metrics.rowsPreFiltered); + appendMetric(sb, "keysFetched", metrics.keysFetched); + appendMetric(sb, "partitionsFetched", metrics.partitionsFetched); + appendMetric(sb, "partitionsReturned", metrics.partitionsReturned); + appendMetric(sb, "partitionTombstonesFetched", metrics.partitionTombstonesFetched); + appendMetric(sb, "rowsFetched", metrics.rowsFetched); + appendMetric(sb, "rowsReturned", metrics.rowsReturned); + appendMetric(sb, "rowTombstonesFetched", metrics.rowTombstonesFetched); appendMetric(sb, "trieSegmentsHit", metrics.trieSegmentsHit); appendMetric(sb, "bkdPostingListsHit", metrics.bkdPostingListsHit); appendMetric(sb, "bkdSegmentsHit", metrics.bkdSegmentsHit); @@ -83,7 +87,6 @@ public String toLogString(boolean unique) appendMetric(sb, "triePostingsSkips", metrics.triePostingsSkips); appendMetric(sb, "triePostingsDecodes", metrics.triePostingsDecodes); appendMetric(sb, "annGraphSearchLatencyNanos", metrics.annGraphSearchLatency); - appendMetric(sb, "shadowedPrimaryKeyCount", metrics.shadowedPrimaryKeyCount); // append the plan sb.append(sectionNamePrefix).append("plan:\n").append(plan); diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java index ac785ee104bf..615d04e8dc84 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java +++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java @@ -211,7 +211,7 @@ public boolean shouldEstimateInitialConcurrency() } @Override - public Index.Searcher searcherFor(ReadCommand command) + public StorageAttachedIndexSearcher searcherFor(ReadCommand command) { return new StorageAttachedIndexSearcher(cfs, queryMetrics, diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java index ec8a1dbaa81a..b28f90d0d937 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java @@ -83,7 +83,6 @@ import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.CloseableIterator; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.btree.BTree; public class StorageAttachedIndexSearcher implements Index.Searcher @@ -126,6 +125,12 @@ public ReadCommand command() return command; } + @VisibleForTesting + public QueryContext queryContext() + { + return queryContext; + } + @VisibleForTesting public final Set plannedIndexes() { @@ -173,19 +178,25 @@ public UnfilteredPartitionIterator search(ReadExecutionController executionContr Iterator keysIterator = controller.buildIterator(plan); // Can't check for `command.isTopK()` because the planner could optimize sorting out + UnfilteredPartitionIterator result; Orderer ordering = plan.ordering(); if (ordering == null) { assert keysIterator instanceof KeyRangeIterator; - return new ResultRetriever((KeyRangeIterator) keysIterator, filterTree, executionController); + result = new ResultRetriever((KeyRangeIterator) keysIterator, filterTree, executionController); } - - assert !(keysIterator instanceof KeyRangeIterator); - var scoredKeysIterator = (CloseableIterator) keysIterator; - var result = new ScoreOrderedResultRetriever(scoredKeysIterator, filterTree, executionController, - command.limits().count(), - ordering.context.getDefinition()); - return new TopKProcessor(command).filter(result); + else + { + assert !(keysIterator instanceof KeyRangeIterator); + var scoredKeysIterator = (CloseableIterator) keysIterator; + var retriever = new ScoreOrderedResultRetriever(scoredKeysIterator, filterTree, controller, + executionController, queryContext, + command.nowInSec(), + command.limits().count(), + ordering.context.getDefinition()); + result = new TopKProcessor(command).filter(retriever); + } + return CountReturnedTransformation.apply(result, queryContext, controller::finish); } catch (QueryView.Builder.MissingIndexException e) { @@ -270,6 +281,7 @@ private class ResultRetriever extends AbstractIterator im private final ReadExecutionController executionController; private final PrimaryKey.Factory keyFactory; private final int partitionRowBatchSize; + private final CountFetchedTransformation fetchedRowsCounter; private PrimaryKey lastKey; @@ -284,6 +296,7 @@ private ResultRetriever(KeyRangeIterator operation, this.filterTree = filterTree; this.executionController = executionController; this.keyFactory = controller.primaryKeyFactory(); + this.fetchedRowsCounter = new CountFetchedTransformation(queryContext, command.nowInSec()); this.firstPrimaryKey = controller.firstPrimaryKey(); @@ -540,14 +553,15 @@ public UnfilteredRowIterator apply(List keys) { long startTimeNanos = Clock.Global.nanoTime(); UnfilteredRowIterator partition = controller.getPartition(keys, executionController); - queryContext.addPartitionsRead(1); + UnfilteredRowIterator counted = fetchedRowsCounter.apply(partition); queryContext.checkpoint(); - UnfilteredRowIterator filtered = applyIndexFilter(partition, filterTree, queryContext); // Note that we record the duration of the read after post-filtering, which actually // materializes the rows from disk. queryContext.addPostFilteringReadLatency(Clock.Global.nanoTime() - startTimeNanos); - return filtered; + + queryContext.addKeysFetched(keys.size()); + return applyIndexFilter(counted, filterTree); } @Override @@ -560,7 +574,6 @@ public TableMetadata metadata() public void close() { FileUtils.closeQuietly(operation); - controller.finish(); if (tableQueryMetrics != null) tableQueryMetrics.record(queryContext, command); } @@ -584,6 +597,9 @@ public class ScoreOrderedResultRetriever extends AbstractIterator scoredPrimaryKeyIterator; private final FilterTree filterTree; private final ReadExecutionController executionController; + private final QueryContext queryContext; + private final long nowInSec; + private final CountFetchedTransformation fetchedRowsCounter; private final HashSet processedKeys; private final Queue pendingRows; @@ -601,7 +617,11 @@ public class ScoreOrderedResultRetriever extends AbstractIterator scoredPrimaryKeyIterator, FilterTree filterTree, - ReadExecutionController executionController, int limit, + QueryController controller, + ReadExecutionController executionController, + QueryContext queryContext, + long nowInSec, + int limit, ColumnMetadata orderedColumn) { IndexContext context = controller.getOrderer().context; @@ -612,6 +632,9 @@ private ScoreOrderedResultRetriever(CloseableIterator sco this.scoredPrimaryKeyIterator = scoredPrimaryKeyIterator; this.filterTree = filterTree; this.executionController = executionController; + this.queryContext = queryContext; + this.nowInSec = nowInSec; + this.fetchedRowsCounter = new CountFetchedTransformation(queryContext, nowInSec); this.processedKeys = new HashSet<>(limit); this.pendingRows = new ArrayDeque<>(limit); @@ -749,14 +772,15 @@ public UnfilteredRowIterator readAndValidatePartition(PrimaryKey pk, List BB.queryDelay.updateAndGet(x -> x / 4)); // restore the query delay // disable execution info logging and verify they are not logged @@ -353,6 +380,17 @@ public void testSlowSAIQueryLogger() throws Throwable assertLogsContain(mark, node, "was slow 2 times", "WHERE n = ?", "SAI slowest query metrics:"); assertLogsContain(mark, node, "was slow 3 times", "WHERE n > ?", "SAI slowest query metrics:"); assertLogsDoNotContain(mark, node, "WHERE n = 1", "WHERE n = 2", "WHERE n > 1", "WHERE n > 2", "WHERE n > 3"); + + // test some partition and row deletions + coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k = 1 AND c = 1"), ConsistencyLevel.ONE); + coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k = 1 AND c = 2"), ConsistencyLevel.ONE); + coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k = 2"), ConsistencyLevel.ONE); + node.flush(KEYSPACE); + String selectAllQuery = withKeyspace("SELECT * FROM %s.t WHERE n >= 0"); + coordinator.execute(selectAllQuery, ConsistencyLevel.ONE); + assertLogsContain(mark, node, + "partitionTombstonesFetched: 1", + "rowTombstonesFetched: 2"); } } diff --git a/test/unit/org/apache/cassandra/index/sai/QueryContextTest.java b/test/unit/org/apache/cassandra/index/sai/QueryContextTest.java new file mode 100644 index 000000000000..68fbf17a03a5 --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/QueryContextTest.java @@ -0,0 +1,1100 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sai; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.index.sai.disk.format.Version; +import org.apache.cassandra.index.sai.plan.StorageAttachedIndexQueryPlan; +import org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher; + +import static org.junit.Assert.assertEquals; + +public class QueryContextTest extends SAITester.Versioned +{ + @Test + public void testSkinnyTable() + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int)"); + createIndex("CREATE CUSTOM INDEX ON %s(a) USING 'StorageAttachedIndex'"); + execute("INSERT INTO %s (k, a, b) VALUES (0, 0, 0)"); + execute("INSERT INTO %s (k, a, b) VALUES (1, 1, 1)"); + execute("INSERT INTO %s (k, a, b) VALUES (2, 0, 0)"); + execute("INSERT INTO %s (k, a, b) VALUES (3, 1, 1)"); + flush(); + QueryContext.Snapshot snapshot; + + // index filtering that accepts all rows + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0), + row(1, 1, 1), + row(2, 0, 0), + row(3, 1, 1)); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(4, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts no rows + snapshot = queryContext("SELECT * FROM %s WHERE a < 0 ALLOW FILTERING"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0)); + assertEquals(2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows, different value + snapshot = queryContext("SELECT * FROM %s WHERE a = 1 ALLOW FILTERING", + row(1, 1, 1), + row(3, 1, 1)); + assertEquals(2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts all rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0 AND b = 0 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0)); + assertEquals(2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts no rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0 AND b = 1 ALLOW FILTERING"); + assertEquals(2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts some rows + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND b = 0 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0)); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition/primary key query + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND k = 0 ALLOW FILTERING", + row(0, 0, 0)); + assertEquals(1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(1, snapshot.rowsFetched); + assertEquals(1, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition/primary key filtering + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND k != 1 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0), + row(3, 1, 1)); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(3, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(3, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete a partition/row + execute("DELETE FROM %s WHERE k = 1"); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0), + row(3, 1, 1)); + assertEquals(4, snapshot.keysFetched); + assertEquals(3, snapshot.partitionsFetched); + assertEquals(3, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(3, snapshot.rowsFetched); + assertEquals(3, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete an indexed cell + execute("DELETE a FROM %s WHERE k = 2"); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0), + row(3, 1, 1)); + assertEquals(4, snapshot.keysFetched); + assertEquals(3, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(3, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // compact to rebuild the index, and verify that tombstones are gone + flush(); + compact(); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0), + row(3, 1, 1)); + assertEquals(2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // truncate the table + truncate(false); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // insert some data using TTLs + execute("INSERT INTO %s (k, a, b) VALUES (0, 0, 0)"); + execute("INSERT INTO %s (k, a, b) VALUES (1, 1, 1) USING TTL 1"); + execute("INSERT INTO %s (k, a, b) VALUES (2, 0, 0)"); + execute("INSERT INTO %s (k, a, b) VALUES (3, 1, 1) USING TTL 1"); + flush(); + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0)); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(2, snapshot.rowTombstonesFetched); + } + + @Test + public void testWideTableWithoutStatics() + { + createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY(k, c))"); + createIndex("CREATE CUSTOM INDEX ON %s(a) USING 'StorageAttachedIndex'"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 0, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 3, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 0, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 3, 1, 1)"); + flush(); + QueryContext.Snapshot snapshot; + + // index filtering that accepts all rows + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0", + row(0, 0, 0, 0), + row(0, 1, 1, 1), + row(0, 2, 0, 0), + row(0, 3, 1, 1), + row(1, 0, 0, 0), + row(1, 1, 1, 1), + row(1, 2, 0, 0), + row(1, 3, 1, 1)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(8, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts no rows + snapshot = queryContext("SELECT * FROM %s WHERE a < 0"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0", + row(0, 0, 0, 0), + row(0, 2, 0, 0), + row(1, 0, 0, 0), + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows, different value + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + snapshot = queryContext("SELECT * FROM %s WHERE a = 1", + row(0, 1, 1, 1), + row(0, 3, 1, 1), + row(1, 1, 1, 1), + row(1, 3, 1, 1)); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts all rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0 AND b >= 0 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 2, 0, 0), + row(1, 0, 0, 0), + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts no rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0 AND b < 0 ALLOW FILTERING"); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts some rows + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND b = 0 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 2, 0, 0), + row(1, 0, 0, 0), + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition key query + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND k = 0 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 1, 1, 1), + row(0, 2, 0, 0), + row(0, 3, 1, 1)); + assertEquals(isRowAware() ? 4 : 1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // primary key query + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND k = 0 AND c IN (0, 2) ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 2, 0, 0)); + assertEquals(isRowAware() ? 2 : 1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition key filtering + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND k != 1 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 1, 1, 1), + row(0, 2, 0, 0), + row(0, 3, 1, 1)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // clustering key filtering + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND c != 1 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 2, 0, 0), + row(0, 3, 1, 1), + row(1, 0, 0, 0), + row(1, 2, 0, 0), + row(1, 3, 1, 1)); + assertEquals(isRowAware() ? 6 : 2, snapshot.keysFetched); // the clustering key filter is applied to indexed keys + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(6, snapshot.rowsFetched); + assertEquals(6, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete a row + execute("DELETE FROM %s WHERE k = 0 AND c = 0"); + snapshot = queryContext("SELECT * FROM %s WHERE a = 0", + row(0, 2, 0, 0), + row(1, 0, 0, 0), + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 3 : 7, snapshot.rowsFetched); + assertEquals(3, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + + // delete a cell + execute("DELETE a FROM %s WHERE k = 1 AND c = 0"); + snapshot = queryContext("SELECT * FROM %s WHERE a = 0", + row(0, 2, 0, 0), + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 3 : 7, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + + // delete a partition + execute("DELETE FROM %s WHERE k = 0"); + snapshot = queryContext("SELECT * FROM %s WHERE a = 0", + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 2 : 4, snapshot.rowsFetched); + assertEquals(1, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete all the rows in a partition + execute("DELETE FROM %s WHERE k = 1 AND c = 0"); + execute("DELETE FROM %s WHERE k = 1 AND c = 2"); + snapshot = queryContext("SELECT * FROM %s WHERE a = 0"); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 0 : 2, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(2, snapshot.rowTombstonesFetched); + + // compact to rebuild the index, and verify that tombstones are gone + flush(); + compact(); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0", + row(1, 1, 1, 1), + row(1, 3, 1, 1)); + assertEquals(isRowAware() ? 2 : 1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(isRowAware() ? 0 : 2, snapshot.rowTombstonesFetched); + + // truncate the table + truncate(false); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // insert some data using TTLs + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 0, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 1, 1, 1) USING TTL 1"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 0, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 2, 0, 0) USING TTL 1"); + flush(); + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 2, 0, 0), + row(1, 0, 0, 0), + row(1, 1, 1, 1)); + assertEquals(isRowAware() ? 6 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(2, snapshot.rowTombstonesFetched); + } + + @Test + public void testWideTableScoreOrdered() + { + Assume.assumeTrue(version.onOrAfter(Version.JVECTOR_EARLIEST)); + + createTable("CREATE TABLE %s (k int, c int, n int, v vector, PRIMARY KEY(k, c))"); + createIndex("CREATE CUSTOM INDEX ON %s(n) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex' WITH OPTIONS = {'similarity_function' : 'euclidean'}"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 0, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 1, 1, [1, 1])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 2, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 3, 1, [1, 1])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 0, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 1, 1, [1, 1])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 2, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 3, 1, [1, 1])"); + flush(); + QueryContext.Snapshot snapshot; + + // index filtering that accepts all rows + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 0, 0, vector(0, 0)), + row(1, 2, 0, vector(0, 0)), + row(0, 0, 0, vector(0, 0)), + row(0, 2, 0, vector(0, 0)), + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1)), + row(0, 1, 1, vector(1, 1)), + row(0, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(8, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(8, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts limited rows + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 4", + row(1, 0, 0, vector(0, 0)), + row(1, 2, 0, vector(0, 0)), + row(0, 0, 0, vector(0, 0)), + row(0, 2, 0, vector(0, 0))); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows, different value + snapshot = queryContext("SELECT * FROM %s WHERE n = 1 ORDER BY v ANN OF [0, 0] LIMIT 10 ALLOW FILTERING", + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1)), + row(0, 1, 1, vector(1, 1)), + row(0, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(8, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition key query + snapshot = queryContext("SELECT * FROM %s WHERE k = 1 ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 0, 0, vector(0, 0)), + row(1, 2, 0, vector(0, 0)), + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1))); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete a row + execute("DELETE FROM %s WHERE k = 0 AND c = 0"); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 0, 0, vector(0, 0)), + row(1, 2, 0, vector(0, 0)), + row(0, 2, 0, vector(0, 0)), + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1)), + row(0, 1, 1, vector(1, 1)), + row(0, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(8, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(7, snapshot.rowsFetched); + assertEquals(7, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + + // delete a cell + execute("DELETE v FROM %s WHERE k = 1 AND c = 0"); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 2, 0, vector(0, 0)), + row(0, 2, 0, vector(0, 0)), + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1)), + row(0, 1, 1, vector(1, 1)), + row(0, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(8, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(7, snapshot.rowsFetched); + assertEquals(6, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + + // delete a partition + execute("DELETE FROM %s WHERE k = 0"); + flush(); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 2, 0, vector(0, 0)), + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(4, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(3, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete all the rows in a partition with a range tombstone + execute("DELETE FROM %s WHERE k = 1 AND c >= 1"); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10"); + assertEquals(8, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(4, snapshot.partitionTombstonesFetched); + assertEquals(1, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(6, snapshot.rowTombstonesFetched); // 3 index entries with start/end bounds each + + // compact to rebuild the index, and verify that tombstones are gone + flush(); + compact(); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // truncate the table + truncate(false); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // insert some data using TTLs + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 0, 0, [0, 0]) USING TTL 1"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 1, 1, [1, 1])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 2, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 3, 1, [1, 1])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 0, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 1, 1, [1, 1]) USING TTL 1"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 2, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 3, 1, [1, 1])"); + flush(); + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 0, 0, vector(0, 0)), + row(1, 2, 0, vector(0, 0)), + row(0, 2, 0, vector(0, 0)), + row(1, 3, 1, vector(1, 1)), + row(0, 1, 1, vector(1, 1)), + row(0, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(8, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(6, snapshot.rowsFetched); + assertEquals(6, snapshot.rowsReturned); + assertEquals(2, snapshot.rowTombstonesFetched); + } + + @Test + public void testWideTableWithStatics() + { + createTable("CREATE TABLE %s (k int, c int, a int, b int, s int static, PRIMARY KEY(k, c))"); + createIndex("CREATE CUSTOM INDEX ON %s(a) USING 'StorageAttachedIndex'"); + execute("INSERT INTO %s (k, c, a, b, s) VALUES (0, 0, 0, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 3, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b, s) VALUES (1, 0, 0, 0, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 3, 1, 1)"); + flush(); + QueryContext.Snapshot snapshot; + + // index filtering that accepts all rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0", + row(0, 0, 0, 0, 0), + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0), + row(1, 0, 0, 0, 1), + row(1, 1, 1, 1, 1), + row(1, 2, 0, 0, 1), + row(1, 3, 1, 1, 1)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(8, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts no rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a < 0"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a = 0", + row(0, 0, 0, 0, 0), + row(0, 2, 0, 0, 0), + row(1, 0, 0, 0, 1), + row(1, 2, 0, 0, 1)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some, different value + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a = 1", + row(0, 1, 1, 1, 0), + row(0, 3, 1, 1, 0), + row(1, 1, 1, 1, 1), + row(1, 3, 1, 1, 1)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts all rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a = 0 AND b >= 0 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 2, 0, 0, 0), + row(1, 0, 0, 0, 1), + row(1, 2, 0, 0, 1)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts no rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a = 0 AND b < 0 ALLOW FILTERING"); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts some rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND b = 0 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 2, 0, 0, 0), + row(1, 0, 0, 0, 1), + row(1, 2, 0, 0, 1)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition key query + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND k = 0 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0)); + assertEquals(isRowAware() ? 4 : 1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // primary key query + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND k = 0 AND c IN (0, 2) ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 2, 0, 0, 0)); + assertEquals(isRowAware() ? 2 : 1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition key filtering + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND k != 1 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // clustering key filtering + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND c != 1 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0), + row(1, 0, 0, 0, 1), + row(1, 2, 0, 0, 1), + row(1, 3, 1, 1, 1)); + assertEquals(isRowAware() ? 6 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(6, snapshot.rowsFetched); + assertEquals(6, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // static row filtering + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND s != 1 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // static row cell deletion + execute("DELETE s FROM %s WHERE k = 1"); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0", + row(0, 0, 0, 0, 0), + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0), + row(1, 0, 0, 0, null), + row(1, 1, 1, 1, null), + row(1, 2, 0, 0, null), + row(1, 3, 1, 1, null)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(8, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // static row cell filtering using the deleted value + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND s = 1 ALLOW FILTERING"); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete a row + execute("DELETE FROM %s WHERE k = 0 AND c = 0"); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0", + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0), + row(1, 0, 0, 0, null), + row(1, 1, 1, 1, null), + row(1, 2, 0, 0, null), + row(1, 3, 1, 1, null)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(7, snapshot.rowsFetched); + assertEquals(7, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + + // delete a partition + execute("DELETE FROM %s WHERE k = 0"); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0", + row(1, 0, 0, 0, null), + row(1, 1, 1, 1, null), + row(1, 2, 0, 0, null), + row(1, 3, 1, 1, null)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete all the rows in a partition with a range tombstone + execute("DELETE FROM %s WHERE k = 1 AND c >= 0"); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0"); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(isRowAware() ? 8 : 2, snapshot.rowTombstonesFetched); + + // compact to rebuild the index, and verify that tombstones are gone + flush(); + compact(); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // truncate the table + truncate(false); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // insert some data using TTLs + execute("INSERT INTO %s (k, c, a, b, s) VALUES (0, 0, 0, 0, 0) USING TTL 1"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 3, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b, s) VALUES (1, 0, 0, 0, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 2, 0, 0) USING TTL 1"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 3, 1, 1)"); + flush(); + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0", + row(0, 1, 1, 1, null), + row(0, 2, 0, 0, null), + row(0, 3, 1, 1, null), + row(1, 0, 0, 0, 1), + row(1, 1, 1, 1, 1), + row(1, 3, 1, 1, 1)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(6, snapshot.rowsFetched); + assertEquals(6, snapshot.rowsReturned); + assertEquals(2, snapshot.rowTombstonesFetched); + } + + @Test + public void testCollections() + { + createTable("CREATE TABLE %s (k int, c int, l list, s set, m map, PRIMARY KEY(k, c))"); + createIndex("CREATE CUSTOM INDEX ON %s(l) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(s) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(keys(m)) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(values(m)) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(entries(m)) USING 'StorageAttachedIndex'"); + execute("INSERT INTO %s (k, c, l, s, m) VALUES (1, 1, [1, 2, 3], {1, 2, 3}, {1:10, 2:20, 3:30})"); + execute("INSERT INTO %s (k, c, l, s, m) VALUES (1, 2, [2, 3, 4], {2, 3, 4}, {2:20, 3:30, 4:40})"); + execute("INSERT INTO %s (k, c, l, s, m) VALUES (2, 1, [3, 4, 5], {3, 4, 5}, {3:30, 4:40, 5:50})"); + execute("INSERT INTO %s (k, c, l, s, m) VALUES (2, 2, [5 ,6, 7], {5 ,6, 7}, {5:50, 6:60, 7:70})"); + flush(); + QueryContext.Snapshot snapshot; + + List queries = Arrays.asList("SELECT k, c FROM %s WHERE l CONTAINS 3", + "SELECT k, c FROM %s WHERE s CONTAINS 3", + "SELECT k, c FROM %s WHERE m CONTAINS KEY 3", + "SELECT k, c FROM %s WHERE m CONTAINS 30", + "SELECT k, c FROM %s WHERE m[3] = 30"); + + for (String query : queries) + { + snapshot = queryContext(query, row(1, 1), row(1, 2), row(2, 1)); + assertEquals(isRowAware() ? 3 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 3 : 4, snapshot.rowsFetched); + assertEquals(3, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + } + + // delete a cell + execute("UPDATE %s SET l = l - [3], s = s - {3}, m = m - {3} WHERE k = 1 AND c = 1"); + for (String query : queries) + { + snapshot = queryContext(query, row(1, 2), row(2, 1)); + assertEquals(isRowAware() ? 3 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 3 : 4, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + } + + // delete a row + execute("DELETE FROM %s WHERE k = 1 AND c = 2"); + for (String query : queries) + { + snapshot = queryContext(query, row(2, 1)); + assertEquals(isRowAware() ? 3 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 2 : 3, snapshot.rowsFetched); + assertEquals(1, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + } + + // delete a partition + execute("DELETE FROM %s WHERE k = 2"); + for (String query : queries) + { + snapshot = queryContext(query); + assertEquals(isRowAware() ? 3 : 2, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(1, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + } + } + + private boolean isRowAware() + { + return version.after(Version.AA); + } + + private QueryContext.Snapshot queryContext(String query, Object[]... rows) + { + // First execute the query with the normal test path to validate the results + assertRowsIgnoringOrder(execute(query), rows); + + // Get an index searcher for the query + PartitionRangeReadCommand command = (PartitionRangeReadCommand) parseReadCommand(query); + StorageAttachedIndexQueryPlan plan = (StorageAttachedIndexQueryPlan) command.indexQueryPlan(); + Assert.assertNotNull(plan); + StorageAttachedIndexSearcher searcher = plan.searcherFor(command); + + // Execute the search for the query and consume the results to popupate the query context + try (ReadExecutionController executionController = command.executionController(); + UnfilteredPartitionIterator partitions = searcher.search(executionController)) + { + while (partitions.hasNext()) + { + try (UnfilteredRowIterator partition = partitions.next()) + { + while (partition.hasNext()) + { + partition.next(); + } + } + } + } + + // Return the query context snapshot, which should be populated + return searcher.queryContext().snapshot(); + } +} diff --git a/test/unit/org/apache/cassandra/index/sai/metrics/QueryMetricsTest.java b/test/unit/org/apache/cassandra/index/sai/metrics/QueryMetricsTest.java index 835b38abc8f0..3b87e6a2d7fb 100644 --- a/test/unit/org/apache/cassandra/index/sai/metrics/QueryMetricsTest.java +++ b/test/unit/org/apache/cassandra/index/sai/metrics/QueryMetricsTest.java @@ -241,7 +241,7 @@ public void testKDTreeQueryMetricsWithSingleIndex() waitForGreaterThanZero(objectNameNoIndex("QueryLatency", keyspace, table, PER_QUERY_METRIC_TYPE)); - waitForEquals(objectNameNoIndex("TotalPartitionReads", keyspace, table, TABLE_QUERY_METRIC_TYPE), resultCounter); + waitForEquals(objectNameNoIndex("TotalPartitionsFetched", keyspace, table, TABLE_QUERY_METRIC_TYPE), resultCounter); waitForEquals(objectName("KDTreeIntersectionLatency", keyspace, table, index, GLOBAL_METRIC_TYPE), queryCounter); } @@ -358,7 +358,7 @@ public void testInvertedIndexQueryMetricsWithSingleIndex() waitForGreaterThanZero(objectNameNoIndex("QueryLatency", keyspace, table, PER_QUERY_METRIC_TYPE)); - waitForEquals(objectNameNoIndex("TotalPartitionReads", keyspace, table, TABLE_QUERY_METRIC_TYPE), resultCounter); + waitForEquals(objectNameNoIndex("TotalPartitionsFetched", keyspace, table, TABLE_QUERY_METRIC_TYPE), resultCounter); } @Test @@ -387,9 +387,9 @@ public void testKDTreePartitionsReadAndRowsFiltered() assertEquals(3, actualRows); // This is 2 due to partition read batching. - waitForEquals(objectNameNoIndex("TotalPartitionReads", keyspace, table, TABLE_QUERY_METRIC_TYPE), 2); - waitForHistogramCountEquals(objectNameNoIndex("RowsFiltered", keyspace, table, PER_QUERY_METRIC_TYPE), 1); - waitForEquals(objectNameNoIndex("TotalRowsFiltered", keyspace, table, TABLE_QUERY_METRIC_TYPE), 3); + waitForEquals(objectNameNoIndex("TotalPartitionsFetched", keyspace, table, TABLE_QUERY_METRIC_TYPE), 2); + waitForHistogramCountEquals(objectNameNoIndex("RowsFetched", keyspace, table, PER_QUERY_METRIC_TYPE), 1); + waitForEquals(objectNameNoIndex("TotalRowsFetched", keyspace, table, TABLE_QUERY_METRIC_TYPE), 3); } @Test @@ -497,6 +497,14 @@ private void testQueryKindMetrics(boolean perTable, boolean perQuery) for (int c = 0; c < numRowsPerPartition; c++) execute("INSERT INTO %s (k, c, n, v) VALUES (?, ?, 1, [1, 1])", k, c); + // add a partition tombstone + execute("INSERT INTO %s (k, c, n, v) VALUES (?, ?, 1, [1, 1])", numPartitions, numRowsPerPartition); + execute("DELETE FROM %s WHERE k = ?", numPartitions); + + // add a row range tombstone + execute("INSERT INTO %s (k, c, n, v) VALUES (?, ?, 1, [1, 1])", numPartitions + 1, numRowsPerPartition); + execute("DELETE FROM %s WHERE k = ? AND c > 0", numPartitions + 1); + // filter query (goes to the general, filter and range query metrics) UntypedResultSet rows = execute("SELECT k, c FROM %s WHERE n = 1"); assertEquals(numRows, rows.size()); @@ -531,18 +539,58 @@ private void testQueryKindMetrics(boolean perTable, boolean perQuery) waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), 1); waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), 1); - // Verify counters for total partition reads. - name = "TotalPartitionReads"; - waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 1 + numPartitions + numRowsPerPartition + numRows + numRowsPerPartition + numRows); + // Verify counters for total keys fetched. + name = "TotalKeysFetched"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 3L * (numRowsPerPartition + numRows + 2)); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), numRowsPerPartition); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numRows + 2); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), numRowsPerPartition); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), numRows + 2); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), numRowsPerPartition); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), numRows + 2); + + // Verify counters for total partitions fetched. + name = "TotalPartitionsFetched"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 1 + numPartitions + 1 + numRowsPerPartition + numRows + 1 + numRowsPerPartition + numRows + 1); waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), 1); - waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numPartitions); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numPartitions + 1); waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), numRowsPerPartition); // single-partition top-k issues a partition access per each row - waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), numRows); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), numRows + 1); waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), numRowsPerPartition); // single-partition top-k issues a partition access per each row + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), numRows + 1); + + // Verify counters for total partitions returned. + name = "TotalPartitionsReturned"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 1 + numPartitions + 1 + numPartitions + 1 + numPartitions); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), 1); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numPartitions); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), 1); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), numPartitions); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), 1); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), numPartitions); + + // Verify counters for total partition tombstones fetched. + name = "TotalPartitionTombstonesFetched"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 3); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), 1); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), 1); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), 1); + + // Verify counters for total rows fetched. + name = "TotalRowsFetched"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), numRowsPerPartition + numRows + numRowsPerPartition + numRows + numRowsPerPartition + numRows); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), numRowsPerPartition); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numRows); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), numRowsPerPartition); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), numRows); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), numRowsPerPartition); waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), numRows); - // Verify counters for total rows filtered. - name = "TotalRowsFiltered"; + // Verify counters for total rows returned. + name = "TotalRowsReturned"; waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), numRowsPerPartition + numRows + numRowsPerPartition + numRows + numRowsPerPartition + numRows); waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), numRowsPerPartition); waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numRows); @@ -551,6 +599,16 @@ private void testQueryKindMetrics(boolean perTable, boolean perQuery) waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), numRowsPerPartition); waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), numRows); + // Verify counters for total row tombstones fetched. + name = "TotalRowTombstonesFetched"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 6); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), 2); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), 2); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), 2); + // Verify counters for timeouts. name = "TotalQueryTimeouts"; waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 0); @@ -576,25 +634,25 @@ private void testQueryKindMetrics(boolean perTable, boolean perQuery) waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), 1); waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), 1); - // Verify histograms for partitions reads per query. - name = "PartitionReads"; - waitForHistogramCountEquals(objectName(name, PER_QUERY_METRIC_TYPE), 6); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_FILTER_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_FILTER_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_TOPK_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_TOPK_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_HYBRID_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_HYBRID_QUERY_METRIC_TYPE), 1); - - // Verify histograms for rows filtered per query. - name = "RowsFiltered"; + // Verify histograms + verifyHistogramCount("KeysFetched", perQuery); + verifyHistogramCount("PartitionsFetched", perQuery); + verifyHistogramCount("PartitionsReturned", perQuery); + verifyHistogramCount("PartitionTombstonesFetched", perQuery); + verifyHistogramCount("RowsFetched", perQuery); + verifyHistogramCount("RowsReturned", perQuery); + verifyHistogramCount("RowTombstonesFetched", perQuery); + } + + private void verifyHistogramCount(String name, boolean hasPerQueryKindMetrics) + { waitForHistogramCountEquals(objectName(name, PER_QUERY_METRIC_TYPE), 6); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_FILTER_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_FILTER_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_TOPK_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_TOPK_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_HYBRID_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_HYBRID_QUERY_METRIC_TYPE), 1); + waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_SP_FILTER_QUERY_METRIC_TYPE), 1); + waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_MP_FILTER_QUERY_METRIC_TYPE), 1); + waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_SP_TOPK_QUERY_METRIC_TYPE), 1); + waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_MP_TOPK_QUERY_METRIC_TYPE), 1); + waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_SP_HYBRID_QUERY_METRIC_TYPE), 1); + waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_MP_HYBRID_QUERY_METRIC_TYPE), 1); } private ObjectName objectName(String name, String type)