Skip to content

Commit 8b79442

Browse files
committed
HCD-209: Fix queries on early-open BTI files
also includes fix for CASSANDRA-20976
1 parent 8277624 commit 8b79442

File tree

9 files changed

+269
-15
lines changed

9 files changed

+269
-15
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ Future version (tbd)
33
Merged from 5.1:
44
* Expose current compaction throughput in nodetool (CASSANDRA-13890)
55
Merged from 5.0:
6+
* Fix range queries on early-open BTI files (CASSANDRA-20976)
67
* Improve error messages when initializing auth classes (CASSANDRA-20368 and CASSANDRA-20450)
78
* Use ParameterizedClass for all auth-related implementations (CASSANDRA-19946 and partially CASSANDRA-18554)
89
* Enables IAuthenticator's to return own AuthenticateMessage (CASSANDRA-19984)

src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public PartitionIndex(FileHandle fh, long trieRoot, long keyCount, DecoratedKey
9494
this.version = version;
9595
}
9696

97-
private PartitionIndex(PartitionIndex src)
97+
protected PartitionIndex(PartitionIndex src)
9898
{
9999
this(src.fh, src.root, src.keyCount, src.first, src.last, src.filterFirst, src.filterLast, src.version);
100100
}

src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexBuilder.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ private void refreshReadableBoundary()
110110
if (partitionIndexSyncPosition < partialIndexPartitionEnd)
111111
return;
112112

113-
writer.updateFileHandle(fhBuilder);
114113
try (FileHandle fh = fhBuilder.withLength(writer.getLastFlushOffset()).complete())
115114
{
116115
PartitionIndex pi = new PartitionIndexEarly(fh,

src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexEarly.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,19 @@ public PartitionIndexEarly(FileHandle fh, long trieRoot, long keyCount, Decorate
4343
this.tail = tail;
4444
}
4545

46+
protected PartitionIndexEarly(PartitionIndexEarly partitionIndexEarly)
47+
{
48+
super(partitionIndexEarly);
49+
this.cutoff = partitionIndexEarly.cutoff;
50+
this.tail = partitionIndexEarly.tail;
51+
}
52+
53+
@Override
54+
public PartitionIndex sharedCopy()
55+
{
56+
return new PartitionIndexEarly(this);
57+
}
58+
4659
@Override
4760
protected Rebufferer instantiateRebufferer()
4861
{

src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIterator.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,21 @@ class PartitionIterator extends PartitionIndex.IndexPosIterator implements Parti
6464
this.rowIndexFile = rowIndexFile;
6565
this.dataFile = dataFile;
6666

67-
readNext();
68-
// first value can be off
69-
if (nextKey != null && !(nextKey.compareTo(left) > inclusiveLeft))
67+
try
7068
{
7169
readNext();
70+
// the first key read may fall outside the left bound; if so, skip past it
71+
if (nextKey != null && !(nextKey.compareTo(left) > inclusiveLeft))
72+
{
73+
readNext();
74+
}
75+
advance();
76+
}
77+
catch (Throwable t)
78+
{
79+
super.close();
80+
throw t;
7281
}
73-
advance();
7482
}
7583

7684
/**

src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexSSTableWriter.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ public boolean openEarly(Consumer<SSTableReader> callWhenReady)
204204
StatsMetadata stats = (StatsMetadata) finalMetadata.get(MetadataType.STATS);
205205
CompactionMetadata compactionMetadata = (CompactionMetadata) finalMetadata.get(MetadataType.COMPACTION);
206206

207-
FileHandle ifile = iwriter.rowIndexFHBuilder.withLength(iwriter.rowIndexFile.getLastFlushOffset()).complete();
207+
iwriter.rowIndexFile.updateFileHandle(iwriter.rowIndexFHBuilder);
208+
FileHandle ifile = iwriter.rowIndexFHBuilder.complete();
208209
// With trie indices it is no longer necessary to limit the file size; just make sure indices and data
209210
// get updated length / compression metadata.
210211
dataFile.updateFileHandle(dbuilder, dataLength);
@@ -232,6 +233,7 @@ public SSTableReader openFinalEarly()
232233
// ensure outstanding openEarly actions are not triggered.
233234
dataFile.sync();
234235
iwriter.rowIndexFile.sync();
236+
iwriter.rowIndexFile.updateFileHandle(iwriter.rowIndexFHBuilder);
235237
// Note: Nothing must be written to any of the files after this point, as the chunk cache could pick up and
236238
// retain a partially-written page (see DB-2446).
237239

@@ -397,7 +399,6 @@ public long append(DecoratedKey key, RowIndexEntry indexEntry) throws IOExceptio
397399

398400
public boolean buildPartial(long dataPosition, Consumer<PartitionIndex> callWhenReady)
399401
{
400-
rowIndexFile.updateFileHandle(rowIndexFHBuilder);
401402
return partitionIndex.buildPartial(callWhenReady, rowIndexFile.position(), dataPosition);
402403
}
403404

@@ -450,8 +451,6 @@ protected void doPrepare()
450451

451452
// truncate index file
452453
rowIndexFile.prepareToCommit();
453-
rowIndexFHBuilder.withLength(rowIndexFile.getLastFlushOffset());
454-
//TODO figure out whether the update should be done before or after the prepare to commit
455454
rowIndexFile.updateFileHandle(rowIndexFHBuilder);
456455

457456
complete();

src/java/org/apache/cassandra/io/tries/ValueIterator.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,18 @@ protected ValueIterator(Rebufferer source, long root, ByteComparable start, Byte
6767
{
6868
super(source, root, end != null ? end.asComparableBytes(version) : null, collecting, version);
6969

70-
if (start != null)
71-
initializeWithLeftBound(root, start.asComparableBytes(byteComparableVersion), admitPrefix, limit != null);
72-
else
73-
initializeNoLeftBound(root, limit != null ? limit.next() : 256);
70+
try
71+
{
72+
if (start != null)
73+
initializeWithLeftBound(root, start.asComparableBytes(byteComparableVersion), admitPrefix, limit != null);
74+
else
75+
initializeNoLeftBound(root, limit != null ? limit.next() : 256);
76+
}
77+
catch (Throwable t)
78+
{
79+
super.close();
80+
throw t;
81+
}
7482
}
7583

7684
private void initializeWithLeftBound(long root, ByteSource start, LeftBoundTreatment admitPrefix, boolean atLimit)

src/java/org/apache/cassandra/io/tries/Walker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public Walker(Rebufferer source, long root, ByteComparable.Version version)
7070
bh = source.rebuffer(root);
7171
buf = bh.buffer();
7272
}
73-
catch (RuntimeException ex)
73+
catch (Throwable ex)
7474
{
7575
if (bh != null) bh.release();
7676
source.closeReader();
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.cql3;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Random;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.Future;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
31+
import org.junit.After;
32+
import org.junit.Assert;
33+
import org.junit.Test;
34+
35+
import org.apache.cassandra.config.DatabaseDescriptor;
36+
import org.apache.cassandra.db.ColumnFamilyStore;
37+
import org.hamcrest.Matchers;
38+
39+
import static org.junit.Assert.assertEquals;
40+
import static org.junit.Assert.assertTrue;
41+
42+
public class EarlyOpenCompactionTest extends CQLTester
43+
{
44+
private static final int NUM_PARTITIONS = 1000;
45+
private static final int NUM_ROWS_PER_PARTITION = 100;
46+
private static final int VALUE_SIZE = 1000; // ~1KB per row
47+
private static final int VERIFICATION_THREADS = 4;
48+
49+
private final AtomicBoolean stopVerification = new AtomicBoolean(false);
50+
private final AtomicInteger verificationErrors = new AtomicInteger(0);
51+
private final Random random = new Random();
52+
private ExecutorService executor;
53+
54+
@After
55+
public void cleanupAfter() throws Throwable
56+
{
57+
stopVerification.set(true);
58+
if (executor != null)
59+
{
60+
executor.shutdownNow();
61+
executor.awaitTermination(1, TimeUnit.MINUTES);
62+
}
63+
DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(50);
64+
}
65+
66+
@Test
67+
public void testEarlyOpenDuringCompaction() throws Throwable
68+
{
69+
// Create a table with a simple schema
70+
createTable("CREATE TABLE %s (" +
71+
"pk int, " +
72+
"ck int, " +
73+
"data text, " +
74+
"PRIMARY KEY (pk, ck)" +
75+
")");
76+
77+
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
78+
disableCompaction();
79+
80+
// Insert data to create multiple SSTables
81+
System.out.println("Inserting test data...");
82+
for (int i = 0; i < NUM_PARTITIONS; i++)
83+
{
84+
for (int j = 0; j < NUM_ROWS_PER_PARTITION; j++)
85+
{
86+
String value = randomString(VALUE_SIZE);
87+
execute("INSERT INTO %s (pk, ck, data) VALUES (?, ?, ?)", i, j, value);
88+
}
89+
90+
// Flush from time to time to get 10 sstables
91+
if (i > 0 && i % Math.max(1, NUM_PARTITIONS / 10) == 0)
92+
{
93+
flush();
94+
}
95+
}
96+
97+
// Final flush to ensure all data is written
98+
flush();
99+
100+
// Verify we have multiple SSTables
101+
int sstableCount = cfs.getLiveSSTables().size();
102+
assertTrue("Expected multiple SSTables, got: " + sstableCount, sstableCount > 1);
103+
104+
// Start verification threads
105+
System.out.println("Starting verification threads...");
106+
executor = Executors.newFixedThreadPool(VERIFICATION_THREADS);
107+
List<Future<?>> futures = new ArrayList<>();
108+
109+
for (int i = 0; i < VERIFICATION_THREADS; i++)
110+
{
111+
futures.add(executor.submit(new VerificationTask()));
112+
}
113+
114+
// Wait a bit to ensure verification is running
115+
Thread.sleep(1000);
116+
117+
// Set early open interval to 1MiB to trigger early open during compaction
118+
int intervalMB = 1;
119+
System.out.println("Setting early open interval to " + intervalMB + "MiB...");
120+
DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(intervalMB);
121+
// Slow down compaction so the verification threads have time to hit a failure while it is running.
122+
DatabaseDescriptor.setCompactionThroughputMbPerSec(10);
123+
124+
// Trigger compaction and await its completion
125+
System.out.println("Starting compaction...");
126+
cfs.enableAutoCompaction(true);
127+
128+
// Let verification run for a while during and after compaction
129+
System.out.println("Verifying data during and after compaction...");
130+
Thread.sleep(1000);
131+
132+
// Stop verification
133+
stopVerification.set(true);
134+
135+
// Wait for verification to complete
136+
for (Future<?> future : futures)
137+
{
138+
try
139+
{
140+
future.get(10, TimeUnit.SECONDS);
141+
}
142+
catch (Exception e)
143+
{
144+
System.err.println("Verification task failed: " + e);
145+
e.printStackTrace();
146+
}
147+
}
148+
149+
// Verify no errors occurred during verification
150+
int errors = verificationErrors.get();
151+
assertEquals("Found " + errors + " verification errors. Check logs for details.", 0, errors);
152+
153+
System.out.println("Test completed successfully");
154+
}
155+
156+
private class VerificationTask implements Runnable
157+
{
158+
@Override
159+
public void run()
160+
{
161+
try
162+
{
163+
Random localRandom = new Random(Thread.currentThread().getId());
164+
165+
while (!stopVerification.get() && !Thread.currentThread().isInterrupted())
166+
{
167+
// Randomly choose between point query and partition range query
168+
if (localRandom.nextBoolean())
169+
{
170+
// Point query
171+
int pk = localRandom.nextInt(NUM_PARTITIONS * 110 / 100); // 10% chance outside
172+
int ck = localRandom.nextInt(NUM_ROWS_PER_PARTITION * 110 / 100); // 10% chance outside
173+
174+
try
175+
{
176+
Assert.assertEquals(pk < NUM_PARTITIONS && ck < NUM_ROWS_PER_PARTITION ? 1 : 0,
177+
execute("SELECT data FROM %s WHERE pk = ? AND ck = ?", pk, ck).size());
178+
}
179+
catch (Throwable t)
180+
{
181+
verificationErrors.incrementAndGet();
182+
System.err.println("Point query failed for pk=" + pk + ", ck=" + ck + ": " + t);
183+
t.printStackTrace();
184+
}
185+
}
186+
else
187+
{
188+
// Partition range query
189+
int pk = localRandom.nextInt(NUM_PARTITIONS);
190+
191+
try
192+
{
193+
Assert.assertThat(execute("SELECT data FROM %s WHERE token(pk) <= token(?) AND token(pk) >= token(?)", pk, pk).size(),
194+
Matchers.greaterThanOrEqualTo(NUM_ROWS_PER_PARTITION));
195+
}
196+
catch (Throwable t)
197+
{
198+
verificationErrors.incrementAndGet();
199+
System.err.println("Range query failed for pk in (" + pk + ", " + (pk + 1) + ", " + (pk + 2) + "): " + t);
200+
t.printStackTrace();
201+
}
202+
}
203+
204+
// Add a small delay to prevent overwhelming the system
205+
Thread.yield();
206+
}
207+
}
208+
catch (Throwable t)
209+
{
210+
verificationErrors.incrementAndGet();
211+
System.err.println("Verification task failed: " + t);
212+
t.printStackTrace();
213+
}
214+
}
215+
}
216+
217+
private String randomString(int length)
218+
{
219+
StringBuilder sb = new StringBuilder(length);
220+
for (int i = 0; i < length; i++)
221+
{
222+
sb.append((char)('a' + random.nextInt(26)));
223+
}
224+
return sb.toString();
225+
}
226+
}

0 commit comments

Comments
 (0)