Skip to content

Commit c415ff4

Browse files
author
Piotr Kołaczkowski
committed
CNDB-16212: Harden RebufferingInputStream
This commit attempts to improve quality of RebufferingInputStream and its implementations by the following changes: - `reBuffer` contract has been documented - `reBuffer` implementations no longer throw to indicate EOF - `reBuffer` implementations fill at least 1 byte when not on EOF - `reBuffer` implementations do not leave null buffer after exiting normally - state Preconditions have been added to `reBuffer` - `readFully` has been rewritten to rely on the Java contract of `read` which is allowed to read less than `len` bytes, instead of assuming non-standard behaviour present in the old `read` implementation - `read` has been significantly simplified, yet it still obeys the Java InputStream contract - some code has been made final / moved to private to disallow accidental breakage of contracts in subclasses Those are not just code-style changes. In particular the following contract violating behaviors should be impossible now: - `read` throwing EOFException - `readFully` throwing EOFException before reaching the real end of stream Additionally, a comprehensive test suite has been added for `RebufferingInputStream`, which tests its logic in isolation from its concrete implementations.
1 parent 42ae0f3 commit c415ff4

16 files changed

+782
-50
lines changed

src/java/org/apache/cassandra/cache/ChunkCache.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,8 @@ public BufferHolder rebuffer(long position)
661661
// chunks.
662662
}
663663
}
664+
assert buf.offset() <= position : "Buffer offset " + buf.offset() + " must be <= requested position " + position;
665+
assert position == source.fileLength() || buf.buffer().limit() >= buf.offset() - position : "Buffer must be non-empty for non-EOF position " + position;
664666
return buf;
665667
}
666668
catch (Throwable t)

src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323
import java.io.DataInput;
2424
import java.nio.ByteBuffer;
2525

26+
import com.google.common.base.Preconditions;
27+
2628
import org.apache.cassandra.io.util.DataPosition;
2729
import org.apache.cassandra.io.util.File;
2830
import org.apache.cassandra.io.util.FileDataInput;
2931
import org.apache.cassandra.io.util.FileSegmentInputStream;
32+
import org.apache.cassandra.io.util.Rebufferer;
3033

3134
/**
3235
* Each segment of an encrypted file may contain many encrypted chunks, and each chunk needs to be individually decrypted
@@ -103,7 +106,10 @@ public long bytesPastMark(DataPosition mark)
103106

104107
public void reBuffer()
105108
{
109+
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());
106110
totalChunkOffset += buffer.position();
107111
buffer = chunkProvider.nextChunk();
112+
if (buffer == null)
113+
buffer = Rebufferer.EMPTY.buffer();
108114
}
109115
}

src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.ThreadLocalRandom;
2525
import java.util.function.DoubleSupplier;
2626

27+
import com.google.common.base.Preconditions;
2728
import com.google.common.collect.Iterators;
2829
import com.google.common.primitives.Ints;
2930

@@ -110,8 +111,11 @@ public void position(long position) throws IOException
110111
@Override
111112
protected void reBuffer() throws IOException
112113
{
113-
if (uncompressedChunkPosition < 0)
114-
throw new IllegalStateException("position(long position) wasn't called first");
114+
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());
115+
Preconditions.checkState(uncompressedChunkPosition >= 0, "position(long position) wasn't called first");
116+
117+
if (!compressedChunks.hasNext())
118+
return; // EOF, but we cannot signal it with throwing EOFException here because of the contract of reBuffer()
115119

116120
/*
117121
* reBuffer() will only be called if a partition range spanning multiple (adjacent) compressed chunks
@@ -120,6 +124,8 @@ protected void reBuffer() throws IOException
120124
*/
121125
loadNextChunk();
122126
uncompressedChunkPosition += compressionParams.chunkLength();
127+
128+
assert buffer.hasRemaining() || !compressedChunks.hasNext() : "Buffer should have remaining bytes or be at EOF";
123129
}
124130

125131
/**

src/java/org/apache/cassandra/hints/ChecksummedDataInput.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,10 @@ public boolean checkCrc() throws IOException
191191
}
192192

193193
@Override
194-
public void readFully(byte[] b) throws IOException
194+
public void readFully(byte[] b, int off, int len) throws IOException
195195
{
196-
checkLimit(b.length);
197-
super.readFully(b);
196+
checkLimit(len);
197+
super.readFully(b, off, len);
198198
}
199199

200200
@Override
@@ -207,7 +207,7 @@ public int read(byte[] b, int off, int len) throws IOException
207207
@Override
208208
protected void reBuffer()
209209
{
210-
Preconditions.checkState(buffer.remaining() == 0);
210+
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());
211211
updateCrc();
212212
bufferOffset += buffer.limit();
213213

@@ -219,6 +219,7 @@ protected void reBuffer()
219219
protected void readBuffer()
220220
{
221221
buffer.clear();
222+
//noinspection StatementWithEmptyBody
222223
while ((channel.read(buffer, bufferOffset)) == 0) {}
223224
buffer.flip();
224225
}

src/java/org/apache/cassandra/io/util/DataInputBuffer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import java.nio.ByteBuffer;
2121

22+
import com.google.common.base.Preconditions;
23+
2224
/**
2325
* Input stream around a single ByteBuffer.
2426
*/
@@ -58,7 +60,8 @@ public DataInputBuffer(byte[] buffer)
5860
@Override
5961
protected void reBuffer()
6062
{
61-
//nope, we don't rebuffer, we are done!
63+
Preconditions.checkState(!buffer.hasRemaining(), "reBuffer called with remaining bytes: %s", buffer.remaining());
64+
// nope, we don't rebuffer, we are done!
6265
}
6366

6467
@Override

src/java/org/apache/cassandra/io/util/FileInputStreamPlus.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.nio.file.NoSuchFileException;
2525
import java.nio.file.Path;
2626

27+
import com.google.common.base.Preconditions;
28+
2729
public class FileInputStreamPlus extends RebufferingInputStream
2830
{
2931
final FileChannel channel;
@@ -65,8 +67,10 @@ private FileInputStreamPlus(FileChannel channel, int bufferSize, Path path)
6567
@Override
6668
protected void reBuffer() throws IOException
6769
{
70+
Preconditions.checkState(buffer.remaining() == 0, "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());
6871
buffer.clear();
69-
channel.read(buffer);
72+
//noinspection StatementWithEmptyBody
73+
while (channel.read(buffer) == 0) {}
7074
buffer.flip();
7175
}
7276

src/java/org/apache/cassandra/io/util/MemoryInputStream.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.nio.ByteOrder;
2424

2525
import com.google.common.annotations.VisibleForTesting;
26+
import com.google.common.base.Preconditions;
2627
import com.google.common.primitives.Ints;
2728

2829
import org.apache.cassandra.utils.memory.MemoryUtil;
@@ -51,6 +52,8 @@ public MemoryInputStream(Memory mem, int bufferSize)
5152
@Override
5253
protected void reBuffer() throws IOException
5354
{
55+
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());
56+
5457
if (offset - mem.peer >= mem.size())
5558
return;
5659

src/java/org/apache/cassandra/io/util/NIODataInputStream.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,11 @@ public NIODataInputStream(ReadableByteChannel channel, int bufferSize)
6060
@Override
6161
protected void reBuffer() throws IOException
6262
{
63-
Preconditions.checkState(buffer.remaining() == 0);
64-
buffer.clear();
63+
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());
6564

65+
buffer.clear();
66+
//noinspection StatementWithEmptyBody
6667
while ((channel.read(buffer)) == 0) {}
67-
6868
buffer.flip();
6969
}
7070

src/java/org/apache/cassandra/io/util/RandomAccessReader.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.nio.LongBuffer;
2626
import javax.annotation.concurrent.NotThreadSafe;
2727

28+
import com.google.common.base.Preconditions;
2829
import com.google.common.primitives.Ints;
2930

3031
import org.apache.cassandra.io.compress.BufferType;
@@ -61,8 +62,15 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
6162
*/
6263
public void reBuffer()
6364
{
65+
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());
66+
6467
if (isEOF())
68+
{
69+
bufferHolder.release();
70+
bufferHolder = Rebufferer.emptyBufferHolderAt(length());
71+
buffer = bufferHolder.buffer();
6572
return;
73+
}
6674

6775
reBufferAt(current());
6876
}
@@ -83,6 +91,7 @@ private void reBufferAt(long position)
8391
buffer = bufferHolder.buffer();
8492
buffer.position(Ints.checkedCast(position - bufferHolder.offset()));
8593
buffer.order(order);
94+
assert buffer.remaining() > 0: "Buffer must be non empty after rebuffering at " + position;
8695
}
8796
}
8897

src/java/org/apache/cassandra/io/util/RebufferingInputStream.java

Lines changed: 54 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -56,25 +56,50 @@ protected RebufferingInputStream(ByteBuffer buffer, boolean validateByteOrder)
5656
this.buffer = buffer;
5757
}
5858

59-
/**
60-
* Implementations must implement this method to refill the buffer.
61-
* They can expect the buffer to be empty when this method is invoked.
62-
* @throws IOException
63-
*/
59+
/// Refills the buffer with new data.
60+
/// The buffer must be empty when this method is invoked.
61+
/// The buffer must be filled with at least 1 byte of data unless EOF is reached.
62+
///
63+
/// EOF is indicated by not writing any bytes to the buffer and leaving the buffer with no remaining content.
64+
/// The implementations must not throw `EOFException` on EOF.
65+
///
66+
/// Callers must not rely on the identity of the buffer object to stay the same after this call returns.
67+
/// The buffer reference may be switched to a different buffer instance in order to provide new data, and the
68+
/// previous buffer may be released if applicable.
69+
/// The buffer reference may be switched to a static empty buffer in case of EOF, in order to release the current
70+
/// exhausted buffer and to free up memory.
71+
/// The buffer is not allowed to be set to null if the call to this method exits normally (no exception thrown).
72+
///
73+
/// @throws IOException when data is expected but could not be read due to an I/O error
74+
/// @throws IllegalStateException if the buffer hasn't been exhausted when this method is invoked
6475
protected abstract void reBuffer() throws IOException;
6576

77+
// This is final because it is a convenience method that simply delegates to readFully(byte[], int, int).
78+
// Override that method instead if you want to change the behavior.
6679
@Override
67-
public void readFully(byte[] b) throws IOException
80+
public final void readFully(byte[] b) throws IOException
6881
{
6982
readFully(b, 0, b.length);
7083
}
7184

7285
@Override
7386
public void readFully(byte[] b, int off, int len) throws IOException
7487
{
75-
int read = read(b, off, len);
76-
if (read < len)
77-
throw new EOFException("EOF after " + read + " bytes out of " + len);
88+
// avoid int overflow
89+
if (off < 0 || off > b.length || len < 0 || len > b.length - off)
90+
throw new IndexOutOfBoundsException();
91+
92+
int copied = 0;
93+
while (copied < len)
94+
{
95+
int read = readInternal(b, off, len - copied);
96+
if (read == -1)
97+
throw new EOFException("EOF after " + copied + " bytes out of " + len);
98+
copied += read;
99+
off += read;
100+
}
101+
102+
assert copied == len;
78103
}
79104

80105
@Override
@@ -84,29 +109,30 @@ public int read(byte[] b, int off, int len) throws IOException
84109
if (off < 0 || off > b.length || len < 0 || len > b.length - off)
85110
throw new IndexOutOfBoundsException();
86111

112+
return readInternal(b, off, len);
113+
}
114+
115+
/// Reads up to `len` bytes into `b` at offset `off` from the current buffer.
116+
/// Returns number of bytes read, or -1 if EOF is reached before reading any bytes.
117+
/// If the buffer is empty, it will be refilled via `reBuffer()` once.
118+
/// If EOF is not reached, reads at least one byte.
119+
private int readInternal(byte[] b, int off, int len) throws IOException
120+
{
87121
if (len == 0)
88122
return 0;
89123

90-
int copied = 0;
91-
while (copied < len)
124+
if (!buffer.hasRemaining())
92125
{
93-
int position = buffer.position();
94-
int remaining = buffer.limit() - position;
95-
if (remaining == 0)
96-
{
97-
reBuffer();
98-
position = buffer.position();
99-
remaining = buffer.limit() - position;
100-
if (remaining == 0)
101-
return copied == 0 ? -1 : copied;
102-
}
103-
int toCopy = min(len - copied, remaining);
104-
FastByteOperations.copy(buffer, position, b, off + copied, toCopy);
105-
buffer.position(position + toCopy);
106-
copied += toCopy;
126+
reBuffer();
127+
if (!buffer.hasRemaining())
128+
return -1; // EOF
107129
}
108130

109-
return copied;
131+
int toRead = min(len, buffer.remaining());
132+
assert toRead > 0 : "toRead must be > 0";
133+
FastByteOperations.copy(buffer, buffer.position(), b, off, toRead);
134+
buffer.position(buffer.position() + toRead);
135+
return toRead;
110136
}
111137

112138
/**
@@ -139,6 +165,8 @@ public void readFully(ByteBuffer dst) throws IOException
139165
buffer.position(position + toCopy);
140166
copied += toCopy;
141167
}
168+
169+
assert copied == len;
142170
}
143171

144172
@DontInline

0 commit comments

Comments
 (0)