From 6969bb15b5d041eac395d28e70a87a68c5f72140 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Fri, 24 Jan 2025 18:18:24 +0100 Subject: [PATCH] Move per-sstable compression control to CREATE INDEX Index compression options have been split into key_compression and value_compression so you can write: CREATE INDEX ON tab(v) WITH key_compression = { 'class': 'LZ4Compressor' } AND value_compression = { 'class': 'LZ4Compressor' } --- .../org/apache/cassandra/cql3/CqlBuilder.java | 33 ++++++++++++ .../schema/CreateIndexStatement.java | 13 +++-- .../statements/schema/IndexAttributes.java | 6 ++- .../statements/schema/TableAttributes.java | 3 -- .../cassandra/index/sai/IndexContext.java | 9 +++- .../sai/disk/StorageAttachedIndexWriter.java | 5 +- .../index/sai/disk/format/IndexComponent.java | 14 ++++- .../sai/disk/format/IndexDescriptor.java | 29 ++++------- .../index/sai/disk/format/OnDiskFormat.java | 4 +- .../sai/disk/v1/SSTableComponentsWriter.java | 1 + .../index/sai/disk/v1/V1OnDiskFormat.java | 3 +- .../sai/disk/v2/SSTableComponentsWriter.java | 6 ++- .../index/sai/disk/v2/V2OnDiskFormat.java | 5 +- .../v2/sortedterms/SortedTermsWriter.java | 8 +-- .../cassandra/schema/IndexMetadata.java | 52 +++++++++---------- .../cassandra/schema/SchemaKeyspace.java | 15 ++++-- .../apache/cassandra/schema/TableParams.java | 27 +--------- .../v2/sortedbytes/SortedTermsBenchmark.java | 4 +- .../index/sai/cql/IndexCompressionTest.java | 8 +-- .../disk/v2/sortedterms/SortedTermsTest.java | 13 +++-- 20 files changed, 153 insertions(+), 105 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/CqlBuilder.java b/src/java/org/apache/cassandra/cql3/CqlBuilder.java index 9e88feac8da8..870473bf3676 100644 --- a/src/java/org/apache/cassandra/cql3/CqlBuilder.java +++ b/src/java/org/apache/cassandra/cql3/CqlBuilder.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; +import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; @@ -238,4 +239,36 @@ public String toString() { return builder.toString(); } + + /** + * Builds a `WITH option1 = ... AND option2 = ... AND option3 = ... clause + * @param builder a receiver to receive a builder allowing to add each option + */ + public CqlBuilder appendOptions(Consumer builder) + { + builder.accept(new OptionsBuilder(this)); + return this; + } + + public static class OptionsBuilder + { + private CqlBuilder builder; + boolean empty = true; + + OptionsBuilder(CqlBuilder builder) + { + this.builder = builder; + } + + public OptionsBuilder append(String name, Map options) + { + if (options.isEmpty()) + return this; + + builder.append((empty ? "WITH " : "AND ") + name + " = "); + empty = false; + builder.append(options); + return this; + } + } } diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java index 9472afee7470..4784d2a894ea 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java @@ -162,12 +162,17 @@ public Keyspaces apply(Keyspaces schema) Map options = attrs.isCustom ? attrs.getOptions() : Collections.emptyMap(); - Map compressionOptions = attrs.getMap("compression"); - CompressionParams compression = compressionOptions != null - ? CompressionParams.fromMap(compressionOptions) + Map keyCompressionOptions = attrs.getMap("key_compression"); + CompressionParams keyCompression = keyCompressionOptions != null + ? CompressionParams.fromMap(keyCompressionOptions) : CompressionParams.noCompression(); - IndexMetadata index = IndexMetadata.fromIndexTargets(indexTargets, name, kind, options, compression); + Map valueCompressionOptions = attrs.getMap("value_compression"); + CompressionParams valueCompression = valueCompressionOptions != null + ? CompressionParams.fromMap(valueCompressionOptions) + : CompressionParams.noCompression(); + + IndexMetadata index = IndexMetadata.fromIndexTargets(indexTargets, name, kind, options, keyCompression, valueCompression); String className = index.getIndexClassName(); IndexGuardrails guardRails = IndexGuardrails.forClassName(className); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/IndexAttributes.java b/src/java/org/apache/cassandra/cql3/statements/schema/IndexAttributes.java index 7e7720d178cf..b92d44bdb572 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/IndexAttributes.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/IndexAttributes.java @@ -29,7 +29,8 @@ public class IndexAttributes extends PropertyDefinitions private static final String DEFAULT_INDEX_CLASS_PROPERTY = "cassandra.default_index_implementation_class"; private static final String KW_OPTIONS = "options"; - private static final String KW_COMPRESSION = "compression"; + private static final String KW_KEY_COMPRESSION = "key_compression"; + private static final String KW_VALUE_COMPRESSION = "value_compression"; private static final Set keywords = new HashSet<>(); private static final Set obsoleteKeywords = new HashSet<>(); @@ -40,7 +41,8 @@ public class IndexAttributes extends PropertyDefinitions static { keywords.add(KW_OPTIONS); - keywords.add(KW_COMPRESSION); + keywords.add(KW_KEY_COMPRESSION); + keywords.add(KW_VALUE_COMPRESSION); } public void maybeApplyDefaultIndex() diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java b/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java index 19a55e126451..112d0cfd7604 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java @@ -172,9 +172,6 @@ private TableParams build(TableParams.Builder builder) builder.compression(CompressionParams.fromMap(getMap(Option.COMPRESSION))); } - if (hasOption(Option.INDEX_COMPRESSION)) - builder.indexCompression(CompressionParams.fromMap(getMap(Option.INDEX_COMPRESSION))); - if (hasOption(Option.MEMTABLE)) builder.memtable(MemtableParams.fromMap(getMap(Option.MEMTABLE))); diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java b/src/java/org/apache/cassandra/index/sai/IndexContext.java index 9e2704943069..af4c60934fc1 100644 --- a/src/java/org/apache/cassandra/index/sai/IndexContext.java +++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java @@ -610,9 +610,14 @@ public String getIndexName() return this.config == null ? null : config.name; } - public CompressionParams getCompression() + public CompressionParams getKeyCompression() { - return this.config.compression; + return this.config.keyCompression; + } + + public CompressionParams getValueCompression() + { + return this.config.valueCompression; } public int getIntOption(String name, int defaultValue) diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java index adf18088376f..9d4597dcab18 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java @@ -44,6 +44,7 @@ import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ApproximateTime; import org.apache.cassandra.utils.Throwables; @@ -107,11 +108,13 @@ public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor, .filter(Objects::nonNull) // a null here means the column had no data to flush .collect(Collectors.toList()); + CompressionParams keyCompression = indices.iterator().next().getIndexMetadata().keyCompression; + // If the SSTable components are already being built by another index build then we don't want // to build them again so use a NO-OP writer this.perSSTableWriter = perIndexComponentsOnly ? PerSSTableWriter.NONE - : onDiskFormat.newPerSSTableWriter(indexDescriptor); + : onDiskFormat.newPerSSTableWriter(indexDescriptor, keyCompression); this.tableMetrics = tableMetrics; } diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java index c24d52f6ea19..d019a82d2951 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java @@ -27,6 +27,7 @@ import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.schema.CompressionParams; import org.apache.lucene.store.ChecksumIndexInput; public interface IndexComponent @@ -81,7 +82,18 @@ default IndexOutputWriter openOutput() throws IOException return openOutput(false); } - IndexOutputWriter openOutput(boolean append) throws IOException; + default IndexOutputWriter openOutput(boolean append) throws IOException + { + return openOutput(append, CompressionParams.noCompression()); + } + + default IndexOutputWriter openOutput(CompressionParams compression) throws IOException + { + return openOutput(false, compression); + } + + IndexOutputWriter openOutput(boolean append, CompressionParams compression) throws IOException; + void createEmpty() throws IOException; } diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java index 88086a770c4b..ae0002c5ff33 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java @@ -38,8 +38,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CassandraRelevantProperties; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.StorageAttachedIndex; @@ -700,6 +698,15 @@ private ChecksumIndexInput checksumIndexInput(IndexInput indexInput) @Override public IndexOutputWriter openOutput(boolean append) throws IOException + { + CompressionParams compression = context() != null + ? context().getValueCompression() + : CompressionParams.noCompression(); + return openOutput(append, compression); + } + + @Override + public IndexOutputWriter openOutput(boolean append, CompressionParams compression) throws IOException { File file = file(); @@ -708,25 +715,9 @@ public IndexOutputWriter openOutput(boolean append) throws IOException component, file); - return IndexFileUtils.instance.openOutput(file, byteOrder(), append, compressionMetaFile(), getCompression()); + return IndexFileUtils.instance.openOutput(file, byteOrder(), append, compressionMetaFile(), compression); } - private CompressionParams getCompression() - { - if (!component.compressed) - return CompressionParams.noCompression(); - - // Compress per-sstable components with the settings from the sstable metadata. - // Compress per-index components with the settings from the index metadata. - if (context != null) - return context.getCompression(); - - if (!Keyspace.isInitialized()) - return CompressionParams.noCompression(); - - var cfs = ColumnFamilyStore.getIfExists(descriptor.ksname, descriptor.cfname); - return cfs.metadata().params.indexCompression; - } @Override public void createEmpty() throws IOException diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java index d19dac7268cd..e12847ceeb81 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java @@ -40,6 +40,7 @@ import org.apache.cassandra.index.sai.memory.TrieMemtableIndex; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.utils.bytecomparable.ByteComparable; /** @@ -114,10 +115,11 @@ IndexSearcher newIndexSearcher(SSTableContext sstableContext, * Create a new writer for the per-SSTable on-disk components of an index. * * @param indexDescriptor The {@link IndexDescriptor} for the SSTable + * @param compression compression to use for per-sstable components * @return The {@link PerSSTableWriter} to write the per-SSTable on-disk components * @throws IOException */ - public PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor) throws IOException; + PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor, CompressionParams compression) throws IOException; /** * Create a new writer for the per-index on-disk components of an index. The {@link LifecycleNewTracker} diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableComponentsWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableComponentsWriter.java index 263ac680cd73..c48b517728df 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableComponentsWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableComponentsWriter.java @@ -29,6 +29,7 @@ import org.apache.cassandra.index.sai.disk.format.IndexComponentType; import org.apache.cassandra.index.sai.disk.v1.bitpack.NumericValuesWriter; import org.apache.cassandra.index.sai.utils.PrimaryKey; +import org.apache.cassandra.schema.CompressionParams; import org.apache.lucene.util.IOUtils; /** diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java index 5dbd732f4676..72c81285b9bb 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java @@ -58,6 +58,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.DefaultNameFactory; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.utils.bytecomparable.ByteComparable; import org.apache.cassandra.utils.bytecomparable.ByteSource; import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse; @@ -185,7 +186,7 @@ public IndexSearcher newIndexSearcher(SSTableContext sstableContext, } @Override - public PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor) throws IOException + public PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor, CompressionParams compression) throws IOException { return new SSTableComponentsWriter(indexDescriptor.newPerSSTableComponentsForWrite()); } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v2/SSTableComponentsWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v2/SSTableComponentsWriter.java index f4bfe3c34e0e..e4c83f4775f6 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v2/SSTableComponentsWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v2/SSTableComponentsWriter.java @@ -32,6 +32,7 @@ import org.apache.cassandra.index.sai.disk.v1.bitpack.NumericValuesWriter; import org.apache.cassandra.index.sai.disk.v2.sortedterms.SortedTermsWriter; import org.apache.cassandra.index.sai.utils.PrimaryKey; +import org.apache.cassandra.schema.CompressionParams; import org.apache.lucene.util.IOUtils; public class SSTableComponentsWriter implements PerSSTableWriter @@ -44,7 +45,7 @@ public class SSTableComponentsWriter implements PerSSTableWriter private final NumericValuesWriter blockFPWriter; private final SortedTermsWriter sortedTermsWriter; - public SSTableComponentsWriter(IndexComponents.ForWrite perSSTableComponents) throws IOException + public SSTableComponentsWriter(IndexComponents.ForWrite perSSTableComponents, CompressionParams compression) throws IOException { this.perSSTableComponents = perSSTableComponents; this.metadataWriter = new MetadataWriter(perSSTableComponents); @@ -56,7 +57,8 @@ public SSTableComponentsWriter(IndexComponents.ForWrite perSSTableComponents) th this.sortedTermsWriter = new SortedTermsWriter(perSSTableComponents.addOrGet(IndexComponentType.PRIMARY_KEY_BLOCKS), metadataWriter, blockFPWriter, - perSSTableComponents.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE)); + perSSTableComponents.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE), + compression); } @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java index 8ad67fcfd9e1..9e4a9144b7c6 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java @@ -43,6 +43,7 @@ import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.CompressionParams; /** * Updates SAI OnDiskFormat to include full PK -> offset mapping, and adds vector components. @@ -122,9 +123,9 @@ public IndexSearcher newIndexSearcher(SSTableContext sstableContext, } @Override - public PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor) throws IOException + public PerSSTableWriter newPerSSTableWriter(IndexDescriptor indexDescriptor, CompressionParams compression) throws IOException { - return new SSTableComponentsWriter(indexDescriptor.newPerSSTableComponentsForWrite()); + return new SSTableComponentsWriter(indexDescriptor.newPerSSTableComponentsForWrite(), compression); } @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsWriter.java index 6c18f7979040..6680e1649cba 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsWriter.java @@ -34,6 +34,7 @@ import org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.io.tries.IncrementalTrieWriter; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.utils.bytecomparable.ByteComparable; import org.apache.cassandra.utils.bytecomparable.ByteSource; import org.apache.lucene.store.IndexOutput; @@ -104,14 +105,15 @@ public class SortedTermsWriter implements Closeable public SortedTermsWriter(@NonNull IndexComponent.ForWrite termsDataComponent, @NonNull MetadataWriter metadataWriter, @Nonnull NumericValuesWriter termsDataBlockOffsets, - @Nonnull IndexComponent.ForWrite trieComponent) throws IOException + @Nonnull IndexComponent.ForWrite trieComponent, + @Nonnull CompressionParams compression) throws IOException { this.componentName = termsDataComponent.fileNamePart(); this.metadataWriter = metadataWriter; - this.trieOutput = trieComponent.openOutput(); + this.trieOutput = trieComponent.openOutput(compression); SAICodecUtils.writeHeader(this.trieOutput); this.trieWriter = IncrementalTrieWriter.open(trieSerializer, trieOutput.asSequentialWriter(), TypeUtil.BYTE_COMPARABLE_VERSION); - this.termsOutput = termsDataComponent.openOutput(); + this.termsOutput = termsDataComponent.openOutput(compression); SAICodecUtils.writeHeader(termsOutput); this.bytesStartFP = termsOutput.getFilePointer(); this.offsetsWriter = termsDataBlockOffsets; diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java index 28487ada23e9..0bb3ced390be 100644 --- a/src/java/org/apache/cassandra/schema/IndexMetadata.java +++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java @@ -37,7 +37,6 @@ import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.RequestValidationException; -import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.exceptions.UnknownIndexException; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.internal.CassandraIndex; @@ -81,29 +80,36 @@ public enum Kind public final String name; public final Kind kind; - public final CompressionParams compression; + public final CompressionParams keyCompression; + public final CompressionParams valueCompression; public final Map options; private IndexMetadata(String name, Map options, Kind kind, - CompressionParams compression) + CompressionParams keyCompression, + CompressionParams valueCompression) { this.id = UUID.nameUUIDFromBytes(name.getBytes()); this.name = name; this.options = options == null ? ImmutableMap.of() : ImmutableMap.copyOf(options); this.kind = kind; - this.compression = compression; + this.keyCompression = keyCompression; + this.valueCompression = valueCompression; } public static IndexMetadata fromSchemaMetadata(String name, Kind kind, Map options) { - return new IndexMetadata(name, options, kind, CompressionParams.noCompression()); + return new IndexMetadata(name, options, kind, CompressionParams.noCompression(), CompressionParams.noCompression()); } - public static IndexMetadata fromSchemaMetadata(String name, Kind kind, Map options, CompressionParams compression) + public static IndexMetadata fromSchemaMetadata(String name, + Kind kind, + Map options, + CompressionParams keyCompression, + CompressionParams valueCompression) { - return new IndexMetadata(name, options, kind, compression); + return new IndexMetadata(name, options, kind, keyCompression, valueCompression); } public static IndexMetadata fromIndexTargets(List targets, @@ -111,20 +117,22 @@ public static IndexMetadata fromIndexTargets(List targets, Kind kind, Map options) { - return fromIndexTargets(targets, name, kind, options, CompressionParams.noCompression()); + return fromIndexTargets(targets, name, kind, options, + CompressionParams.noCompression(), CompressionParams.noCompression()); } public static IndexMetadata fromIndexTargets(List targets, String name, Kind kind, Map options, - CompressionParams compression) + CompressionParams keyCompression, + CompressionParams valueCompression) { Map newOptions = new HashMap<>(options); newOptions.put(IndexTarget.TARGET_OPTION_NAME, targets.stream() .map(target -> target.asCqlString()) .collect(Collectors.joining(", "))); - return new IndexMetadata(name, newOptions, kind, compression); + return new IndexMetadata(name, newOptions, kind, keyCompression, valueCompression); } public static boolean isNameValid(String name) @@ -271,6 +279,8 @@ public String toString() .append("name", name) .append("kind", kind) .append("options", options) + .append("keyCompression", keyCompression) + .append("valueCompression", keyCompression) .build(); } @@ -308,22 +318,12 @@ public void appendCqlTo(CqlBuilder builder, TableMetadata table, boolean ifNotEx .append(") USING ") .appendWithSingleQuotes(copyOptions.remove(IndexTarget.CUSTOM_INDEX_OPTION_NAME)); - if (compression.isEnabled()) - { - builder.append(" WITH compression = ") - .append(compression.asMap()); - - if (!copyOptions.isEmpty()) - { - builder.append(" AND options = ") - .append(copyOptions); - } - } - else if (!copyOptions.isEmpty()) - { - builder.append(" WITH options = ") - .append(copyOptions); - } + + builder.appendOptions(b -> { + b.append("options", copyOptions); + b.append("value_compression", valueCompression.asMap()); + b.append("key_compression", keyCompression.asMap()); + }); } else { diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 2190d5d7e52b..e958286dc1c3 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -1124,11 +1124,18 @@ private static IndexMetadata createIndexMetadataFromRow(UntypedResultSet.Row row String name = row.getString("index_name"); IndexMetadata.Kind type = IndexMetadata.Kind.valueOf(row.getString("kind")); Map options = row.getFrozenTextMap("options"); - Map compressionOptions = row.getFrozenTextMap("compression"); - CompressionParams compression = compressionOptions != null - ? CompressionParams.fromMap(compressionOptions) + + Map keyCompressionOptions = row.getFrozenTextMap("key_compression"); + CompressionParams keyCompression = keyCompressionOptions != null + ? CompressionParams.fromMap(keyCompressionOptions) : CompressionParams.noCompression(); - return IndexMetadata.fromSchemaMetadata(name, type, options, compression); + + Map valueCompressionOptions = row.getFrozenTextMap("value_compression"); + CompressionParams valueCompression = keyCompressionOptions != null + ? CompressionParams.fromMap(valueCompressionOptions) + : CompressionParams.noCompression(); + + return IndexMetadata.fromSchemaMetadata(name, type, options, keyCompression, valueCompression); } private static Triggers fetchTriggers(String keyspace, String table) diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java index 50662e25cbb9..b2cf9d821451 100644 --- a/src/java/org/apache/cassandra/schema/TableParams.java +++ b/src/java/org/apache/cassandra/schema/TableParams.java @@ -46,7 +46,6 @@ public enum Option COMMENT, COMPACTION, COMPRESSION, - INDEX_COMPRESSION, MEMTABLE, DEFAULT_TIME_TO_LIVE, EXTENSIONS, @@ -80,7 +79,6 @@ public String toString() public final CachingParams caching; public final CompactionParams compaction; public final CompressionParams compression; - public final CompressionParams indexCompression; public final MemtableParams memtable; public final ImmutableMap extensions; public final boolean cdc; @@ -103,7 +101,6 @@ private TableParams(Builder builder) caching = builder.caching; compaction = builder.compaction; compression = builder.compression; - indexCompression = builder.indexCompression; memtable = builder.memtable; extensions = builder.extensions; cdc = builder.cdc; @@ -122,7 +119,6 @@ public static Builder builder(TableParams params) .comment(params.comment) .compaction(params.compaction) .compression(params.compression) - .indexCompression(params.indexCompression) .memtable(params.memtable) .crcCheckChance(params.crcCheckChance) .defaultTimeToLive(params.defaultTimeToLive) @@ -219,7 +215,6 @@ public boolean equals(Object o) && caching.equals(p.caching) && compaction.equals(p.compaction) && compression.equals(p.compression) - && indexCompression.equals(p.indexCompression) && memtable.equals(p.memtable) && extensions.equals(p.extensions) && cdc == p.cdc @@ -241,7 +236,6 @@ public int hashCode() caching, compaction, compression, - indexCompression, memtable, extensions, cdc, @@ -264,7 +258,6 @@ public String toString() .add(Option.CACHING.toString(), caching) .add(Option.COMPACTION.toString(), compaction) .add(Option.COMPRESSION.toString(), compression) - .add(Option.INDEX_COMPRESSION.toString(), indexCompression) .add(Option.MEMTABLE.toString(), memtable) .add(Option.EXTENSIONS.toString(), extensions) .add(Option.CDC.toString(), cdc) @@ -294,7 +287,6 @@ public void appendCqlTo(CqlBuilder builder, boolean isView) .append("AND crc_check_chance = ").append(crcCheckChance) .newLine(); - if (!isView) { builder.append("AND default_time_to_live = ").append(defaultTimeToLive) @@ -308,16 +300,8 @@ public void appendCqlTo(CqlBuilder builder, boolean isView) false) .newLine() .append("AND gc_grace_seconds = ").append(gcGraceSeconds) - .newLine(); - - - if (indexCompression.isEnabled()) - { - builder.append("AND index_compression = ").append(indexCompression.asMap()) - .newLine(); - } - - builder.append("AND max_index_interval = ").append(maxIndexInterval) + .newLine() + .append("AND max_index_interval = ").append(maxIndexInterval) .newLine() .append("AND memtable_flush_period_in_ms = ").append(memtableFlushPeriodInMs) .newLine() @@ -343,7 +327,6 @@ public static final class Builder private CachingParams caching = CachingParams.DEFAULT; private CompactionParams compaction = CompactionParams.DEFAULT; private CompressionParams compression = CompressionParams.DEFAULT; - private CompressionParams indexCompression = CompressionParams.noCompression(); private MemtableParams memtable = MemtableParams.DEFAULT; private ImmutableMap extensions = ImmutableMap.of(); private boolean cdc; @@ -442,12 +425,6 @@ public Builder compression(CompressionParams val) return this; } - public Builder indexCompression(CompressionParams val) - { - indexCompression = val; - return this; - } - public Builder cdc(boolean val) { cdc = val; diff --git a/test/microbench/org/apache/cassandra/test/microbench/index/sai/v2/sortedbytes/SortedTermsBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/index/sai/v2/sortedbytes/SortedTermsBenchmark.java index 4db530aa0c3e..df032749ceb9 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/index/sai/v2/sortedbytes/SortedTermsBenchmark.java +++ b/test/microbench/org/apache/cassandra/test/microbench/index/sai/v2/sortedbytes/SortedTermsBenchmark.java @@ -38,6 +38,7 @@ import org.apache.cassandra.index.sai.disk.v2.sortedterms.SortedTermsWriter; import org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.test.microbench.index.sai.v1.AbstractOnDiskBenchmark; import org.apache.cassandra.utils.bytecomparable.ByteComparable; import org.apache.cassandra.utils.bytecomparable.ByteSource; @@ -122,7 +123,8 @@ public void perTrialSetup2() throws IOException SortedTermsWriter writer = new SortedTermsWriter(components.addOrGet(IndexComponentType.PRIMARY_KEY_BLOCKS), metadataWriter, blockFPWriter, - components.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE))) + components.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE), + CompressionParams.noCompression())) { for (int x = 0; x < NUM_ROWS; x++) { diff --git a/test/unit/org/apache/cassandra/index/sai/cql/IndexCompressionTest.java b/test/unit/org/apache/cassandra/index/sai/cql/IndexCompressionTest.java index b56dc1aa405f..46e9f6c10ad4 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/IndexCompressionTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/IndexCompressionTest.java @@ -42,8 +42,8 @@ public class IndexCompressionTest extends SAITester @Test public void testKeyCompression() { - createTable("CREATE TABLE %s (pk int, c text, val text, PRIMARY KEY(pk, c)) WITH index_compression = {'class': 'LZ4Compressor'}"); - String indexName = createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex'"); + createTable("CREATE TABLE %s (pk int, c text, val text, PRIMARY KEY(pk, c))"); + String indexName = createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex' WITH key_compression = {'class': 'LZ4Compressor'}"); for (int i = 0; i < 1000; i++) execute("INSERT INTO %s(pk, c, val) VALUES (?, ?, ?)", i, "key", "value" + i); @@ -69,7 +69,7 @@ public void testKeyCompression() public void testLiteralValueCompression() { createTable("CREATE TABLE %s (pk int, c text, val text, PRIMARY KEY(pk, c))"); - String indexName = createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex' WITH compression = {'class': 'LZ4Compressor'}"); + String indexName = createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex' WITH value_compression = {'class': 'LZ4Compressor'}"); for (int i = 0; i < 1000; i++) execute("INSERT INTO %s(pk, c, val) VALUES (?, ?, ?)", i, "key", "value" + i); @@ -96,7 +96,7 @@ public void testLiteralValueCompression() public void testNumericValueCompression() { createTable("CREATE TABLE %s (pk int, c text, val int, PRIMARY KEY(pk, c))"); - String indexName = createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex' WITH compression = {'class': 'LZ4Compressor'}"); + String indexName = createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex' WITH value_compression = {'class': 'LZ4Compressor'}"); for (int i = 0; i < 1000; i++) execute("INSERT INTO %s(pk, c, val) VALUES (?, ?, ?)", i, "key", i); diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsTest.java index 09cd5efa0b15..334452243fd7 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsTest.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsTest.java @@ -46,6 +46,7 @@ import org.apache.cassandra.index.sai.utils.SaiRandomizedTest; import org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.utils.bytecomparable.ByteComparable; import org.apache.cassandra.utils.bytecomparable.ByteSource; import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse; @@ -67,7 +68,8 @@ public void testLexicographicException() throws Exception try (SortedTermsWriter writer = new SortedTermsWriter(components.addOrGet(IndexComponentType.PRIMARY_KEY_BLOCKS), metadataWriter, blockFPWriter, - components.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE))) + components.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE), + CompressionParams.noCompression())) { ByteBuffer buffer = Int32Type.instance.decompose(99999); ByteSource byteSource = Int32Type.instance.asComparableBytes(buffer, VERSION); @@ -109,7 +111,8 @@ public void testFileValidation() throws Exception try (SortedTermsWriter writer = new SortedTermsWriter(components.addOrGet(IndexComponentType.PRIMARY_KEY_BLOCKS), metadataWriter, blockFPWriter, - components.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE))) + components.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE), + CompressionParams.noCompression())) { primaryKeys.forEach(primaryKey -> { try @@ -366,7 +369,8 @@ private void writeTerms(IndexDescriptor indexDescriptor, List terms) thr try (SortedTermsWriter writer = new SortedTermsWriter(components.addOrGet(IndexComponentType.PRIMARY_KEY_BLOCKS), metadataWriter, blockFPWriter, - components.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE))) + components.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE), + CompressionParams.noCompression())) { for (int x = 0; x < 1000 * 4; x++) { @@ -392,7 +396,8 @@ private void writeTerms(IndexDescriptor indexDescriptor, List termsM try (SortedTermsWriter writer = new SortedTermsWriter(components.addOrGet(IndexComponentType.PRIMARY_KEY_BLOCKS), metadataWriter, blockFPWriter, - components.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE))) + components.addOrGet(IndexComponentType.PRIMARY_KEY_TRIE), + CompressionParams.noCompression())) { for (int x = 0; x < 1000 ; x++) {