Skip to content
57 changes: 54 additions & 3 deletions src/java/org/apache/cassandra/index/sai/SSTableContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.cassandra.index.sai;

import java.io.IOException;

import com.google.common.base.Objects;

import org.apache.cassandra.config.CassandraRelevantProperties;
Expand All @@ -43,17 +45,49 @@ public class SSTableContext extends SharedCloseableImpl
public final PrimaryKey.Factory primaryKeyFactory;
public final PrimaryKeyMap.Factory primaryKeyMapFactory;

// The first and last key for the whole sstable
private final PrimaryKey minSSTableKey;
private final PrimaryKey maxSSTableKey;

/**
 * Creates the context for one sstable and, unless asked to skip it, resolves the first and last
 * primary key of the whole sstable up front.
 * <p>
 * Both keys remain {@code null} when loading is skipped or when the per-sstable primary key map
 * reports a count of zero.
 *
 * @param sstable                the sstable this context describes
 * @param perSSTableComponents   the per-sstable index components, stored as-is
 * @param primaryKeyFactory      the primary key factory, stored as-is
 * @param primaryKeyMapFactory   the factory used to open the per-sstable primary key map
 * @param skipLoadingMinMaxKeys  when {@code true}, no primary key map is opened and both keys stay {@code null}
 * @param cleanup                handed to the superclass constructor
 * @throws IOException if resolving the min/max keys from the primary key map fails with an I/O error
 */
private SSTableContext(SSTableReader sstable,
                       IndexComponents.ForRead perSSTableComponents,
                       PrimaryKey.Factory primaryKeyFactory,
                       PrimaryKeyMap.Factory primaryKeyMapFactory,
                       boolean skipLoadingMinMaxKeys,
                       Cleanup cleanup) throws IOException
{
    super(cleanup);
    this.sstable = sstable;
    this.perSSTableComponents = perSSTableComponents;
    this.primaryKeyFactory = primaryKeyFactory;
    this.primaryKeyMapFactory = primaryKeyMapFactory;

    // A null key is never handed out: the accessors throw instead, because nobody is expected to
    // ask for the keys when they were not (or could not be) loaded.
    PrimaryKey firstKey = null;
    PrimaryKey lastKey = null;

    if (!skipLoadingMinMaxKeys)
    {
        // If we throw, the caller releases the sstable ref and runs the cleanup.
        try (var keyMap = primaryKeyMapFactory.newPerSSTablePrimaryKeyMap())
        {
            if (keyMap.count() != 0)
            {
                PrimaryKey lowerBound = keyMap.primaryKeyFromRowId(0);
                PrimaryKey upperBound = keyMap.primaryKeyFromRowId(keyMap.count() - 1);
                firstKey = keyMap.primaryKeyFromRowId(0, lowerBound, upperBound).loadDeferred();
                lastKey = keyMap.primaryKeyFromRowId(keyMap.count() - 1, lowerBound, upperBound).loadDeferred();
            }
        }
    }

    this.minSSTableKey = firstKey;
    this.maxSSTableKey = lastKey;
}

private SSTableContext(SSTableContext copy)
Expand All @@ -63,6 +97,8 @@ private SSTableContext(SSTableContext copy)
this.perSSTableComponents = copy.perSSTableComponents;
this.primaryKeyFactory = copy.primaryKeyFactory;
this.primaryKeyMapFactory = copy.primaryKeyMapFactory;
this.minSSTableKey = copy.minSSTableKey;
this.maxSSTableKey = copy.maxSSTableKey;
}

@SuppressWarnings("resource")
Expand All @@ -85,13 +121,14 @@ public static SSTableContext create(SSTableReader sstable, IndexComponents.ForRe
}

// avoid opening SAI metadata if reads are disabled
primaryKeyMapFactory = CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.getBoolean()
boolean readsDisabled = CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.getBoolean();
primaryKeyMapFactory = readsDisabled
? new PrimaryKeyMap.DummyThrowingFactory()
: onDiskFormat.newPrimaryKeyMapFactory(perSSTableComponents, primaryKeyFactory, sstable);

Cleanup cleanup = new Cleanup(primaryKeyMapFactory, sstableRef);

return new SSTableContext(sstable, perSSTableComponents, primaryKeyFactory, primaryKeyMapFactory, cleanup);
return new SSTableContext(sstable, perSSTableComponents, primaryKeyFactory, primaryKeyMapFactory, readsDisabled, cleanup);
}
catch (Throwable t)
{
Expand Down Expand Up @@ -141,6 +178,20 @@ public PrimaryKeyMap.Factory primaryKeyMapFactory()
return primaryKeyMapFactory;
}

/**
 * @return the first primary key of the whole sstable
 * @throws IllegalStateException if the key was not loaded (it is {@code null})
 */
public PrimaryKey minSSTableKey()
{
    final PrimaryKey key = minSSTableKey;
    if (key != null)
        return key;
    throw new IllegalStateException("minSSTableKey is null");
}

/**
 * @return the last primary key of the whole sstable
 * @throws IllegalStateException if the key was not loaded (it is {@code null})
 */
public PrimaryKey maxSSTableKey()
{
    final PrimaryKey key = maxSSTableKey;
    if (key != null)
        return key;
    throw new IllegalStateException("maxSSTableKey is null");
}

/**
* @return number of open files per {@link SSTableContext} instance
*/
Expand Down
13 changes: 3 additions & 10 deletions src/java/org/apache/cassandra/index/sai/SSTableIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private static SearchableIndex createSearchableIndex(SSTableContext sstableConte
if (CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.getBoolean())
{
logger.info("Creating dummy (empty) index searcher for sstable {} as SAI index reads are disabled", sstableContext.sstable.descriptor);
return new EmptyIndex();
return new EmptyIndex(sstableContext);
}

return perIndexComponents.onDiskFormat().newSearchableIndex(sstableContext, perIndexComponents);
Expand Down Expand Up @@ -252,17 +252,10 @@ private KeyRangeIterator getNonEqIterator(Expression expression,
QueryContext context,
boolean defer) throws IOException
{
KeyRangeIterator allKeys = allSSTableKeys(keyRange);
if (TypeUtil.supportsRounding(expression.validator))
{
return allKeys;
}
return allSSTableKeys(keyRange);
else
{
Expression negExpression = expression.negated();
KeyRangeIterator matchedKeys = searchableIndex.search(negExpression, keyRange, context, defer);
return KeyRangeAntiJoinIterator.create(allKeys, matchedKeys);
}
return searchableIndex.search(expression, keyRange, context, defer);
}

public KeyRangeIterator search(Expression expression,
Expand Down
12 changes: 11 additions & 1 deletion src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.cassandra.db.virtual.SimpleDataSet;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.index.sai.QueryContext;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.disk.v1.Segment;
import org.apache.cassandra.index.sai.iterators.KeyRangeIterator;
import org.apache.cassandra.index.sai.plan.Expression;
Expand All @@ -38,6 +39,13 @@

public class EmptyIndex implements SearchableIndex
{
private final SSTableContext sstableContext;

/**
 * Creates an empty index tied to the given sstable context.
 *
 * @param sstableContext kept so that {@code search} can hand it to
 *                       {@code PrimaryKeyMapIterator.create} for non-equality expressions
 */
public EmptyIndex(final SSTableContext sstableContext)
{
    this.sstableContext = sstableContext;
}

@Override
public long indexFileCacheSize()
{
Expand Down Expand Up @@ -98,7 +106,9 @@ public KeyRangeIterator search(Expression expression,
QueryContext context,
boolean defer) throws IOException
{
return KeyRangeIterator.empty();
return expression.getOp().isNonEquality()
? PrimaryKeyMapIterator.create(sstableContext, keyRange)
: KeyRangeIterator.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;

import org.apache.cassandra.index.sai.QueryContext;
import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
import org.apache.cassandra.index.sai.utils.PrimaryKey;

public class IndexSearcherContext
Expand All @@ -30,15 +31,22 @@ public class IndexSearcherContext

final PrimaryKey minimumKey;
final PrimaryKey maximumKey;
final long minSSTableRowId;
final long maxSSTableRowId;
final long segmentRowIdOffset;
final long maxPartitionOffset;

/**
 * Convenience constructor that takes the key bounds and the row id offset straight from the
 * segment metadata ({@code minKey}, {@code maxKey} and {@code segmentRowIdOffset}).
 *
 * @param metadata    the segment metadata supplying the bounds and offset
 * @param context     the query context, forwarded unchanged
 * @param postingList the posting list, forwarded unchanged
 * @throws IOException declared by the constructor this one delegates to
 */
public IndexSearcherContext(SegmentMetadata metadata, QueryContext context, PostingList postingList) throws IOException
{
    this(metadata.minKey, metadata.maxKey, metadata.segmentRowIdOffset, context, postingList);
}

public IndexSearcherContext(PrimaryKey minimumKey,
PrimaryKey maximumKey,
long minSSTableRowId,
long maxSSTableRowId,
long segmentRowIdOffset,
QueryContext context,
PostingList postingList) throws IOException
Expand All @@ -53,8 +61,6 @@ public IndexSearcherContext(PrimaryKey minimumKey,
// use segment's metadata for the range iterator, may not be accurate, but should not matter to performance.
this.maximumKey = maximumKey;

this.minSSTableRowId = minSSTableRowId;
this.maxSSTableRowId = maxSSTableRowId;
this.maxPartitionOffset = Long.MAX_VALUE;
}

Expand Down
80 changes: 57 additions & 23 deletions src/java/org/apache/cassandra/index/sai/disk/v1/IndexSearcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.QueryContext;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.disk.IndexSearcherContext;
import org.apache.cassandra.index.sai.disk.PostingList;
import org.apache.cassandra.index.sai.disk.PostingListKeyRangeIterator;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.disk.v1.postings.ComplementPostingList;
import org.apache.cassandra.index.sai.iterators.KeyRangeIterator;
import org.apache.cassandra.index.sai.iterators.RowIdToPrimaryKeyWithSortKeyIterator;
import org.apache.cassandra.index.sai.plan.Expression;
Expand All @@ -63,23 +65,43 @@
*/
public abstract class IndexSearcher implements Closeable, SegmentOrdering
{
private final SSTableContext sstableContext;
protected final PrimaryKeyMap.Factory primaryKeyMapFactory;
final PerIndexFiles indexFiles;
protected final SegmentMetadata metadata;
protected final IndexContext indexContext;

protected final ColumnFilter columnFilter;

protected IndexSearcher(PrimaryKeyMap.Factory primaryKeyMapFactory,
// These row ids are inclusive of all rows in the sstable, not just rows that are indexed. They are used
// to determine the first and last row ids for a non-equality query.
private final boolean isFirstSegment;
private final boolean isLastSegment;
private final int firstSegmentRowId;
private final int lastSegmentRowId;

/**
 * Creates a searcher over a single index segment of the given sstable.
 * <p>
 * In addition to storing its collaborators, the constructor precomputes the inclusive segment row id
 * bounds that non-equality searches use for the complement posting list.
 *
 * @param sstableContext  the sstable context; supplies the primary key map factory and the sstable-wide min/max keys
 * @param perIndexFiles   the per-index files, stored in {@code indexFiles}
 * @param segmentMetadata the metadata of the segment being searched
 * @param indexContext    the index context; its column definition is used to build the column filter
 */
protected IndexSearcher(SSTableContext sstableContext,
                        PerIndexFiles perIndexFiles,
                        SegmentMetadata segmentMetadata,
                        IndexContext indexContext)
{
    this.sstableContext = sstableContext;
    this.primaryKeyMapFactory = sstableContext.primaryKeyMapFactory();
    this.indexFiles = perIndexFiles;
    this.metadata = segmentMetadata;
    this.indexContext = indexContext;
    columnFilter = ColumnFilter.selection(RegularAndStaticColumns.of(indexContext.getDefinition()));

    // TODO how do we deal with version AA here?? Is NEQ supported on AA? It seems like it can't be, but I haven't
    // been able to confirm the behavior.

    // The first segment's minimum row id and the last segment's maximum row id need to be adjusted when searching
    // for non-equality queries because the SegmentMetadata's min/max row ids do not include rows that do not have
    // values for the indexed column.
    this.isFirstSegment = metadata.segmentRowIdOffset == 0;
    this.isLastSegment = metadata.isLastSegmentInSSTable;

    // Lower bound: row 0 of the sstable for the first segment, otherwise the segment's own minimum row id.
    this.firstSegmentRowId = metadata.toSegmentRowId(isFirstSegment ? 0 : metadata.minSSTableRowId);

    // Upper bound: the last row of the sstable for the last segment, otherwise the segment's own MAXIMUM row id.
    // Using the minimum here would collapse the range to a single row and drop matches from non-equality results.
    this.lastSegmentRowId = metadata.toSegmentRowId(isLastSegment ? primaryKeyMapFactory.count() - 1 : metadata.maxSSTableRowId);
}

/**
Expand All @@ -96,7 +118,38 @@ protected IndexSearcher(PrimaryKeyMap.Factory primaryKeyMapFactory,
* @param defer create the iterator in a deferred state
* @return {@link KeyRangeIterator} that matches given expression
*/
public abstract KeyRangeIterator search(Expression expression, AbstractBounds<PartitionPosition> keyRange, QueryContext queryContext, boolean defer) throws IOException;
/**
 * Searches this segment for the given expression and returns the matches as a primary key iterator.
 * <p>
 * A non-equality expression is answered by searching for its negation and wrapping that result in a
 * {@link ComplementPostingList} bounded by {@code firstSegmentRowId} and {@code lastSegmentRowId}. For the
 * first and last segment the sstable-wide min/max keys are used as the iterator bounds instead of the
 * segment metadata's keys. Any other expression is passed to {@code searchInternal} unchanged.
 * <p>
 * A posting list that turns out to be empty is closed before the empty iterator is returned, so that it does
 * not keep whatever it holds open.
 *
 * @param expression   the expression to search for
 * @param keyRange     the key range, forwarded to {@code searchInternal}
 * @param queryContext the query context, forwarded to {@code searchInternal} and the searcher context
 * @param defer        forwarded to {@code searchInternal}
 * @return an iterator over the matching keys, or {@link KeyRangeIterator#empty()} when there are none
 * @throws IOException if searching or closing a posting list fails
 */
public KeyRangeIterator search(Expression expression, AbstractBounds<PartitionPosition> keyRange, QueryContext queryContext, boolean defer) throws IOException
{
    IndexSearcherContext searcherContext;
    if (expression.getOp().isNonEquality())
    {
        var negatedPostingList = searchInternal(expression.negated(), keyRange, queryContext, defer);
        var postingList = new ComplementPostingList(firstSegmentRowId, lastSegmentRowId, negatedPostingList);
        if (postingList.isEmpty())
        {
            postingList.close(); // Closing the posting list also closes the negated posting list.
            return KeyRangeIterator.empty();
        }

        // Use the appropriate min and max keys for the first and last segments.
        searcherContext = new IndexSearcherContext(isFirstSegment ? sstableContext.minSSTableKey() : metadata.minKey,
                                                   isLastSegment ? sstableContext.maxSSTableKey() : metadata.maxKey,
                                                   metadata.segmentRowIdOffset,
                                                   queryContext,
                                                   postingList);
    }
    else
    {
        var postingList = searchInternal(expression, keyRange, queryContext, defer);
        if (postingList == null)
            return KeyRangeIterator.empty();

        if (postingList.isEmpty())
        {
            // The list is discarded here, so release it just like the non-equality branch does.
            postingList.close();
            return KeyRangeIterator.empty();
        }

        searcherContext = new IndexSearcherContext(metadata, queryContext, postingList);
    }
    return new PostingListKeyRangeIterator(indexContext, primaryKeyMapFactory.newPerSSTablePrimaryKeyMap(), searcherContext);
}

/**
 * Performs the segment-specific search and returns the raw posting list for the expression.
 * <p>
 * {@code search} calls this with the original expression, or with the negated expression when the
 * original is a non-equality. In the equality path a {@code null} or empty result is treated as
 * "no matches".
 *
 * @param expression   the expression to evaluate (possibly a negated one, see above)
 * @param keyRange     the key range passed through from {@code search}
 * @param queryContext the query context passed through from {@code search}
 * @param defer        the defer flag passed through from {@code search}
 * @return the posting list for the expression; may be {@code null}
 * @throws IOException if the search fails with an I/O error
 */
protected abstract PostingList searchInternal(Expression expression, AbstractBounds<PartitionPosition> keyRange, QueryContext queryContext, boolean defer) throws IOException;

/**
* Order the rows by the given Orderer. Used for ORDER BY clause when
Expand Down Expand Up @@ -154,21 +207,6 @@ private ByteComparable encode(ByteBuffer input)
: v -> TypeUtil.asComparableBytes(input, indexContext.getValidator(), v);
}

/**
 * Turns a posting list into a primary key iterator for this segment.
 * <p>
 * Returns {@link KeyRangeIterator#empty()} when the posting list is {@code null} or has size 0. Otherwise it
 * builds an {@link IndexSearcherContext} from the segment metadata (min/max key, min/max sstable row id and
 * segment row id offset) and pairs it with a newly opened per-sstable primary key map.
 * <p>
 * NOTE(review): this looks like the pre-change helper that {@code search} supersedes; it relies on the
 * {@link IndexSearcherContext} constructor that still takes min/max sstable row ids. Confirm before reuse.
 *
 * @param postingList  the posting list to wrap; may be {@code null}
 * @param queryContext the query context handed to the searcher context
 * @return an iterator over the primary keys of the posting list, or an empty iterator
 * @throws IOException declared for failures while building the searcher context
 */
protected KeyRangeIterator toPrimaryKeyIterator(PostingList postingList, QueryContext queryContext) throws IOException
{
// A missing or zero-sized posting list means there is nothing to iterate over.
if (postingList == null || postingList.size() == 0)
return KeyRangeIterator.empty();

IndexSearcherContext searcherContext = new IndexSearcherContext(metadata.minKey,
metadata.maxKey,
metadata.minSSTableRowId,
metadata.maxSSTableRowId,
metadata.segmentRowIdOffset,
queryContext,
postingList);
return new PostingListKeyRangeIterator(indexContext, primaryKeyMapFactory.newPerSSTablePrimaryKeyMap(), searcherContext);
}

protected CloseableIterator<PrimaryKeyWithSortKey> toMetaSortedIterator(CloseableIterator<? extends RowIdWithMeta> rowIdIterator, QueryContext queryContext) throws IOException
{
if (rowIdIterator == null || !rowIdIterator.hasNext())
Expand All @@ -177,11 +215,7 @@ protected CloseableIterator<PrimaryKeyWithSortKey> toMetaSortedIterator(Closeabl
return CloseableIterator.emptyIterator();
}

IndexSearcherContext searcherContext = new IndexSearcherContext(metadata.minKey,
metadata.maxKey,
metadata.minSSTableRowId,
metadata.maxSSTableRowId,
metadata.segmentRowIdOffset,
IndexSearcherContext searcherContext = new IndexSearcherContext(metadata,
queryContext,
null);
var pkm = primaryKeyMapFactory.newPerSSTablePrimaryKeyMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.v1.postings.IntersectingPostingList;
import org.apache.cassandra.index.sai.iterators.KeyRangeIterator;
import org.apache.cassandra.index.sai.metrics.MulticastQueryEventListeners;
import org.apache.cassandra.index.sai.metrics.QueryEventListener;
import org.apache.cassandra.index.sai.plan.Expression;
Expand Down Expand Up @@ -98,7 +97,7 @@ protected InvertedIndexSearcher(SSTableContext sstableContext,
Version version,
boolean filterRangeResults) throws IOException
{
super(sstableContext.primaryKeyMapFactory(), perIndexFiles, segmentMetadata, indexContext);
super(sstableContext, perIndexFiles, segmentMetadata, indexContext);
this.sstable = sstableContext.sstable;

long root = metadata.getIndexRoot(IndexComponentType.TERMS_DATA);
Expand Down Expand Up @@ -134,13 +133,7 @@ public long indexFileCacheSize()
}

@SuppressWarnings("resource")
public KeyRangeIterator search(Expression exp, AbstractBounds<PartitionPosition> keyRange, QueryContext context, boolean defer) throws IOException
{
PostingList postingList = searchPosting(exp, context);
return toPrimaryKeyIterator(postingList, context);
}

private PostingList searchPosting(Expression exp, QueryContext context)
protected PostingList searchInternal(Expression exp, AbstractBounds<PartitionPosition> keyRange, QueryContext context, boolean defer) throws IOException
{
if (logger.isTraceEnabled())
logger.trace(indexContext.logMessage("Searching on expression '{}'..."), exp);
Expand Down
Loading