Skip to content

CNDB-11613 SAI compressed indexes #1474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/java/org/apache/cassandra/cql3/CqlBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.function.Consumer;

import com.google.common.annotations.VisibleForTesting;

Expand Down Expand Up @@ -238,4 +239,36 @@ public String toString()
{
return builder.toString();
}

/**
 * Builds a {@code WITH option1 = ... AND option2 = ... AND option3 = ...} clause.
 * Options added with an empty value map are skipped entirely; if no option is
 * appended, nothing (not even {@code WITH}) is written to this builder.
 *
 * @param builder a consumer that receives an {@link OptionsBuilder} used to add each option
 * @return this {@code CqlBuilder}, for call chaining
 */
public CqlBuilder appendOptions(Consumer<OptionsBuilder> builder)
{
builder.accept(new OptionsBuilder(this));
return this;
}

/**
 * Helper that emits a sequence of CQL options, prefixing the first appended
 * option with {@code WITH} and every subsequent one with {@code AND}.
 * Options whose value map is empty are silently skipped.
 */
public static class OptionsBuilder
{
    private final CqlBuilder out;
    private boolean first = true;

    OptionsBuilder(CqlBuilder out)
    {
        this.out = out;
    }

    /**
     * Appends {@code name = options} to the underlying builder, unless {@code options} is empty.
     *
     * @param name    the option name
     * @param options the option values; ignored when empty
     * @return this {@code OptionsBuilder}, for call chaining
     */
    public OptionsBuilder append(String name, Map<String, String> options)
    {
        if (!options.isEmpty())
        {
            out.append((first ? " WITH " : " AND ") + name + " = ");
            first = false;
            out.append(options);
        }
        return this;
    }
}
}
16 changes: 16 additions & 0 deletions src/java/org/apache/cassandra/index/sai/IndexContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.CloseableIterator;
Expand Down Expand Up @@ -124,6 +125,8 @@ public class IndexContext
ImmutableSet.of(UTF8Type.instance, AsciiType.instance, BooleanType.instance, UUIDType.instance);

public static final String ENABLE_SEGMENT_COMPACTION_OPTION_NAME = "enable_segment_compaction";
public static final String KEY_COMPRESSION_OPTION_NAME = "key_compression";
public static final String VALUE_COMPRESSION_OPTION_NAME = "value_compression";

private final AbstractType<?> partitionKeyType;
private final ClusteringComparator clusteringComparator;
Expand All @@ -146,6 +149,7 @@ public class IndexContext
private final IndexMetrics indexMetrics;
private final ColumnQueryMetrics columnQueryMetrics;
private final IndexWriterConfig indexWriterConfig;
private final CompressionParams valueCompression;
private final boolean isAnalyzed;
private final boolean hasEuclideanSimilarityFunc;
private final AbstractAnalyzer.AnalyzerFactory analyzerFactory;
Expand Down Expand Up @@ -198,6 +202,7 @@ public IndexContext(@Nonnull String keyspace,
isLiteral() ? new ColumnQueryMetrics.TrieIndexMetrics(keyspace, table, getIndexName())
: new ColumnQueryMetrics.BKDIndexMetrics(keyspace, table, getIndexName());

this.valueCompression = StorageAttachedIndex.getCompressionOptionUnchecked(config.options, VALUE_COMPRESSION_OPTION_NAME);
}
else
{
Expand All @@ -213,6 +218,7 @@ public IndexContext(@Nonnull String keyspace,
// query path.
this.indexMetrics = null;
this.columnQueryMetrics = null;
this.valueCompression = CompressionParams.noCompression();
}

this.maxTermSize = isVector() ? MAX_VECTOR_TERM_SIZE
Expand All @@ -223,6 +229,11 @@ public IndexContext(@Nonnull String keyspace,
logger.debug(logMessage("Initialized index context with index writer config: {}"), indexWriterConfig);
}

/**
 * Returns the index metadata this context was created with.
 * May be {@code null} (other accessors such as {@code getIndexName()} guard against that).
 */
public IndexMetadata getConfig()
{
return config;
}

public AbstractType<?> keyValidator()
{
return partitionKeyType;
Expand Down Expand Up @@ -621,6 +632,11 @@ public String getIndexName()
return this.config == null ? null : config.name;
}

/**
 * Returns the compression parameters parsed from the {@code value_compression} index option.
 * Returns {@link CompressionParams#noCompression()} when the index has no configuration,
 * when the option is absent, or when its value failed to parse (the unchecked getter
 * falls back to no compression on parse errors).
 */
public CompressionParams getValueCompression()
{
return valueCompression;
}

public int getIntOption(String name, int defaultValue)
{
String value = this.config.options.get(name);
Expand Down
95 changes: 91 additions & 4 deletions src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.cassandra.index.sai;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -36,7 +37,10 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -99,6 +103,7 @@
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientWarn;
Expand Down Expand Up @@ -200,6 +205,8 @@ public List<SecondaryIndexBuilder> getParallelIndexBuildTasks(ColumnFamilyStore
NonTokenizingOptions.ASCII,
// For now, we leave this for backward compatibility even though it's not used
IndexContext.ENABLE_SEGMENT_COMPACTION_OPTION_NAME,
IndexContext.KEY_COMPRESSION_OPTION_NAME,
IndexContext.VALUE_COMPRESSION_OPTION_NAME,
IndexTarget.TARGET_OPTION_NAME,
IndexTarget.CUSTOM_INDEX_OPTION_NAME,
IndexWriterConfig.POSTING_LIST_LVL_MIN_LEAVES,
Expand Down Expand Up @@ -228,7 +235,6 @@ public List<SecondaryIndexBuilder> getParallelIndexBuildTasks(ColumnFamilyStore
ImmutableSet.of(OrderPreservingPartitioner.class, LocalPartitioner.class, ByteOrderedPartitioner.class, RandomPartitioner.class);

private final ColumnFamilyStore baseCfs;
private final IndexMetadata config;
private final IndexContext indexContext;

// Tracks whether or not we've started the index build on initialization.
Expand All @@ -243,7 +249,6 @@ public List<SecondaryIndexBuilder> getParallelIndexBuildTasks(ColumnFamilyStore
public StorageAttachedIndex(ColumnFamilyStore baseCfs, IndexMetadata config)
{
this.baseCfs = baseCfs;
this.config = config;
TableMetadata tableMetadata = baseCfs.metadata();
Pair<ColumnMetadata, IndexTarget.Type> target = TargetParser.parse(tableMetadata, config);
this.indexContext = new IndexContext(tableMetadata.keyspace,
Expand Down Expand Up @@ -320,6 +325,27 @@ public static Map<String, String> validateOptions(Map<String, String> options, T
if (duplicateCount > 1)
throw new InvalidRequestException(String.format("Cannot create duplicate storage-attached index on column: %s", target.left));

// Get to validate only; will throw IRE if option is invalid
CompressionParams valueCompression = getCompressionOptionChecked(options, IndexContext.VALUE_COMPRESSION_OPTION_NAME);
Comment on lines +328 to +329

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We could put this below, after getting the key compression, and right before checking the format version. That is, immediately before its first usage.


// Check for existence of other indexes with different key_compression:
CompressionParams keyCompression = getCompressionOptionChecked(options, IndexContext.KEY_COMPRESSION_OPTION_NAME);
for (IndexMetadata other : metadata.indexes)
{
CompressionParams otherCompression = getCompressionOptionUnchecked(other.options, IndexContext.KEY_COMPRESSION_OPTION_NAME);
if (other.getIndexClassName().equals(StorageAttachedIndex.class.getName())
&& !otherCompression.equals(keyCompression))
{
throw new InvalidRequestException(String.format("Cannot create storage-attached index on column %s with different key_compression than existing index %s. Expected: %s",
target.left, other.name, keyCompression.asMap()));
}
}

if ((valueCompression.isEnabled() || keyCompression.isEnabled()) && !Version.current().onOrAfter(Version.EC))
throw new InvalidRequestException("Cannot create compressed storage-attached index. " +
"Enabling index compression requires at least SAI version 'ec'. " +
"Your current index version is set to '" + Version.current() + '\'');

// Analyzer is not supported against PK columns
if (isAnalyzed)
{
Expand Down Expand Up @@ -380,8 +406,68 @@ else if (!SUPPORTED_TYPES.contains(type.asCQL3Type()) && !TypeUtil.isFrozen(type
throw new InvalidRequestException("Unsupported type for SAI: " + type.asCQL3Type());
}

return Collections.emptyMap();
return unknown;
}

/**
 * Gets compression params from an index metadata option.
 * Never throws on an invalid option value: the failure is logged at error level
 * and {@link CompressionParams#noCompression()} is returned instead.
 *
 * @param options index options
 * @param optionName index option key containing compression params map in JSON format
 */
public static CompressionParams getCompressionOptionUnchecked(Map<String, String> options, String optionName)
{
    return getCompressionOption(options, optionName, (json, failure) ->
    {
        // Best-effort path (e.g. reading existing metadata): report and degrade to no compression.
        logger.error("Invalid value of {}: {} ({}). Falling back to no compression.",
                     optionName, json, failure.getMessage());
        return CompressionParams.noCompression();
    });
}
/**
 * Gets compression params from an index metadata option.
 * Returns {@link CompressionParams#noCompression()} if the option is missing.
 *
 * @param options index options
 * @param optionName index option key containing compression params map in JSON format
 * @throws InvalidRequestException if the option value cannot be parsed
 */
public static CompressionParams getCompressionOptionChecked(Map<String, String> options, String optionName)
{
    return getCompressionOption(options, optionName, (json, failure) ->
    {
        // Validation path (e.g. CREATE INDEX): reject the request outright.
        String message = String.format("Invalid value of %s: %s (%s)", optionName, json, failure.getMessage());
        throw new InvalidRequestException(message);
    });
}

/**
 * Gets compression params from an index metadata option.
 * Returns {@link CompressionParams#noCompression()} if the option is missing.
 *
 * @param options index options
 * @param optionName index option key containing compression params map in JSON format
 * @param onParseError invoked with the raw option value and the parse failure when the
 *                     value is not valid compression-params JSON; its result is returned
 */
private static CompressionParams getCompressionOption(Map<String, String> options,
                                                      String optionName,
                                                      BiFunction<String, IOException, CompressionParams> onParseError)
{
    String json = options.get(optionName);
    if (json == null)
        return CompressionParams.noCompression();

    try
    {
        return CompressionParams.fromJson(json);
    }
    catch (IOException e)
    {
        return onParseError.apply(json, e);
    }
}


@Override
public void register(IndexRegistry registry)
Expand All @@ -399,7 +485,7 @@ public void unregister(IndexRegistry registry)
/**
 * Returns this index's metadata. Delegates to the {@link IndexContext}, which is the
 * single owner of the configuration (the duplicate {@code config} field was removed).
 * May return {@code null} when the context was built without metadata.
 */
@Override
public IndexMetadata getIndexMetadata()
{
    // Note: the scraped diff showed both the old `return config;` and the new delegating
    // return; only the delegating form is valid since the `config` field was removed.
    return indexContext.getConfig();
}

@Override
Expand Down Expand Up @@ -904,6 +990,7 @@ public IndexContext getIndexContext()
/** Returns a human-readable identifier of the form {@code keyspace.table.indexName}. */
@Override
public String toString()
{
// config may be null (IndexContext allows absent metadata); render "?" in that case.
IndexMetadata config = indexContext.getConfig();
return String.format("%s.%s.%s", baseCfs.keyspace.getName(), baseCfs.name, config == null ? "?" : config.name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.tries.TrieSpaceExhaustedException;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.StorageAttachedIndex;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.format.Version;
Expand All @@ -44,6 +45,7 @@
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ApproximateTime;
import org.apache.cassandra.utils.Throwables;
Expand Down Expand Up @@ -107,11 +109,14 @@ public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
.filter(Objects::nonNull) // a null here means the column had no data to flush
.collect(Collectors.toList());

CompressionParams keyCompression = StorageAttachedIndex.getCompressionOptionUnchecked(indices.iterator().next().getIndexMetadata().options,
IndexContext.KEY_COMPRESSION_OPTION_NAME);

// If the SSTable components are already being built by another index build then we don't want
// to build them again so use a NO-OP writer
this.perSSTableWriter = perIndexComponentsOnly
? PerSSTableWriter.NONE
: onDiskFormat.newPerSSTableWriter(indexDescriptor);
: onDiskFormat.newPerSSTableWriter(indexDescriptor, keyCompression);
this.tableMetrics = tableMetrics;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.lucene.store.ChecksumIndexInput;

public interface IndexComponent
Expand All @@ -38,8 +39,15 @@ public interface IndexComponent

String fileNamePart();
Component asCustomComponent();

File file();

/**
* Returns the compression metadata component associated with this component.
* Compression information is needed to decompress this component.
*/
IndexComponent compressionMetadataComponent();

default boolean isCompletionMarker()
{
return componentType() == parent().completionMarkerComponent();
Expand Down Expand Up @@ -77,7 +85,18 @@ default IndexOutputWriter openOutput() throws IOException
return openOutput(false);
}

IndexOutputWriter openOutput(boolean append) throws IOException;
/** Opens an output for this component without compression, optionally appending to existing content. */
default IndexOutputWriter openOutput(boolean append) throws IOException
{
return openOutput(append, CompressionParams.noCompression());
}

/** Opens a non-appending output for this component, compressed with the given parameters. */
default IndexOutputWriter openOutput(CompressionParams compression) throws IOException
{
return openOutput(false, compression);
}

/**
 * Opens an output for this component.
 *
 * @param append whether to append to existing content rather than overwrite
 * @param compression compression parameters to apply to the written data;
 *                    use {@link CompressionParams#noCompression()} to write uncompressed
 */
IndexOutputWriter openOutput(boolean append, CompressionParams compression) throws IOException;


void createEmpty() throws IOException;
}
Expand Down
Loading