1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
Future version (tbd)
* Reduce size of per-SSTable index components for SAI (CASSANDRA-18673)
* Require only MODIFY permission on base when updating table with MV (STAR-564)
Merged from 5.1:
* Expose current compaction throughput in nodetool (CASSANDRA-13890)
@@ -392,6 +392,16 @@ public enum CassandraRelevantProperties
/** Controls the maximum number of expressions that will be used in a SAI intersection operation. */
SAI_INTERSECTION_CLAUSE_LIMIT("cassandra.sai.intersection.clause.limit", "2"),

/**
* Used to determine the block size and block mask for the clustering sorted terms.
*/
SAI_SORTED_TERMS_CLUSTERING_BLOCK_SHIFT("cassandra.sai.sorted_terms_clustering_block_shift", "4"),

/**
* Used to determine the block size and block mask for the partition sorted terms.
*/
SAI_SORTED_TERMS_PARTITION_BLOCK_SHIFT("cassandra.sai.sorted_terms_partition_block_shift", "4"),

/** Whether vector type only allows float vectors. True by default. **/
VECTOR_FLOAT_ONLY("cassandra.float_only_vectors", "true"),
/** Enables use of vector type. True by default. **/
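A shift value sets the block geometry in powers of two: with the default shift of 4, each sorted-terms block holds 2^4 = 16 entries and the block mask is 15. The following self-contained sketch illustrates the assumed block-size/block-mask arithmetic behind these two properties; it is illustrative only, not the Cassandra implementation.

public final class BlockShiftExample
{
    public static void main(String[] args)
    {
        int blockShift = 4;                     // e.g. cassandra.sai.sorted_terms_partition_block_shift
        int blockSize = 1 << blockShift;        // 16 terms per prefix-compressed block
        int blockMask = blockSize - 1;          // 0xF, selects the position within a block

        long rowId = 37;
        long block = rowId >>> blockShift;      // which block holds this row id (2)
        long offsetInBlock = rowId & blockMask; // position inside that block (5)

        System.out.printf("block=%d offset=%d%n", block, offsetInBlock);
    }
}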
6 changes: 5 additions & 1 deletion src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -33,7 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.cql3.AssignmentTestable;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.cql3.ColumnSpecification;
@@ -178,6 +177,11 @@ public <V> T compose(V value, ValueAccessor<V> accessor)
return getSerializer().deserialize(value, accessor);
}

public ByteBuffer decomposeUntyped(Object value)
{
return decompose((T) value);
}

public ByteBuffer decompose(T value)
{
return getSerializer().serialize(value);
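A hedged usage sketch of the new decomposeUntyped(Object) overload: it lets generic callers that only hold an Object serialize through an AbstractType without casting to the concrete type parameter at the call site. Int32Type is used here only as a convenient concrete type for illustration.

import java.nio.ByteBuffer;

import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.Int32Type;

public class DecomposeUntypedSketch
{
    // Generic code (e.g. index writers) often holds values as Object; the new
    // overload performs the cast to T internally before serializing.
    static ByteBuffer serialize(AbstractType<?> type, Object value)
    {
        return type.decomposeUntyped(value);
    }

    public static void main(String[] args)
    {
        ByteBuffer bytes = serialize(Int32Type.instance, 42);
        System.out.println(bytes.remaining() + " bytes"); // 4 bytes for an int
    }
}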
6 changes: 2 additions & 4 deletions src/java/org/apache/cassandra/index/sai/IndexContext.java
@@ -38,7 +38,6 @@
import org.slf4j.LoggerFactory;

import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.cql3.statements.schema.IndexTarget;
import org.apache.cassandra.db.ClusteringComparator;
@@ -73,7 +72,6 @@
import org.apache.cassandra.index.sai.iterators.KeyRangeUnionIterator;
import org.apache.cassandra.index.sai.memory.MemtableIndex;
import org.apache.cassandra.index.sai.memory.MemtableKeyRangeIterator;
import org.apache.cassandra.index.sai.memory.TrieMemtableIndex;
import org.apache.cassandra.index.sai.metrics.ColumnQueryMetrics;
import org.apache.cassandra.index.sai.metrics.IndexMetrics;
import org.apache.cassandra.index.sai.plan.Expression;
@@ -610,7 +608,7 @@ public int getIntOption(String name, int defaultValue)
}
catch (NumberFormatException e)
{
logger.error("Failed to parse index configuration " + name + " = " + value + " as integer");
logger.error("Failed to parse index configuration {} = {} as integer", name, value);
return defaultValue;
}
}
@@ -998,7 +996,7 @@ public long indexFileCacheSize()
public IndexFeatureSet indexFeatureSet()
{
IndexFeatureSet.Accumulator accumulator = new IndexFeatureSet.Accumulator();
getView().getIndexes().stream().map(SSTableIndex::indexFeatureSet).forEach(set -> accumulator.accumulate(set));
getView().getIndexes().stream().map(SSTableIndex::indexFeatureSet).forEach(accumulator::accumulate);
return accumulator.complete();
}
}
3 changes: 1 addition & 2 deletions src/java/org/apache/cassandra/index/sai/SSTableContext.java
@@ -65,7 +65,6 @@ private SSTableContext(SSTableContext copy)
this.primaryKeyMapFactory = copy.primaryKeyMapFactory;
}

@SuppressWarnings("resource")
public static SSTableContext create(SSTableReader sstable, IndexComponents.ForRead perSSTableComponents)
{
var onDiskFormat = perSSTableComponents.onDiskFormat();
@@ -146,7 +145,7 @@ public PrimaryKeyMap.Factory primaryKeyMapFactory()
*/
public int openFilesPerSSTable()
{
return perSSTableComponents.onDiskFormat().openFilesPerSSTable();
return perSSTableComponents.onDiskFormat().openFilesPerSSTable(sstable.hasClustering);
}

@Override
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.index.sai;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
@@ -60,7 +59,6 @@
import org.apache.cassandra.index.sai.plan.StorageAttachedIndexQueryPlan;
import org.apache.cassandra.index.transactions.IndexTransaction;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableWatcher;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
@@ -245,7 +243,7 @@ public void rangeTombstone(RangeTombstone tombstone)

private void forEach(Consumer<Index.Indexer> action)
{
indexers.forEach(action::accept);
indexers.forEach(action);
}
};
}
@@ -259,7 +257,7 @@ public StorageAttachedIndexQueryPlan queryPlanFor(RowFilter rowFilter)
@Override
public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata, long keyCount)
{
IndexDescriptor indexDescriptor = IndexDescriptor.empty(descriptor);
IndexDescriptor indexDescriptor = IndexDescriptor.empty(descriptor, tableMetadata);
try
{
return new StorageAttachedIndexWriter(indexDescriptor, tableMetadata, indices, tracker, keyCount, baseCfs.metric);
@@ -284,7 +282,7 @@ public boolean handles(IndexTransaction.Type type)
@Override
public Set<Component> componentsForNewSSTable()
{
return IndexDescriptor.componentsForNewlyFlushedSSTable(indices);
return IndexDescriptor.componentsForNewlyFlushedSSTable(indices, baseCfs.metadata().comparator.size() > 0);
}

@Override
@@ -335,7 +333,7 @@ public void handleNotification(INotification notification, Object sender)

// Avoid validation for index files just written following Memtable flush. ZCS streaming should
// validate index checksum.
boolean validate = notice.fromStreaming() || !notice.memtable().isPresent();
boolean validate = notice.fromStreaming() || notice.memtable().isEmpty();
onSSTableChanged(Collections.emptySet(), notice.added, indices, validate);
}
else if (notification instanceof SSTableListChangedNotification)
@@ -435,7 +433,7 @@ public int totalIndexBuildsInProgress()
*/
public int totalQueryableIndexCount()
{
return (int) indices.stream().filter(i -> baseCfs.indexManager.isIndexQueryable(i)).count();
return (int) indices.stream().filter(baseCfs.indexManager::isIndexQueryable).count();
}

/**
@@ -512,7 +510,7 @@ public void unsafeReload()
public void reset()
{
contextManager.clear();
indices.forEach(index -> index.makeIndexNonQueryable());
indices.forEach(StorageAttachedIndex::makeIndexNonQueryable);
onSSTableChanged(baseCfs.getLiveSSTables(), Collections.emptySet(), indices, false);
}
}
@@ -22,19 +22,21 @@
import com.google.common.base.Stopwatch;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.index.sai.utils.PrimaryKey;

/**
* Writes all SSTable-attached index token and offset structures.
*/
public interface PerSSTableWriter
{
public static final PerSSTableWriter NONE = (key) -> {};
PerSSTableWriter NONE = key -> {};

default void startPartition(long position) throws IOException
{}

default void startPartition(DecoratedKey decoratedKey) throws IOException
{}

void nextRow(PrimaryKey primaryKey) throws IOException;

default void complete(Stopwatch stopwatch) throws IOException
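Since both startPartition overloads are default no-ops, a PerSSTableWriter can still be expressed as a lambda over nextRow, exactly as the NONE constant does. A minimal sketch follows; 'decoratedKey' and 'primaryKey' are hypothetical placeholders and the method is only a fragment.

// Sketch only: nextRow(PrimaryKey) is the sole abstract method, so a lambda suffices,
// and both startPartition overloads fall back to their default no-op implementations.
static void writeOneRow(DecoratedKey decoratedKey, PrimaryKey primaryKey) throws IOException
{
    PerSSTableWriter writer = pk -> { /* record pk in the per-sstable structures */ };
    writer.startPartition(0L);           // position-based callback (default no-op)
    writer.startPartition(decoratedKey); // new key-based callback (default no-op)
    writer.nextRow(primaryKey);
}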
@@ -140,11 +140,11 @@ public void close() throws IOException

FileUtils.closeQuietly(postingList, primaryKeyMap);
}
else {
else
{
logger.warn("PostingListKeyRangeIterator is already closed",
new IllegalStateException("PostingListKeyRangeIterator is already closed"));
}

}

private boolean exhausted()
@@ -160,7 +160,7 @@ private long getNextRowId() throws IOException
long segmentRowId;
if (needsSkipping)
{
long targetSstableRowId = primaryKeyMap.ceiling(skipToToken);
long targetSstableRowId = primaryKeyMap.rowIdFromPrimaryKey(skipToToken);
// skipToToken is larger than max token in token file
if (targetSstableRowId < 0)
{
@@ -38,7 +38,7 @@ public interface PrimaryKeyMap extends Closeable
* A factory for creating {@link PrimaryKeyMap} instances. Implementations of this
* interface are expected to be threadsafe.
*/
public interface Factory extends Closeable
interface Factory extends Closeable
{
/**
* Creates a new {@link PrimaryKeyMap} instance
@@ -110,7 +110,7 @@ default PrimaryKey primaryKeyFromRowId(long sstableRowId, @Nonnull PrimaryKey lo
* @param key the {@link PrimaryKey} to lookup
* @return the row Id associated with the {@link PrimaryKey}
*/
long exactRowIdOrInvertedCeiling(PrimaryKey key);
long rowIdFromPrimaryKey(PrimaryKey key);

/**
* Returns the sstable row id associated with the least {@link PrimaryKey} greater than or equal to the given
@@ -121,7 +121,7 @@ default PrimaryKey primaryKeyFromRowId(long sstableRowId, @Nonnull PrimaryKey lo
* @param key the {@link PrimaryKey} to lookup
* @return an sstable row id or a negative value if no row is found
*/
long ceiling(PrimaryKey key);
// long ceiling(PrimaryKey key);

/**
* Returns the sstable row id associated with the greatest {@link PrimaryKey} less than or equal to the given
@@ -132,7 +132,7 @@ default PrimaryKey primaryKeyFromRowId(long sstableRowId, @Nonnull PrimaryKey lo
* @param key the {@link PrimaryKey} to lookup
* @return an sstable row id or a negative value if no row is found
*/
long floor(PrimaryKey key);
// long floor(PrimaryKey key);

/**
* Returns the number of primary keys in the map
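Callers that previously used ceiling() or exactRowIdOrInvertedCeiling() now go through rowIdFromPrimaryKey(), and the patch keeps the negative-result convention they rely on. A minimal sketch of the calling pattern, mirroring the check in PostingListKeyRangeIterator above; 'primaryKeyMap' and 'key' are placeholders.

long targetRowId = primaryKeyMap.rowIdFromPrimaryKey(key);
if (targetRowId < 0)
{
    // As in PostingListKeyRangeIterator: a negative result means no row at or after
    // the key exists in this sstable, so the iterator can be treated as exhausted.
}
else
{
    // resume iteration from targetRowId
}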
@@ -95,14 +95,14 @@ public static KeyRangeIterator create(SSTableContext ctx, AbstractBounds<Partiti
PrimaryKey minKey = (minKeyBound.compareTo(sstableMinKey) > 0)
? minKeyBound
: sstableMinKey;
long startRowId = minToken.isMinimum() ? 0 : keys.ceiling(minKey);
long startRowId = minToken.isMinimum() ? 0 : keys.rowIdFromPrimaryKey(minKey);
return new PrimaryKeyMapIterator(keys, sstableMinKey, sstableMaxKey, startRowId, filter);
}

@Override
protected void performSkipTo(PrimaryKey nextKey)
{
this.currentRowId = keys.ceiling(nextKey);
this.currentRowId = keys.rowIdFromPrimaryKey(nextKey);
}

@Override
@@ -132,6 +132,7 @@ public void startPartition(DecoratedKey key, long position)
try
{
perSSTableWriter.startPartition(position);
perSSTableWriter.startPartition(key);
}
catch (Throwable t)
{
@@ -26,6 +26,7 @@
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.utils.Throwables;
import org.apache.lucene.store.ChecksumIndexInput;

public interface IndexComponent
Expand All @@ -50,10 +51,10 @@ interface ForRead extends IndexComponent
@Override
IndexComponents.ForRead parent();

FileHandle createFileHandle();
FileHandle createFileHandle(Throwables.DiscreteAction<?> cleanup);

/**
* Opens a file handle for the provided index component similarly to {@link #createFileHandle()},
* Opens a file handle for the provided index component similarly to {@link #createFileHandle(Throwables.DiscreteAction)},
* but this method should be called instead of the aforementioned one if the access is done during index building, that is,
* before the full index that this is a part of has been finalized.
* <p>
@@ -87,19 +87,37 @@ public enum IndexComponentType
*
* V2
*/
PRIMARY_KEY_TRIE("PrimaryKeyTrie"),
// PRIMARY_KEY_TRIE("PrimaryKeyTrie"),

/**
* Prefix-compressed blocks of primary keys used for rowId to partition key lookups
*
* An on-disk block packed index containing the starting and ending rowIds for each partition.
*/
PARTITION_SIZES("PartitionSizes"),

/**
* Prefix-compressed blocks of partition keys used for rowId to partition key lookups
* <p>
* V2
*/
PRIMARY_KEY_BLOCKS("PrimaryKeyBlocks"),
PARTITION_KEY_BLOCKS("PartitionKeyBlocks"),

/**
* Encoded sequence of offsets to primary key blocks
*
* Encoded sequence of offsets to partition key blocks
* <p>
* V2
*/
PRIMARY_KEY_BLOCK_OFFSETS("PrimaryKeyBlockOffsets"),
PARTITION_KEY_BLOCK_OFFSETS("PartitionKeyBlockOffsets"),

/**
* Prefix-compressed blocks of clustering keys used for rowId to clustering key lookups
*/
CLUSTERING_KEY_BLOCKS("ClusteringKeyBlocks"),

/**
* Encoded sequence of offsets to clustering key blocks
*/
CLUSTERING_KEY_BLOCK_OFFSETS("ClusteringKeyBlockOffsets"),

/**
* Stores per-sstable metadata.
*
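Taken together, the new constants split the old primary-key structures into partition-level and clustering-level components. A hedged sketch of how the per-SSTable sorted-terms component set might be assembled depending on whether the table has clustering columns; the grouping logic here is an assumption, only the constant names come from the enum above, and the authoritative selection lives in the OnDiskFormat implementations.

// Assumed grouping only; fragment, not a complete class.
static EnumSet<IndexComponentType> sortedTermsComponents(boolean hasClustering)
{
    EnumSet<IndexComponentType> components = EnumSet.of(IndexComponentType.PARTITION_SIZES,
                                                        IndexComponentType.PARTITION_KEY_BLOCKS,
                                                        IndexComponentType.PARTITION_KEY_BLOCK_OFFSETS);
    if (hasClustering)
    {
        components.add(IndexComponentType.CLUSTERING_KEY_BLOCKS);
        components.add(IndexComponentType.CLUSTERING_KEY_BLOCK_OFFSETS);
    }
    return components;
}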
@@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.io.sstable.Component;
@@ -182,9 +183,13 @@ default Set<IndexComponentType> expectedComponentsForVersion()
{
return isPerIndexGroup()
? onDiskFormat().perIndexComponentTypes(context())
: onDiskFormat().perSSTableComponentTypes();
: onDiskFormat().perSSTableComponentTypes(hasClustering());
}

boolean hasClustering();

ClusteringComparator comparator();

default ByteComparable.Version byteComparableVersionFor(IndexComponentType component)
{
return version().byteComparableVersionFor(component, descriptor().version);