Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,10 @@ public enum CassandraRelevantProperties
SAI_VECTOR_SEARCH_MAX_TOP_K("cassandra.sai.vector_search.max_top_k", "1000"),

SAI_VECTOR_USE_PRUNING_DEFAULT("cassandra.sai.jvector.use_pruning_default", "1000"),

/** The class to use for selecting the current version of the SAI on-disk index format on a per-keyspace basis. */
SAI_VERSION_SELECTOR_CLASS("cassandra.sai.version.selector.class", ""),

SAI_WRITE_JVECTOR3_FORMAT("cassandra.sai.write_jv3_format", "false"),

SCHEMA_PULL_INTERVAL_MS("cassandra.schema_pull_interval_ms", "60000"),
Expand Down
19 changes: 6 additions & 13 deletions src/java/org/apache/cassandra/cql3/Ordering.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,21 @@ public ColumnMetadata getColumn()
*/
public static class Ann implements Expression
{
final ColumnMetadata column;
final Term vectorValue;
final Direction direction;
// TODO: remove this when we no longer need to downgrade to replicas on version lower than ED SAI that don't
// know about synthetic columns, and the related code in
// - StatementRestrictions.addOrderingRestrictions
// - StorageAttachedIndexSearcher.PrimaryKeyIterator constructor
// This is volatile rather than final so that tests may use reflection to change it.
@SuppressWarnings("FieldMayBeFinal")
private static volatile boolean USE_SYNTHETIC_SCORE = useSyntheticScore(Version.CURRENT);

final ColumnMetadata column;
final Term vectorValue;
final Direction direction;
final boolean useSyntheticScore;

public Ann(ColumnMetadata column, Term vectorValue, Direction direction)
{
this.column = column;
this.vectorValue = vectorValue;
this.direction = direction;
useSyntheticScore = useSyntheticScore(Version.current(column.ksName));
}

public static boolean useSyntheticScore(Version version)
Expand All @@ -122,11 +120,6 @@ public static boolean useSyntheticScore(Version version)
return CassandraRelevantProperties.SAI_ANN_USE_SYNTHETIC_SCORE.getBoolean(defaultValue);
}

public static boolean useSyntheticScore()
{
return USE_SYNTHETIC_SCORE;
}

@Override
public boolean hasNonClusteredOrdering()
{
Expand All @@ -148,7 +141,7 @@ public ColumnMetadata getColumn()
@Override
public boolean isScored()
{
return useSyntheticScore();
return useSyntheticScore;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,10 @@ private void validateIndexTarget(TableMetadata table, IndexMetadata.Kind kind, I

private String generateIndexName(KeyspaceMetadata keyspace, List<IndexTarget> targets)
{
assert keyspace.name.equals(keyspaceName);
String baseName = targets.size() == 1
? IndexMetadata.generateDefaultIndexName(tableName, targets.get(0).column)
: IndexMetadata.generateDefaultIndexName(tableName, null);
? IndexMetadata.generateDefaultIndexName(keyspaceName, tableName, targets.get(0).column)
: IndexMetadata.generateDefaultIndexName(keyspaceName, tableName, null);
return keyspace.findAvailableIndexName(baseName);
}

Expand Down
5 changes: 3 additions & 2 deletions src/java/org/apache/cassandra/index/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -956,8 +956,9 @@ default void unload() { }
* Returns the set of sstable-attached components that this group will create for a newly flushed sstable.
* </p>
* Note that the result of this method is only valid for newly flushed/written sstables as the components
* returned will assume a version of {@link Version#current()} and a generation of 0. SSTables for which some
* index have been rebuild may have index components that do not match what this method return in particular.
* returned will assume a version of {@link Version#current(String)} and a generation of 0. SSTables for which
* some index have been rebuild may have index components that do not match what this method return in
* particular.
*/
Set<Component> componentsForNewSSTable();

Expand Down
15 changes: 11 additions & 4 deletions src/java/org/apache/cassandra/index/sai/IndexContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public class IndexContext
private final AbstractAnalyzer.AnalyzerFactory queryAnalyzerFactory;
private final PrimaryKey.Factory primaryKeyFactory;

private final Version version;
private final int maxTermSize;

private volatile boolean dropped = false;
Expand All @@ -177,7 +178,8 @@ public IndexContext(@Nonnull String keyspace,
this.viewManager = new IndexViewManager(this);
this.validator = TypeUtil.cellValueType(column, indexType);
this.cfs = cfs;
this.primaryKeyFactory = Version.current().onDiskFormat().newPrimaryKeyFactory(clusteringComparator);
this.version = Version.current(keyspace);
this.primaryKeyFactory = version.onDiskFormat().newPrimaryKeyFactory(clusteringComparator);

String columnName = column.name.toString();

Expand Down Expand Up @@ -223,6 +225,11 @@ public IndexContext(@Nonnull String keyspace,
logger.debug(logMessage("Initialized index context with index writer config: {}"), indexWriterConfig);
}

public Version version()
{
return version;
}

public AbstractType<?> keyValidator()
{
return partitionKeyType;
Expand Down Expand Up @@ -408,7 +415,7 @@ private boolean validateCumulativeAnalyzedTermLimit(DecoratedKey key, AbstractAn

public void update(DecoratedKey key, Row oldRow, Row newRow, Memtable memtable, OpOrder.Group opGroup)
{
if (Version.current().equals(Version.AA))
if (version.equals(Version.AA))
{
// AA cannot handle updates because it indexes partition keys instead of fully qualified primary keys.
index(key, newRow, memtable, opGroup);
Expand Down Expand Up @@ -656,7 +663,7 @@ public View getReferencedView(long timeoutNanos)
*/
public int openPerIndexFiles()
{
return viewManager.getView().size() * Version.current().onDiskFormat().openFilesPerIndex(this);
return viewManager.getView().size() * version.onDiskFormat().openFilesPerIndex(this);
}

public void prepareSSTablesForRebuild(Collection<SSTableReader> sstablesToRebuild)
Expand Down Expand Up @@ -1006,7 +1013,7 @@ public long indexFileCacheSize()

public IndexFeatureSet indexFeatureSet()
{
IndexFeatureSet.Accumulator accumulator = new IndexFeatureSet.Accumulator();
IndexFeatureSet.Accumulator accumulator = new IndexFeatureSet.Accumulator(version);
getView().getIndexes().stream().map(SSTableIndex::indexFeatureSet).forEach(set -> accumulator.accumulate(set));
return accumulator.complete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,12 @@ private Future<?> startInitialBuild(ColumnFamilyStore baseCfs, boolean validate,
return ImmediateFuture.success(null);
}

if (indexContext.isVector() && Version.current().compareTo(Version.JVECTOR_EARLIEST) < 0)
if (indexContext.isVector() && indexContext.version().compareTo(Version.JVECTOR_EARLIEST) < 0)
{
throw new FeatureNeedsIndexRebuildException(String.format("The current configured on-disk format version %s does not support vector indexes. " +
"The minimum version that supports vectors is %s. " +
"The on-disk format version can be set via the -D%s system property.",
Version.current(),
indexContext.version(),
Version.JVECTOR_EARLIEST,
CassandraRelevantProperties.SAI_CURRENT_VERSION.name()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private static void prepareForRebuild(IndexComponents.ForRead components, Set<Co
// where immutable components was enabled, but then disabled for some reason. If that happens, we still
// want to ensure a new build removes the old files both from disk (happens below) and from the sstable TOC
// (which is what `replacedComponents` is about)).
if (components.version().useImmutableComponentFiles() || !components.buildId().equals(ComponentsBuildId.forNewSSTable()))
if (components.version().useImmutableComponentFiles() || !components.buildId().equals(ComponentsBuildId.forNewSSTable(components.version())))
replacedComponents.addAll(components.allAsCustomComponents());

if (!components.version().useImmutableComponentFiles())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
import org.apache.cassandra.index.sai.disk.format.IndexComponents;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.metrics.IndexGroupMetrics;
import org.apache.cassandra.index.sai.metrics.TableQueryMetrics;
import org.apache.cassandra.index.sai.metrics.TableStateMetrics;
Expand Down Expand Up @@ -94,6 +95,7 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons
private final ColumnFamilyStore baseCfs;

private final SSTableContextManager contextManager;
private final Version version;



Expand All @@ -104,6 +106,7 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons
this.stateMetrics = new TableStateMetrics(baseCfs.metadata(), this);
this.groupMetrics = new IndexGroupMetrics(baseCfs.metadata(), this);
this.contextManager = new SSTableContextManager(baseCfs.getTracker());
this.version = Version.current(baseCfs.keyspace.getName());

Tracker tracker = baseCfs.getTracker();
tracker.subscribe(this);
Expand Down Expand Up @@ -285,7 +288,7 @@ public boolean handles(IndexTransaction.Type type)
@Override
public Set<Component> componentsForNewSSTable()
{
return IndexDescriptor.componentsForNewlyFlushedSSTable(indices);
return IndexDescriptor.componentsForNewlyFlushedSSTable(indices, version);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;

import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.IndexOutput;
import org.apache.cassandra.index.sai.disk.oldlucene.ResettableByteBuffersIndexOutput;
import org.apache.lucene.store.ByteBuffersDataInput;
Expand All @@ -43,9 +44,9 @@ public class ModernResettableByteBuffersIndexOutput extends ResettableByteBuffer
private final ByteBuffersIndexOutput bbio;
private final ByteBuffersDataOutput delegate;

public ModernResettableByteBuffersIndexOutput(int expectedSize, String name)
public ModernResettableByteBuffersIndexOutput(int expectedSize, String name, Version version)
{
super("", name, ByteOrder.LITTLE_ENDIAN);
super("", name, ByteOrder.LITTLE_ENDIAN, version);
delegate = new ByteBuffersDataOutput(expectedSize);
bbio = new ByteBuffersIndexOutput(delegate, "", name + "-bb");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
TableMetrics tableMetrics) throws IOException
{
// We always write at the latest version (through what that version is can be configured for specific cases)
var onDiskFormat = Version.current().onDiskFormat();
var onDiskFormat = Version.current(indexDescriptor.descriptor.ksname).onDiskFormat();
this.indexDescriptor = indexDescriptor;
// Note: I think there is a silent assumption here. That is, the PK factory we use here must be for the current
// format version, because that is what `IndexContext.keyFactory` always uses (see ctor)
Expand Down Expand Up @@ -125,7 +125,7 @@ public void begin()
public void startPartition(DecoratedKey key, long position, long keyPositionForSASI)
{
if (aborted) return;

currentKey = key;

try
Expand Down Expand Up @@ -167,7 +167,7 @@ public void nextUnfilteredCluster(Unfiltered unfiltered)
public void staticRow(Row staticRow)
{
if (aborted) return;

if (staticRow.isEmpty())
return;

Expand Down Expand Up @@ -295,7 +295,7 @@ public void complete(SSTable sstable)

/**
* Aborts all column index writers and, only if they have not yet completed, SSTable-level component writers.
*
*
* @param accumulator the initial exception thrown from the failed writer
*/
@Override
Expand All @@ -316,7 +316,7 @@ public void abort(Throwable accumulator, boolean fromIndex)

// Mark the write aborted, so we can short-circuit any further operations on the component writers.
aborted = true;

// For non-compaction and non-flush, make any indexes involved in this transaction non-queryable,
// as they will likely not match the backing table.
// For compaction and flush: the task should be aborted and new sstables will not be added to tracker.
Expand All @@ -326,7 +326,7 @@ public void abort(Throwable accumulator, boolean fromIndex)
// on this node only and let the rest of the cluster operate normally.
if (fromIndex && opType != OperationType.COMPACTION && opType != OperationType.FLUSH)
indices.forEach(StorageAttachedIndex::makeIndexNonQueryable);

for (PerIndexWriter perIndexWriter : perIndexWriters)
{
try
Expand All @@ -341,10 +341,10 @@ public void abort(Throwable accumulator, boolean fromIndex)
}
}
}

if (!tokenOffsetWriterCompleted)
{
// If the token/offset files have already been written successfully, they can be reused later.
// If the token/offset files have already been written successfully, they can be reused later.
perSSTableWriter.abort(accumulator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
*/
public class ComponentsBuildId implements Comparable<ComponentsBuildId>
{
private static final ComponentsBuildId FOR_NEW_SSTABLE = ComponentsBuildId.current(0);

private final Version version;
private final int generation;

Expand All @@ -47,19 +45,13 @@ public static ComponentsBuildId of(Version version, int generation)
return new ComponentsBuildId(version, generation);
}

public static ComponentsBuildId current(int generation)
{
return of(Version.current(), generation);
}

public static ComponentsBuildId forNewSSTable()
public static ComponentsBuildId forNewSSTable(Version version)
{
return FOR_NEW_SSTABLE;
return ComponentsBuildId.of(version, 0);
}

public static ComponentsBuildId forNewBuild(@Nullable ComponentsBuildId previousBuild, Predicate<ComponentsBuildId> newBuildIsUsablePredicate)
public static ComponentsBuildId forNewBuild(Version version, @Nullable ComponentsBuildId previousBuild, Predicate<ComponentsBuildId> newBuildIsUsablePredicate)
{
Version version = Version.current();
// If we're not using immutable components, we always use generation 0, and we're fine if that overrides existing files
if (!version.useImmutableComponentFiles())
return new ComponentsBuildId(version, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
* of the completion marker, but not of the other component of the group). The bumping of generations takes incomplete
* groups into account, and so incomplete groups are not overridden either. Essentially, the generation used by a new
* build is always one more than the highest generation of any component found on disk (for the group in question, and
* the version we writting, usually {@link Version#current()}).
* the version we writting, usually {@link Version#current(String)}).
*/
public interface IndexComponents
{
Expand Down
Loading