Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions src/java/org/apache/cassandra/index/sai/disk/v1/IndexSearcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,24 +171,32 @@ protected KeyRangeIterator toPrimaryKeyIterator(PostingList postingList, QueryCo

protected CloseableIterator<PrimaryKeyWithSortKey> toMetaSortedIterator(CloseableIterator<? extends RowIdWithMeta> rowIdIterator, QueryContext queryContext) throws IOException
{
if (rowIdIterator == null || !rowIdIterator.hasNext())
try
{
if (rowIdIterator == null || !rowIdIterator.hasNext())
{
FileUtils.closeQuietly(rowIdIterator);
return CloseableIterator.emptyIterator();
}

IndexSearcherContext searcherContext = new IndexSearcherContext(metadata.minKey,
metadata.maxKey,
metadata.minSSTableRowId,
metadata.maxSSTableRowId,
metadata.segmentRowIdOffset,
queryContext,
null);
var pkm = primaryKeyMapFactory.newPerSSTablePrimaryKeyMap();
return new RowIdToPrimaryKeyWithSortKeyIterator(indexContext,
pkm.getSSTableId(),
rowIdIterator,
pkm,
searcherContext);
}
catch (Throwable t)
{
FileUtils.closeQuietly(rowIdIterator);
return CloseableIterator.emptyIterator();
throw t;
}

IndexSearcherContext searcherContext = new IndexSearcherContext(metadata.minKey,
metadata.maxKey,
metadata.minSSTableRowId,
metadata.maxSSTableRowId,
metadata.segmentRowIdOffset,
queryContext,
null);
var pkm = primaryKeyMapFactory.newPerSSTablePrimaryKeyMap();
return new RowIdToPrimaryKeyWithSortKeyIterator(indexContext,
pkm.getSSTableId(),
rowIdIterator,
pkm,
searcherContext);
}
}
64 changes: 64 additions & 0 deletions test/unit/org/apache/cassandra/index/sai/cql/VectorTypeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@
import org.apache.cassandra.index.sai.StorageAttachedIndex;
import org.apache.cassandra.index.sai.disk.v1.IndexWriterConfig;
import org.apache.cassandra.index.sai.disk.v1.SegmentBuilder;
import org.apache.cassandra.index.sai.disk.vector.AutoResumingNodeScoreIterator;
import org.apache.cassandra.index.sai.disk.vector.CassandraOnHeapGraph;
import org.apache.cassandra.index.sai.disk.vector.VectorSourceModel;
import org.apache.cassandra.index.sai.plan.QueryController;
import org.apache.cassandra.inject.ActionBuilder;
import org.apache.cassandra.inject.Expression;
import org.apache.cassandra.inject.Injection;
import org.apache.cassandra.inject.Injections;
import org.apache.cassandra.inject.InvokePointBuilder;
import org.apache.cassandra.service.ClientState;
Expand Down Expand Up @@ -1057,4 +1059,66 @@ public void testUpdateRowScoreToWorsePositionButIncludeInBatch()
// query with ANN only
assertRows(execute("SELECT c FROM %s ORDER BY r ANN OF [0.1, 0.1] LIMIT 10"), row(2), row(1));
}

@Test
public void testRowIdIteratorClosedOnHasNextFailure() throws Throwable
{
createTable("CREATE TABLE %s (pk int, vec vector<float, 2>, PRIMARY KEY(pk))");
createIndex("CREATE CUSTOM INDEX ON %s(vec) USING 'StorageAttachedIndex'");

// Track if the rowIdIterator's close method is called
Injections.Counter closeCounter = Injections.newCounter("rowIdIteratorCloseCounter")
.add(InvokePointBuilder.newInvokePoint()
.onClass(AutoResumingNodeScoreIterator.class)
.onMethod("close"))
.build();

// Inject failure at hasNext in toMetaSortedIterator
Injection hasNextFailure = Injections.newCustom("fail_on_hasNext")
.add(InvokePointBuilder.newInvokePoint()
.onClass(AutoResumingNodeScoreIterator.class)
.onMethod("computeNext")
.atEntry())
.add(ActionBuilder.newActionBuilder()
.actions()
.doThrow(RuntimeException.class, Expression.quote("Injected hasNext failure!")))
.build();

try
{
Injections.inject(closeCounter);
Injections.inject(hasNextFailure);

// Insert data
execute("INSERT INTO %s (pk, vec) VALUES (1, [1.0, 1.0])");
execute("INSERT INTO %s (pk, vec) VALUES (2, [2.0, 2.0])");
flush();

// Reset counter before the test
closeCounter.reset();

// Enable the failure injection
hasNextFailure.enable();

// Execute query that will trigger toMetaSortedIterator and fail at hasNext
assertThatThrownBy(() -> executeInternal("SELECT pk FROM %s ORDER BY vec ANN OF [1.5, 1.5] LIMIT 2"))
.hasMessageContaining("Injected hasNext failure!");

// Verify that close was called on the rowIdIterator despite the failure
// The close should be called in the catch block of toMetaSortedIterator (line 198)
assertThat(closeCounter.get()).as("rowIdIterator should be closed when hasNext fails")
.isGreaterThan(0);

// Remove failure and confirm we can still query
hasNextFailure.disable();

// Confrm subsequent queries succeed because we close the iterator and release the graph searcher
execute("SELECT pk FROM %s ORDER BY vec ANN OF [1.5, 1.5] LIMIT 2");
}
finally
{
hasNextFailure.disable();
closeCounter.disable();
}
}
}
Loading