Commit 9fd4c4c

pcmanus and blambov authored
CNDB-13639: Adds optional prefetching for known sequential reads (#1692)
This introduces a `PrefetchingRebufferer` that, when enabled, prefetches (reads from the underlying reader) a configurable number of chunks in advance. It only works on top of rebufferer factories that work with chunks, i.e. not uncompressed mmap. Prefetching must first be enabled globally by setting `-Dcassandra.read_prefetching_size_kb` to the desired value. With that, and assuming the disk access mode allows it (meaning it's not uncompressed mmap), prefetching is applied to reads that are clearly sequential. As of this patch, this mostly means the sstable "scanners" and `SortedStringTableCursor`, so compaction, SAI index building, and tools that read sstables fully (scrub, verifier) will benefit from it.

Since rebufferers are synchronous, prefetching is done through a fixed thread pool. The number of threads in that pool can be set with `-Dcassandra.read_prefetching_threads` (it defaults to the number of "processors"). `-Dcassandra.read_prefetching_window` can also be set to define how often prefetching is re-triggered: by default, when less than half of the `-Dcassandra.read_prefetching_size_kb` value remains prefetched, prefetching is triggered again. See riptano/cndb#13639 for additional motivation.

Note that this patch is adapted from the similar DSE behavior, but with a fair bit of modification, since the DSE version relied on the asynchronous nature of the rebufferers there (it also had a few behaviors I didn't fully understand and didn't dig into, as they felt specific to DSE). One point worth mentioning: the DSE version relied on the ability of the underlying channel to create batches when multiple chunks are prefetched. That could have advantages, and we could try to add it back over time, but it's much harder to see how to do that in the non-async world of C*.
Typically, in DSE, since all the prefetches were initiated on the current thread (without blocking it), building the "batch" was relatively easy, but that doesn't translate to this patch. Not to mention that the underlying channels currently have no batching options, and those would have to be added (in a way that trickles down to the channel implementation within CNDB). So, as of this patch, chunks are prefetched in parallel but with no batching optimization. This does mean the `window` parameter is probably not as useful as it is for DSE, but I've kept it nonetheless, as it doesn't add much complexity. In the context of compaction in CNDB, though, this means it would be ideal to use a relatively large chunk size for the readers: we read the full file anyway, so there is no waste in using a large-ish chunk size, and it would provide batching for prefetching "for free" (in the sense that if we want to prefetch, say, 128kb in advance, we only need 2 chunks with a 64kb chunk size, but many more with a 4kb chunk size).

---------

Co-authored-by: Branimir Lambov <[email protected]>
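The window-based re-triggering described above can be sketched as follows. This is a hypothetical simplification, not the actual `PrefetchingRebufferer` code; the class and method names here are assumed for illustration:

```java
// Hypothetical sketch of the window-based trigger: re-prefetch only when less
// than (window * size) bytes remain prefetched ahead of the read position.
public class PrefetchWindow
{
    private final long prefetchSizeBytes;   // -Dcassandra.read_prefetching_size_kb * 1024
    private final double window;            // -Dcassandra.read_prefetching_window (default 0.5)
    private long prefetchedUpTo;            // exclusive end of what has been requested so far

    public PrefetchWindow(long prefetchSizeBytes, double window)
    {
        this.prefetchSizeBytes = prefetchSizeBytes;
        this.window = window;
    }

    /** Called on each read at {@code position}; returns how many bytes to prefetch now. */
    public long bytesToPrefetch(long position)
    {
        long ahead = Math.max(prefetchedUpTo - position, 0);
        // Still enough prefetched ahead: do nothing until we fall below the window.
        if (ahead >= (long) (window * prefetchSizeBytes))
            return 0;
        // Top the prefetched region back up to the full configured size.
        long toFetch = prefetchSizeBytes - ahead;
        prefetchedUpTo = position + ahead + toFetch;
        return toFetch;
    }

    public static void main(String[] args)
    {
        PrefetchWindow w = new PrefetchWindow(128 * 1024, 0.5);
        System.out.println(w.bytesToPrefetch(0));    // first read: full budget
        System.out.println(w.bytesToPrefetch(1024)); // still within the window: nothing
    }
}
```

With the default window of 0.5 and a 128kb budget, nothing is re-requested until the reader has consumed past the halfway point of the prefetched region.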
1 parent 1d762c6 commit 9fd4c4c

29 files changed: +1077 −47 lines

src/java/org/apache/cassandra/cache/ChunkCache.java (8 additions, 0 deletions)

@@ -52,6 +52,8 @@
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.ChunkReader;
 import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.PrefetchingRebufferer;
+import org.apache.cassandra.io.util.ReadPattern;
 import org.apache.cassandra.io.util.Rebufferer;
 import org.apache.cassandra.io.util.RebuffererFactory;
 import org.apache.cassandra.metrics.ChunkCacheMetrics;
@@ -688,6 +690,12 @@ public BufferHolder rebuffer(long position)
         }
     }

+    @Override
+    public int chunkSize()
+    {
+        return source.chunkSize();
+    }
+
     @Override
     public Rebufferer instantiateRebufferer()
     {
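The new `chunkSize()` accessor lets the prefetcher translate the configured byte budget into a number of chunks. A minimal sketch of that arithmetic (the class and method names here are assumed, not from the patch):

```java
// Translating the -Dcassandra.read_prefetching_size_kb budget into a chunk
// count for a given chunk size, as exposed by the new chunkSize() accessor.
public class PrefetchDepth
{
    /** How many whole chunks must be read ahead to cover {@code sizeKb} kilobytes. */
    static int chunksToPrefetch(int sizeKb, int chunkSize)
    {
        long bytes = sizeKb * 1024L;
        return (int) ((bytes + chunkSize - 1) / chunkSize); // ceiling division
    }

    public static void main(String[] args)
    {
        // 128kb ahead needs 2 chunks at a 64kb chunk size but 32 chunks at 4kb,
        // which is why the commit message suggests larger chunks for compaction.
        System.out.println(chunksToPrefetch(128, 64 * 1024)); // 2
        System.out.println(chunksToPrefetch(128, 4 * 1024));  // 32
    }
}
```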

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java (24 additions, 0 deletions)

@@ -24,6 +24,7 @@
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.AdaptiveCompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.sensors.SensorsFactory;
@@ -601,6 +602,29 @@ public enum CassandraRelevantProperties
      */
     SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation", "false"),

+    /**
+     * For reads that use prefetching (sequential ones), how much data is prefetched, in kilobytes. If the value set
+     * is <= 0, prefetching is disabled. Also see {@link #READ_PREFETCHING_WINDOW}.
+     * <p>
+     * This is disabled by default because read prefetching is already provided by the OS when working with local
+     * disks. It is meant for tiered storage extensions, where prefetching may not be provided by the underlying
+     * (custom) "filesystem".
+     */
+    READ_PREFETCHING_SIZE_KB("cassandra.read_prefetching_size_kb", "-1"),
+
+    /**
+     * The window (as a fraction of the prefetching size) used to trigger prefetching. The prefetching algorithm
+     * ensures that at least {@link #READ_PREFETCHING_WINDOW} * {@link #READ_PREFETCHING_SIZE_KB} of prefetching has
+     * been requested, and if that's not the case it requests up to {@link #READ_PREFETCHING_SIZE_KB}.
+     */
+    READ_PREFETCHING_WINDOW("cassandra.read_prefetching_window", "0.5"),
+
+    /**
+     * Number of threads used for read prefetching (when enabled). If unset, a default based on the number of
+     * processors is used.
+     */
+    READ_PREFETCHING_THREADS("cassandra.read_prefetching_threads"),
+
     /**
      * Allows custom implementation of {@link OperationContext.Factory} to optionally create and configure custom
      * {@link OperationContext} instances.
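The three properties above resolve to the documented defaults roughly as follows. This is an assumed sketch of the resolution logic, not the actual `CassandraRelevantProperties` code:

```java
// Sketch of how the three prefetching properties resolve to their defaults:
// size_kb <= 0 disables prefetching, window defaults to 0.5, and the thread
// count falls back to the number of available processors when unset.
public class PrefetchConfig
{
    static int sizeKb()
    {
        return Integer.getInteger("cassandra.read_prefetching_size_kb", -1);
    }

    static double window()
    {
        return Double.parseDouble(System.getProperty("cassandra.read_prefetching_window", "0.5"));
    }

    static int threads()
    {
        return Integer.getInteger("cassandra.read_prefetching_threads",
                                  Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args)
    {
        // With no -D flags set, prefetching is disabled (size <= 0).
        System.out.println("enabled=" + (sizeKb() > 0)
                           + " window=" + window()
                           + " threads=" + threads());
    }
}
```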

src/java/org/apache/cassandra/db/compaction/CompactionController.java (2 additions, 1 deletion)

@@ -36,6 +36,7 @@
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.ReadPattern;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.utils.concurrent.OpOrder;

@@ -352,6 +353,6 @@ protected boolean ignoreOverlaps()

     private FileDataInput openDataFile(SSTableReader reader)
     {
-        return limiter != null ? reader.openDataReader(limiter) : reader.openDataReader();
+        return limiter != null ? reader.openDataReader(limiter, ReadPattern.SEQUENTIAL) : reader.openDataReader(ReadPattern.SEQUENTIAL);
     }
 }

src/java/org/apache/cassandra/db/compaction/Scrubber.java (3 additions, 2 deletions)

@@ -64,6 +64,7 @@
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.ReadPattern;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.AbstractIterator;
@@ -157,8 +158,8 @@ public Scrubber(CompactionRealm realm,
     // partition header (key or data size) is corrupt. (This means our position in the index file will be one
     // partition "ahead" of the data file.)
     this.dataFile = isOffline
-                    ? sstable.openDataReader()
-                    : sstable.openDataReader(CompactionManager.instance.getRateLimiter());
+                    ? sstable.openDataReader(ReadPattern.SEQUENTIAL)
+                    : sstable.openDataReader(CompactionManager.instance.getRateLimiter(), ReadPattern.SEQUENTIAL);

     try
     {

src/java/org/apache/cassandra/db/compaction/Verifier.java (3 additions, 2 deletions)

@@ -63,6 +63,7 @@
 import org.apache.cassandra.io.util.FileInputStreamPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.ReadPattern;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
@@ -139,8 +140,8 @@ public Verifier(@Nullable CompactionRealm realm, SSTableReader sstable, OutputHa

     this.fileAccessLock = new ReentrantReadWriteLock();
     this.dataFile = isOffline
-                    ? sstable.openDataReader()
-                    : sstable.openDataReader(CompactionManager.instance.getRateLimiter());
+                    ? sstable.openDataReader(ReadPattern.SEQUENTIAL)
+                    : sstable.openDataReader(CompactionManager.instance.getRateLimiter(), ReadPattern.SEQUENTIAL);
     this.verifyInfo = new VerifyInfo(dataFile, sstable, fileAccessLock.readLock());
     this.options = options;
     this.isOffline = isOffline;

src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java (2 additions, 1 deletion)

@@ -58,6 +58,7 @@
 import org.apache.cassandra.io.sstable.format.RowIndexEntry;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.ReadPattern;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ApproximateTime;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -155,7 +156,7 @@ private boolean indexSSTable(SSTableReader sstable, Set<StorageAttachedIndex> in

     Set<Component> replacedComponents = new HashSet<>();

-    try (RandomAccessReader dataFile = sstable.openDataReader();
+    try (RandomAccessReader dataFile = sstable.openDataReader(ReadPattern.SEQUENTIAL);
          LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.INDEX_BUILD, tracker.metadata))
     {
         perSSTableFileLock = shouldWritePerSSTableFiles(sstable, indexDescriptor, replacedComponents);

src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java (2 additions, 1 deletion)

@@ -44,6 +44,7 @@
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.ReadPattern;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDGen;
@@ -80,7 +81,7 @@ public void build()
     Map<ColumnMetadata, ColumnIndex> indexes = e.getValue();

     SSTableWatcher.instance.onIndexBuild(sstable, builtIndexes);
-    try (RandomAccessReader dataFile = sstable.openDataReader())
+    try (RandomAccessReader dataFile = sstable.openDataReader(ReadPattern.SEQUENTIAL))
     {
         PerSSTableIndexWriter indexWriter = SASIIndex.newWriter(keyValidator, sstable.descriptor, indexes, OperationType.COMPACTION);

src/java/org/apache/cassandra/io/sstable/compaction/SortedStringTableCursor.java (4 additions, 3 deletions)

@@ -42,6 +42,7 @@
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.ReadPattern;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;

@@ -83,17 +84,17 @@ public class SortedStringTableCursor implements SSTableCursor

     public SortedStringTableCursor(SSTableReader sstable)
     {
-        this(sstable, sstable.openDataReader(), null);
+        this(sstable, sstable.openDataReader(ReadPattern.SEQUENTIAL), null);
     }

     public SortedStringTableCursor(SSTableReader sstable, Range<Token> range)
     {
-        this(sstable, sstable.openDataReader(), range);
+        this(sstable, sstable.openDataReader(ReadPattern.SEQUENTIAL), range);
     }

     public SortedStringTableCursor(SSTableReader sstable, Range<Token> tokenRange, RateLimiter limiter)
     {
-        this(sstable, sstable.openDataReader(limiter), tokenRange);
+        this(sstable, sstable.openDataReader(limiter, ReadPattern.SEQUENTIAL), tokenRange);
     }

     public SortedStringTableCursor(SSTableReader sstable, RandomAccessReader dataFile, Range<Token> tokenRange)

src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java (10 additions, 7 deletions)

@@ -125,6 +125,7 @@
 import org.apache.cassandra.io.util.FileOutputStreamPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.ReadPattern;
 import org.apache.cassandra.io.util.SliceDescriptor;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.schema.CachingParams;
@@ -1966,7 +1967,9 @@ public ISSTableScanner getScanner(Iterator<AbstractBounds<PartitionPosition>> bo

 public FileDataInput getFileDataInput(long position)
 {
-    return dfile.createReader(position);
+    // While `FileDataInput` supports a `seek` method in practice, it's an interface predominantly made for
+    // sequential access. If random access on the returned input is desired, callers should use `#openDataReader`.
+    return dfile.createReader(position, ReadPattern.SEQUENTIAL);
 }

 /**
@@ -2321,21 +2324,21 @@ public StatsMetadata getSSTableMetadata()
     return sstableMetadata;
 }

-public RandomAccessReader openDataReader(RateLimiter limiter)
+public RandomAccessReader openDataReader(RateLimiter limiter, ReadPattern accessPattern)
 {
     assert limiter != null;
-    return dfile.createReader(limiter);
+    return dfile.createReader(limiter, accessPattern);
 }

-public RandomAccessReader openDataReader()
+public RandomAccessReader openDataReader(ReadPattern accessPattern)
 {
-    return dfile.createReader();
+    return dfile.createReader(accessPattern);
 }

-public RandomAccessReader openIndexReader()
+public RandomAccessReader openIndexReader(ReadPattern accessPattern)
 {
     if (ifile != null)
-        return ifile.createReader();
+        return ifile.createReader(accessPattern);
     return null;
 }
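Since rebufferers are synchronous, the commit message notes that prefetches are issued on a fixed thread pool, in parallel but without batching. A simplified sketch of that shape (only the JDK names are real; `PrefetchPool` and `ChunkReader` are assumed for illustration):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the fixed-pool prefetching shape: each chunk in the requested
// range is submitted as its own task, so reads run in parallel but unbatched.
public class PrefetchPool
{
    interface ChunkReader { void readChunk(long position); }

    private final ExecutorService pool;

    PrefetchPool(int threads)
    {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    /** Submits each chunk in [start, end) independently: parallel, but no batching. */
    void prefetch(ChunkReader reader, long start, long end, int chunkSize)
    {
        for (long pos = start; pos < end; pos += chunkSize)
        {
            long p = pos; // effectively-final copy for the lambda
            pool.submit(() -> reader.readChunk(p));
        }
    }

    void shutdownAndWait()
    {
        pool.shutdown();
        try
        {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args)
    {
        PrefetchPool p = new PrefetchPool(4);
        p.prefetch(pos -> System.out.println("prefetch chunk @ " + pos), 0, 256 * 1024, 64 * 1024);
        p.shutdownAndWait();
    }
}
```

This one-task-per-chunk shape is also why the commit message suggests large chunk sizes for CNDB compaction: fewer, larger chunks get some of the benefit of batching without channel-level support.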
src/java/org/apache/cassandra/io/sstable/format/SSTableSimpleScanner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.cassandra.io.sstable.ISSTableScanner;
3232
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
3333
import org.apache.cassandra.io.util.RandomAccessReader;
34+
import org.apache.cassandra.io.util.ReadPattern;
3435
import org.apache.cassandra.schema.TableMetadata;
3536

3637
import static org.apache.cassandra.io.sstable.format.SSTableReader.PartitionPositionBounds;
@@ -71,7 +72,7 @@ public SSTableSimpleScanner(SSTableReader sstable,
7172
{
7273
assert sstable != null;
7374

74-
this.dfile = sstable.openDataReader();
75+
this.dfile = sstable.openDataReader(ReadPattern.SEQUENTIAL);
7576
this.sstable = sstable;
7677
this.sizeInBytes = boundsList.stream().mapToLong(ppb -> ppb.upperPosition - ppb.lowerPosition).sum();
7778
this.compressedSizeInBytes = sstable.compression ? sstable.onDiskSizeForPartitionPositions(boundsList) : sizeInBytes;
