diff --git a/src/java/org/apache/cassandra/index/IndexNotAvailableException.java b/src/java/org/apache/cassandra/index/IndexNotAvailableException.java index 41c24a86aa44..c8c72411bbad 100644 --- a/src/java/org/apache/cassandra/index/IndexNotAvailableException.java +++ b/src/java/org/apache/cassandra/index/IndexNotAvailableException.java @@ -31,8 +31,17 @@ public final class IndexNotAvailableException extends UncheckedInternalRequestEx * @param index the index */ public IndexNotAvailableException(Index index) + { + this(index.getIndexMetadata().name); + } + + /** + * Creates a new IndexNotAvailableException for the specified index. + * @param indexName the index name + */ + public IndexNotAvailableException(String indexName) { super(RequestFailureReason.INDEX_NOT_AVAILABLE, - String.format("The secondary index '%s' is not yet available", index.getIndexMetadata().name)); + String.format("The secondary index '%s' is not yet available", indexName)); } } diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java index c42178d8aec1..1199482e7dc9 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java @@ -509,7 +509,9 @@ static SegmentMetadata.ComponentMetadataMap createMetadataMap(long termsOffset, public static PqInfo getPqIfPresent(IndexContext indexContext, Function matcher) { // Retrieve the first compressed vectors for a segment with at least MAX_PQ_TRAINING_SET_SIZE rows - // or the one with the most rows if none reach that size + // or the one with the most rows if none reach that size. Can safely ignore whether the view is queryable + // because we're just getting the PQ, and if one is loaded, we can assume it was validated (at least as much + // as we validate any other index component for reads). var view = indexContext.getReferencedView(TimeUnit.SECONDS.toNanos(5)); if (view == null) { diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryView.java b/src/java/org/apache/cassandra/index/sai/plan/QueryView.java index 46b4d2205280..d23001c16ff5 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryView.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryView.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.index.IndexNotAvailableException; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.QueryContext; import org.apache.cassandra.index.sai.SSTableIndex; @@ -143,6 +144,9 @@ protected QueryView build() throws MissingIndexException if (!indexContext.isIndexed()) throw new MissingIndexException(indexContext); + if (!saiView.isQueryable()) + throw new IndexNotAvailableException(indexContext.getIndexName()); + var sstableReaders = new ArrayList(saiView.size()); // These are already referenced because they are referenced by the same view we just referenced. // TODO review saiView.match() method for boolean predicates. diff --git a/src/java/org/apache/cassandra/index/sai/view/IndexViewManager.java b/src/java/org/apache/cassandra/index/sai/view/IndexViewManager.java index 02a037f0d392..a98e639b99c4 100644 --- a/src/java/org/apache/cassandra/index/sai/view/IndexViewManager.java +++ b/src/java/org/apache/cassandra/index/sai/view/IndexViewManager.java @@ -61,7 +61,9 @@ public IndexViewManager(IndexContext context) IndexViewManager(IndexContext context, Collection indices) { this.context = context; - this.viewRef.set(new View(context, indices)); + // A view starts our as non-queryable because C* subsequently creates this IndexViewManager before completing + // the index build. Once the build is done, it replaces the view with a queryable one. + this.viewRef.set(new View(context, indices, false)); } public View getView() @@ -128,7 +130,7 @@ public Set update(Collection oldSSTables, referencedSSTableIndexes.add(sstableIndex); } - newView = new View(context, referencedSSTableIndexes); + newView = new View(context, referencedSSTableIndexes, indexes.right.isEmpty()); } while (newView == null || !viewRef.compareAndSet(currentView, newView)); @@ -177,6 +179,8 @@ public void prepareSSTablesForRebuild(Collection sstablesToRebuil if (iterations++ > 1000) throw new IllegalStateException("Failed to prepare index view after 1000 iterations"); + // Iff we keep all the indexes, then we can stay queryable. + boolean retainedAllIndexes = true; for (var index : oldView.getIndexes()) { if (!toRemove.contains(index.getSSTable())) @@ -185,9 +189,13 @@ public void prepareSSTablesForRebuild(Collection sstablesToRebuil continue outer; newIndexes.add(index); } + else + { + retainedAllIndexes = false; + } } - newView = new View(context, newIndexes); + newView = new View(context, newIndexes, retainedAllIndexes); } while (newView == null || !viewRef.compareAndSet(oldView, newView)); oldView.release(); @@ -203,7 +211,7 @@ public void prepareSSTablesForRebuild(Collection sstablesToRebuil public void invalidate(boolean indexWasDropped) { // No need to loop here because we don't use the old view when building the new view. - var oldView = viewRef.getAndSet(new View(context, Collections.emptySet())); + var oldView = viewRef.getAndSet(new View(context, Collections.emptySet(), false)); if (indexWasDropped) oldView.markIndexWasDropped(); else diff --git a/src/java/org/apache/cassandra/index/sai/view/View.java b/src/java/org/apache/cassandra/index/sai/view/View.java index e25c37df07c7..2cee09c2746d 100644 --- a/src/java/org/apache/cassandra/index/sai/view/View.java +++ b/src/java/org/apache/cassandra/index/sai/view/View.java @@ -47,6 +47,7 @@ public class View implements Iterable { private final Map view; + private final boolean isQueryable; private final AtomicInteger references = new AtomicInteger(1); private volatile boolean indexWasDropped; @@ -60,9 +61,11 @@ public class View implements Iterable * @param indexes the indexes. Note that the referencing logic for these indexes is handled * outside of this constructor and all indexes are assumed to have been referenced already. * The view will release the indexes when it is finally released. + * @param isQueryable true if the view is queryable; false otherwise. */ - public View(IndexContext context, Collection indexes) + public View(IndexContext context, Collection indexes, boolean isQueryable) { + this.isQueryable = isQueryable; this.view = new HashMap<>(); this.keyValidator = context.keyValidator(); @@ -151,6 +154,11 @@ public int size() return view.size(); } + public boolean isQueryable() + { + return isQueryable; + } + /** * Tells if an index for the given sstable exists. * It's equivalent to {@code getSSTableIndex(descriptor) != null }. diff --git a/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java b/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java index 44bbddc41923..63f0e6c812a9 100644 --- a/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java +++ b/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java @@ -189,18 +189,26 @@ public void testIndexRebuildWhenAddingSStableViaRemoteReload() // unlink sstable and index context: expect no rows to be read by base and index cfs.clearUnsafe(); IndexMetadata indexMetadata = cfs.metadata().indexes.iterator().next(); - ((StorageAttachedIndex) cfs.getIndexManager().getIndex(indexMetadata)).getIndexContext().prepareSSTablesForRebuild(sstables); + // First confirm that if we pass an empty set, the index stays queryable. Not sure if this is a realistic + // code path, but it seems plausible that we could have an unrelated sstable passed to this method in the + // event of a data race, and in that case, we should stay queryable. + ((StorageAttachedIndex) cfs.getIndexManager().getIndex(indexMetadata)).getIndexContext().prepareSSTablesForRebuild(Collections.emptySet()); assertEmpty(execute("SELECT * FROM %s WHERE a=1")); assertEmpty(execute("SELECT * FROM %s WHERE c=1")); - // TODO why? This change reverts back to behavior from before https://github.com/datastax/cassandra/pull/1491, - // but it seems invalid. - // track sstable again: expect no rows to be read by index + // Now pass the actual sstables: expect no rows to be read by base and for the index to be non-queryable because + // preparing an sstable for rebuild removes the index from the view, which would otherwise result in + // partial results. + ((StorageAttachedIndex) cfs.getIndexManager().getIndex(indexMetadata)).getIndexContext().prepareSSTablesForRebuild(sstables); + assertEmpty(execute("SELECT * FROM %s WHERE a=1")); + assertInvalid("SELECT * FROM %s WHERE c=1"); + + // track sstable again: expect index to remain non-queryable because it is not rebuilt (the view hasn't been updated) cfs.getTracker().addInitialSSTables(sstables); assertRows(execute("SELECT * FROM %s WHERE a=1"), row(1, 1, 1)); - assertEmpty(execute("SELECT * FROM %s WHERE c=1")); + assertInvalid("SELECT * FROM %s WHERE c=1"); - // remote reload should trigger index rebuild + // remote reload should trigger index rebuild, making the index queryable again cfs.getTracker().notifySSTablesChanged(Collections.emptySet(), sstables, OperationType.REMOTE_RELOAD, Optional.empty(), null); waitForIndexBuilds(KEYSPACE, indexName); // this is needed because index build on remote reload is async assertRows(execute("SELECT * FROM %s WHERE a=1"), row(1, 1, 1)); diff --git a/test/unit/org/apache/cassandra/index/sai/plan/FailedSSTableIndexLoadWhileQueryingTest.java b/test/unit/org/apache/cassandra/index/sai/plan/FailedSSTableIndexLoadWhileQueryingTest.java new file mode 100644 index 000000000000..9bfdcdcf1931 --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/plan/FailedSSTableIndexLoadWhileQueryingTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sai.plan; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.index.sai.SAITester; +import org.apache.cassandra.index.sai.view.IndexViewManager; +import org.apache.cassandra.inject.ActionBuilder; +import org.apache.cassandra.inject.Expression; +import org.apache.cassandra.inject.Injection; +import org.apache.cassandra.inject.Injections; +import org.apache.cassandra.inject.InvokePointBuilder; +import org.awaitility.Awaitility; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +/** + * When a query is in progress and an index build fails, we need to mark the index as non-queryable instead of + * returning partial or incorrect results. + */ +public class FailedSSTableIndexLoadWhileQueryingTest extends SAITester +{ + + @Test + public void testSSTableIndexInitFailsAfterQueryViewBuildEqualityQuery() throws Throwable + { + createTable("CREATE TABLE %s (k text PRIMARY KEY, x int)"); + + var indexName = createIndex("CREATE CUSTOM INDEX ON %s(x) USING 'StorageAttachedIndex'"); + + execute("INSERT INTO %s (k, x) VALUES (?, ?)", "a", 0); + execute("INSERT INTO %s (k, x) VALUES (?, ?)", "b", 0); + execute("INSERT INTO %s (k, x) VALUES (?, ?)", "c", 1); + + testSSTableIndexInitFailsAfterQueryViewBuiltBeforeQueryExecution(indexName, "SELECT k FROM %s WHERE x = 0"); + } + + @Test + public void testSSTableIndexInitFailsAfterQueryViewBuildNotContainsQuery() throws Throwable + { + createTable("CREATE TABLE %s (k text PRIMARY KEY, x set)"); + + var indexName = createIndex("CREATE CUSTOM INDEX ON %s(x) USING 'StorageAttachedIndex'"); + + execute("INSERT INTO %s (k, x) VALUES ('a', {1, 2, 3})"); + execute("INSERT INTO %s (k, x) VALUES ('b', {1, 2, 3})"); + execute("INSERT INTO %s (k, x) VALUES ('c', {1, 2, 4})"); + + testSSTableIndexInitFailsAfterQueryViewBuiltBeforeQueryExecution(indexName, "SELECT k FROM %s WHERE x NOT CONTAINS 3"); + } + + private void testSSTableIndexInitFailsAfterQueryViewBuiltBeforeQueryExecution(String indexName, String query) throws Throwable + { + // This bug is only reachable when you fail to replace an sstable via compaction. + flush(); + + Injection failSSTableIndexLoadOnInit = Injections.newCustom("FailSSTableIndexLoadOnInit-" + indexName) + .add(InvokePointBuilder.newInvokePoint() + .onClass("org.apache.cassandra.index.sai.SSTableIndex") + .onMethod("") + .atEntry() + ) + .add(ActionBuilder.newActionBuilder().actions() + .doThrow(java.lang.RuntimeException.class, Expression.quote("Byteman-injected fault in MemtableIndexWriter.complete")) + ) + .build(); + Injections.inject(failSSTableIndexLoadOnInit); + + // We use two barriers to ensure that we first flush and fail to load the index (thereby putting in place + // an invalid view) and then to make sure the query gets that view. + var badViewPoint = InvokePointBuilder.newInvokePoint().onClass(IndexViewManager.class).onMethod("update").atExit(); + var badViewBarrier = Injections.newBarrier("pause_after_setting_bad_view", 2, false).add(badViewPoint).build(); + Injections.inject(badViewBarrier); + + // Flush in a separate thread since the badViewPointBarrier will block it, thereby preventing it from + // marking the index as non-queryable. + ForkJoinPool.commonPool().submit(() -> compact()); + + // Wait for compaction to reach the badViewBarrier (the point where we know the view is bad but the + // index is still considered queryable). + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> badViewBarrier.getCount() == 1); + + // The query hits the getViewBarrier. + assertThrows(ReadFailureException.class, () -> execute(query)); + assertEquals("Confirm that flush hit the barrier, but did not pass it", 1, badViewBarrier.getCount()); + + // Confirm index is considered queryable. The primary point of the remaining assertions is to show to readers + // that we have a period of time after creating a broken view and before marking the index as non-queryable. + assertTrue(isIndexQueryable(KEYSPACE, indexName)); + // Arrive and unblock the thread that will mark the index as non-queryable. + badViewBarrier.arrive(); + // Expect index to go non-queryable soon. + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> !isIndexQueryable(KEYSPACE, indexName)); + } +}