Move per-sstable compression control to CREATE INDEX
Index compression options have been split into
key_compression and value_compression so you can write:

CREATE INDEX ON tab(v)
  WITH key_compression = { 'class': 'LZ4Compressor' }
  AND value_compression = { 'class': 'LZ4Compressor' }
pkolaczk committed Jan 24, 2025
1 parent 0f62916 commit 6969bb1
Showing 20 changed files with 153 additions and 105 deletions.
33 changes: 33 additions & 0 deletions src/java/org/apache/cassandra/cql3/CqlBuilder.java
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.function.Consumer;

import com.google.common.annotations.VisibleForTesting;

@@ -238,4 +239,36 @@ public String toString()
{
return builder.toString();
}

/**
* Builds a `WITH option1 = ... AND option2 = ... AND option3 = ...` clause.
* @param builder a consumer that is handed an {@link OptionsBuilder} for adding each option
*/
public CqlBuilder appendOptions(Consumer<OptionsBuilder> builder)
{
builder.accept(new OptionsBuilder(this));
return this;
}

public static class OptionsBuilder
{
private CqlBuilder builder;
boolean empty = true;

OptionsBuilder(CqlBuilder builder)
{
this.builder = builder;
}

public OptionsBuilder append(String name, Map<String, String> options)
{
if (options.isEmpty())
return this;

builder.append((empty ? "WITH " : "AND ") + name + " = ");
empty = false;
builder.append(options);
return this;
}
}
}
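A minimal usage sketch of the new helper (not part of the commit; the statement text, the option maps, and the fluent chaining around appendOptions are illustrative assumptions):

    import java.util.Map;
    import org.apache.cassandra.cql3.CqlBuilder;

    Map<String, String> keyCompression = Map.of("class", "LZ4Compressor");
    Map<String, String> valueCompression = Map.of("class", "LZ4Compressor");

    // appendOptions hands an OptionsBuilder to the consumer; the first appended
    // option is prefixed with WITH, later ones with AND, and empty maps are skipped.
    String cql = new CqlBuilder()
        .append("CREATE INDEX idx ON ks.tab(v) ")
        .appendOptions(opts -> opts.append("key_compression", keyCompression)
                                   .append("value_compression", valueCompression))
        .toString();
    // expected to render roughly:
    //   CREATE INDEX idx ON ks.tab(v) WITH key_compression = {'class': 'LZ4Compressor'}
    //     AND value_compression = {'class': 'LZ4Compressor'}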
@@ -162,12 +162,17 @@ public Keyspaces apply(Keyspaces schema)

Map<String, String> options = attrs.isCustom ? attrs.getOptions() : Collections.emptyMap();

Map<String, String> compressionOptions = attrs.getMap("compression");
CompressionParams compression = compressionOptions != null
? CompressionParams.fromMap(compressionOptions)
Map<String, String> keyCompressionOptions = attrs.getMap("key_compression");
CompressionParams keyCompression = keyCompressionOptions != null
? CompressionParams.fromMap(keyCompressionOptions)
: CompressionParams.noCompression();

IndexMetadata index = IndexMetadata.fromIndexTargets(indexTargets, name, kind, options, compression);
Map<String, String> valueCompressionOptions = attrs.getMap("value_compression");
CompressionParams valueCompression = valueCompressionOptions != null
? CompressionParams.fromMap(valueCompressionOptions)
: CompressionParams.noCompression();

IndexMetadata index = IndexMetadata.fromIndexTargets(indexTargets, name, kind, options, keyCompression, valueCompression);

String className = index.getIndexClassName();
IndexGuardrails guardRails = IndexGuardrails.forClassName(className);
@@ -29,7 +29,8 @@ public class IndexAttributes extends PropertyDefinitions
private static final String DEFAULT_INDEX_CLASS_PROPERTY = "cassandra.default_index_implementation_class";

private static final String KW_OPTIONS = "options";
private static final String KW_COMPRESSION = "compression";
private static final String KW_KEY_COMPRESSION = "key_compression";
private static final String KW_VALUE_COMPRESSION = "value_compression";

private static final Set<String> keywords = new HashSet<>();
private static final Set<String> obsoleteKeywords = new HashSet<>();
@@ -40,7 +41,8 @@ public class IndexAttributes extends PropertyDefinitions
static
{
keywords.add(KW_OPTIONS);
keywords.add(KW_COMPRESSION);
keywords.add(KW_KEY_COMPRESSION);
keywords.add(KW_VALUE_COMPRESSION);
}

public void maybeApplyDefaultIndex()
@@ -172,9 +172,6 @@ private TableParams build(TableParams.Builder builder)
builder.compression(CompressionParams.fromMap(getMap(Option.COMPRESSION)));
}

if (hasOption(Option.INDEX_COMPRESSION))
builder.indexCompression(CompressionParams.fromMap(getMap(Option.INDEX_COMPRESSION)));

if (hasOption(Option.MEMTABLE))
builder.memtable(MemtableParams.fromMap(getMap(Option.MEMTABLE)));

9 changes: 7 additions & 2 deletions src/java/org/apache/cassandra/index/sai/IndexContext.java
@@ -610,9 +610,14 @@ public String getIndexName()
return this.config == null ? null : config.name;
}

public CompressionParams getCompression()
public CompressionParams getKeyCompression()
{
return this.config.compression;
return this.config.keyCompression;
}

public CompressionParams getValueCompression()
{
return this.config.valueCompression;
}

public int getIntOption(String name, int defaultValue)
@@ -44,6 +44,7 @@
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ApproximateTime;
import org.apache.cassandra.utils.Throwables;
@@ -107,11 +108,13 @@ public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
.filter(Objects::nonNull) // a null here means the column had no data to flush
.collect(Collectors.toList());

CompressionParams keyCompression = indices.iterator().next().getIndexMetadata().keyCompression;

// If the SSTable components are already being built by another index build then we don't want
// to build them again so use a NO-OP writer
this.perSSTableWriter = perIndexComponentsOnly
? PerSSTableWriter.NONE
: onDiskFormat.newPerSSTableWriter(indexDescriptor);
: onDiskFormat.newPerSSTableWriter(indexDescriptor, keyCompression);
this.tableMetrics = tableMetrics;
}

@@ -27,6 +27,7 @@
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.lucene.store.ChecksumIndexInput;

public interface IndexComponent
@@ -81,7 +82,18 @@ default IndexOutputWriter openOutput() throws IOException
return openOutput(false);
}

IndexOutputWriter openOutput(boolean append) throws IOException;
default IndexOutputWriter openOutput(boolean append) throws IOException
{
return openOutput(append, CompressionParams.noCompression());
}

default IndexOutputWriter openOutput(CompressionParams compression) throws IOException
{
return openOutput(false, compression);
}

IndexOutputWriter openOutput(boolean append, CompressionParams compression) throws IOException;


void createEmpty() throws IOException;
}
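A short sketch of how callers choose between the new overloads (illustrative; the component and compression variables are assumptions, behaviour as defined in this hunk and the implementation change below):

    // Per-index components: take the default, which the concrete implementation
    // below resolves to the index's value_compression (or none without a context).
    IndexOutputWriter postingsOut = postingsComponent.openOutput();

    // Per-sstable components: pass the compression explicitly, as SortedTermsWriter
    // now does with the key_compression handed down from StorageAttachedIndexWriter.
    IndexOutputWriter trieOut = trieComponent.openOutput(keyCompression);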
@@ -38,8 +38,6 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.StorageAttachedIndex;
@@ -700,6 +698,15 @@ private ChecksumIndexInput checksumIndexInput(IndexInput indexInput)

@Override
public IndexOutputWriter openOutput(boolean append) throws IOException
{
CompressionParams compression = context() != null
? context().getValueCompression()
: CompressionParams.noCompression();
return openOutput(append, compression);
}

@Override
public IndexOutputWriter openOutput(boolean append, CompressionParams compression) throws IOException
{
File file = file();

@@ -708,25 +715,9 @@ public IndexOutputWriter openOutput(boolean append) throws IOException
component,
file);

return IndexFileUtils.instance.openOutput(file, byteOrder(), append, compressionMetaFile(), getCompression());
return IndexFileUtils.instance.openOutput(file, byteOrder(), append, compressionMetaFile(), compression);
}

private CompressionParams getCompression()
{
if (!component.compressed)
return CompressionParams.noCompression();

// Compress per-sstable components with the settings from the sstable metadata.
// Compress per-index components with the settings from the index metadata.
if (context != null)
return context.getCompression();

if (!Keyspace.isInitialized())
return CompressionParams.noCompression();

var cfs = ColumnFamilyStore.getIfExists(descriptor.ksname, descriptor.cfname);
return cfs.metadata().params.indexCompression;
}

@Override
public void createEmpty() throws IOException
@@ -40,6 +40,7 @@
import org.apache.cassandra.index.sai.memory.TrieMemtableIndex;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;

/**
@@ -114,10 +115,11 @@ IndexSearcher newIndexSearcher(SSTableContext sstableContext,
* Create a new writer for the per-SSTable on-disk components of an index.
*
* @param indexDescriptor The {@link IndexDescriptor} for the SSTable
* @param compression compression to use for per-sstable components
* @return The {@link PerSSTableWriter} to write the per-SSTable on-disk components
* @throws IOException
*/
public PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor) throws IOException;
PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor, CompressionParams compression) throws IOException;
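// Illustrative call site (a sketch based on the StorageAttachedIndexWriter change
// earlier in this commit, not part of this interface): the compression passed in
// is the key_compression taken from the first index's metadata, e.g.
//
//   CompressionParams keyCompression = indices.iterator().next().getIndexMetadata().keyCompression;
//   PerSSTableWriter writer = onDiskFormat.newPerSSTableWriter(indexDescriptor, keyCompression);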

/**
* Create a new writer for the per-index on-disk components of an index. The {@link LifecycleNewTracker}
@@ -29,6 +29,7 @@
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
import org.apache.cassandra.index.sai.disk.v1.bitpack.NumericValuesWriter;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.lucene.util.IOUtils;

/**
@@ -58,6 +58,7 @@
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
@@ -185,7 +186,7 @@ public IndexSearcher newIndexSearcher(SSTableContext sstableContext,
}

@Override
public PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor) throws IOException
public PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor, CompressionParams compression) throws IOException
{
return new SSTableComponentsWriter(indexDescriptor.newPerSSTableComponentsForWrite());
}
@@ -32,6 +32,7 @@
import org.apache.cassandra.index.sai.disk.v1.bitpack.NumericValuesWriter;
import org.apache.cassandra.index.sai.disk.v2.sortedterms.SortedTermsWriter;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.lucene.util.IOUtils;

public class SSTableComponentsWriter implements PerSSTableWriter
@@ -44,7 +45,7 @@ public class SSTableComponentsWriter implements PerSSTableWriter
private final NumericValuesWriter blockFPWriter;
private final SortedTermsWriter sortedTermsWriter;

public SSTableComponentsWriter(IndexComponents.ForWrite perSSTableComponents) throws IOException
public SSTableComponentsWriter(IndexComponents.ForWrite perSSTableComponents, CompressionParams compression) throws IOException
{
this.perSSTableComponents = perSSTableComponents;
this.metadataWriter = new MetadataWriter(perSSTableComponents);
@@ -56,7 +57,8 @@ public SSTableComponentsWriter(IndexComponents.ForWrite perSSTableComponents) th
this.sortedTermsWriter = new SortedTermsWriter(perSSTableComponents.addOrGet(IndexComponentType.PRIMARY_KEY_BLOCKS),
metadataWriter,
blockFPWriter,
perSSTableComponents.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE));
perSSTableComponents.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE),
compression);
}

@Override
@@ -43,6 +43,7 @@
import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.CompressionParams;

/**
* Updates SAI OnDiskFormat to include full PK -> offset mapping, and adds vector components.
@@ -122,9 +123,9 @@ public IndexSearcher newIndexSearcher(SSTableContext sstableContext,
}

@Override
public PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor) throws IOException
public PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor, CompressionParams compression) throws IOException
{
return new SSTableComponentsWriter(indexDescriptor.newPerSSTableComponentsForWrite());
return new SSTableComponentsWriter(indexDescriptor.newPerSSTableComponentsForWrite(), compression);
}

@Override
@@ -34,6 +34,7 @@
import org.apache.cassandra.index.sai.utils.TypeUtil;
import org.apache.cassandra.io.tries.IncrementalTrieWriter;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
import org.apache.lucene.store.IndexOutput;
@@ -104,14 +105,15 @@ public class SortedTermsWriter implements Closeable
public SortedTermsWriter(@NonNull IndexComponent.ForWrite termsDataComponent,
@NonNull MetadataWriter metadataWriter,
@Nonnull NumericValuesWriter termsDataBlockOffsets,
@Nonnull IndexComponent.ForWrite trieComponent) throws IOException
@Nonnull IndexComponent.ForWrite trieComponent,
@Nonnull CompressionParams compression) throws IOException
{
this.componentName = termsDataComponent.fileNamePart();
this.metadataWriter = metadataWriter;
this.trieOutput = trieComponent.openOutput();
this.trieOutput = trieComponent.openOutput(compression);
SAICodecUtils.writeHeader(this.trieOutput);
this.trieWriter = IncrementalTrieWriter.open(trieSerializer, trieOutput.asSequentialWriter(), TypeUtil.BYTE_COMPARABLE_VERSION);
this.termsOutput = termsDataComponent.openOutput();
this.termsOutput = termsDataComponent.openOutput(compression);
SAICodecUtils.writeHeader(termsOutput);
this.bytesStartFP = termsOutput.getFilePointer();
this.offsetsWriter = termsDataBlockOffsets;