Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/cache/ChunkCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,8 @@ public BufferHolder rebuffer(long position)
// chunks.
}
}
assert buf.offset() <= position : "Buffer offset " + buf.offset() + " must be <= requested position " + position;
assert position == source.fileLength() || buf.buffer().limit() >= buf.offset() - position : "Buffer must be non-empty for non-EOF position " + position;
return buf;
}
catch (Throwable t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import java.io.DataInput;
import java.nio.ByteBuffer;

import com.google.common.base.Preconditions;

import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileSegmentInputStream;
import org.apache.cassandra.io.util.Rebufferer;

/**
* Each segment of an encrypted file may contain many encrypted chunks, and each chunk needs to be individually decrypted
Expand Down Expand Up @@ -103,7 +106,10 @@ public long bytesPastMark(DataPosition mark)

public void reBuffer()
{
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());
totalChunkOffset += buffer.position();
buffer = chunkProvider.nextChunk();
if (buffer == null)
buffer = Rebufferer.EMPTY.buffer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.DoubleSupplier;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;

Expand Down Expand Up @@ -110,8 +111,11 @@ public void position(long position) throws IOException
@Override
protected void reBuffer() throws IOException
{
if (uncompressedChunkPosition < 0)
throw new IllegalStateException("position(long position) wasn't called first");
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());
Preconditions.checkState(uncompressedChunkPosition >= 0, "position(long position) wasn't called first");

if (!compressedChunks.hasNext())
return; // EOF, but we cannot signal it with throwing EOFException here because of the contract of reBuffer()

/*
* reBuffer() will only be called if a partition range spanning multiple (adjacent) compressed chunks
Expand All @@ -120,6 +124,8 @@ protected void reBuffer() throws IOException
*/
loadNextChunk();
uncompressedChunkPosition += compressionParams.chunkLength();

assert buffer.hasRemaining() || !compressedChunks.hasNext() : "Buffer should have remaining bytes or be at EOF";
}

/**
Expand Down
9 changes: 5 additions & 4 deletions src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,10 @@ public boolean checkCrc() throws IOException
}

@Override
public void readFully(byte[] b) throws IOException
public void readFully(byte[] b, int off, int len) throws IOException
{
checkLimit(b.length);
super.readFully(b);
checkLimit(len);
super.readFully(b, off, len);
}

@Override
Expand All @@ -207,7 +207,7 @@ public int read(byte[] b, int off, int len) throws IOException
@Override
protected void reBuffer()
{
Preconditions.checkState(buffer.remaining() == 0);
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());
updateCrc();
bufferOffset += buffer.limit();

Expand All @@ -219,6 +219,7 @@ protected void reBuffer()
protected void readBuffer()
{
buffer.clear();
//noinspection StatementWithEmptyBody
while ((channel.read(buffer, bufferOffset)) == 0) {}
buffer.flip();
}
Expand Down
5 changes: 4 additions & 1 deletion src/java/org/apache/cassandra/io/util/DataInputBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.nio.ByteBuffer;

import com.google.common.base.Preconditions;

/**
* Input stream around a single ByteBuffer.
*/
Expand Down Expand Up @@ -58,7 +60,8 @@ public DataInputBuffer(byte[] buffer)
@Override
protected void reBuffer()
{
//nope, we don't rebuffer, we are done!
Preconditions.checkState(!buffer.hasRemaining(), "reBuffer called with remaining bytes: %s", buffer.remaining());
// nope, we don't rebuffer, we are done!
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

import com.google.common.base.Preconditions;

public class FileInputStreamPlus extends RebufferingInputStream
{
final FileChannel channel;
Expand Down Expand Up @@ -65,8 +67,10 @@ private FileInputStreamPlus(FileChannel channel, int bufferSize, Path path)
@Override
protected void reBuffer() throws IOException
{
Preconditions.checkState(buffer.remaining() == 0, "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());
buffer.clear();
channel.read(buffer);
//noinspection StatementWithEmptyBody
while (channel.read(buffer) == 0) {}
buffer.flip();
}

Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/io/util/MemoryInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.ByteOrder;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;

import org.apache.cassandra.utils.memory.MemoryUtil;
Expand Down Expand Up @@ -51,6 +52,8 @@ public MemoryInputStream(Memory mem, int bufferSize)
@Override
protected void reBuffer() throws IOException
{
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());

if (offset - mem.peer >= mem.size())
return;

Expand Down
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/io/util/NIODataInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ public NIODataInputStream(ReadableByteChannel channel, int bufferSize)
@Override
protected void reBuffer() throws IOException
{
Preconditions.checkState(buffer.remaining() == 0);
buffer.clear();
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());

buffer.clear();
//noinspection StatementWithEmptyBody
while ((channel.read(buffer)) == 0) {}

buffer.flip();
}

Expand Down
9 changes: 9 additions & 0 deletions src/java/org/apache/cassandra/io/util/RandomAccessReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.nio.LongBuffer;
import javax.annotation.concurrent.NotThreadSafe;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;

import org.apache.cassandra.io.compress.BufferType;
Expand Down Expand Up @@ -61,8 +62,15 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
*/
public void reBuffer()
{
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());

if (isEOF())
{
bufferHolder.release();
bufferHolder = Rebufferer.emptyBufferHolderAt(length());
buffer = bufferHolder.buffer();
return;
}

reBufferAt(current());
}
Expand All @@ -83,6 +91,7 @@ private void reBufferAt(long position)
buffer = bufferHolder.buffer();
buffer.position(Ints.checkedCast(position - bufferHolder.offset()));
buffer.order(order);
assert buffer.remaining() > 0: "Buffer must be non empty after rebuffering at " + position;
}
}

Expand Down
80 changes: 54 additions & 26 deletions src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,50 @@ protected RebufferingInputStream(ByteBuffer buffer, boolean validateByteOrder)
this.buffer = buffer;
}

/**
* Implementations must implement this method to refill the buffer.
* They can expect the buffer to be empty when this method is invoked.
* @throws IOException
*/
/// Refills the buffer with new data.
/// The buffer must be empty when this method is invoked.
/// The buffer must be filled with at least 1 byte of data unless EOF is reached.
///
/// EOF is indicated by not writing any bytes to the buffer and leaving the buffer with no remaining content.
/// The implementations must not throw `EOFException` on EOF.
///
/// Callers must not rely on the identity of the buffer object to stay the same after this call returns.
/// The buffer reference may be switched to a different buffer instance in order to provide new data, and the
/// previous buffer may be released if applicable.
/// The buffer reference may be switched to a static empty buffer in case of EOF, in order to release the current
/// exhausted buffer and to free up memory.
/// The buffer is not allowed to be set to null if the call to this method exits normally (no exception thrown).
///
/// @throws IOException when data is expected but could not be read due to an I/O error
/// @throws IllegalStateException if the buffer hasn't been exhausted when this method is invoked
protected abstract void reBuffer() throws IOException;

// This is final because it is a convenience method that simply delegates to readFully(byte[], int, int).
// Override that method instead if you want to change the behavior.
@Override
public void readFully(byte[] b) throws IOException
public final void readFully(byte[] b) throws IOException
{
readFully(b, 0, b.length);
}

@Override
public void readFully(byte[] b, int off, int len) throws IOException
{
int read = read(b, off, len);
if (read < len)
throw new EOFException("EOF after " + read + " bytes out of " + len);
// avoid int overflow
if (off < 0 || off > b.length || len < 0 || len > b.length - off)
throw new IndexOutOfBoundsException();

int copied = 0;
while (copied < len)
{
int read = readInternal(b, off, len - copied);
if (read == -1)
throw new EOFException("EOF after " + copied + " bytes out of " + len);
copied += read;
off += read;
}

assert copied == len;
}

@Override
Expand All @@ -84,29 +109,30 @@ public int read(byte[] b, int off, int len) throws IOException
if (off < 0 || off > b.length || len < 0 || len > b.length - off)
throw new IndexOutOfBoundsException();

return readInternal(b, off, len);
}

/// Reads up to `len` bytes into `b` at offset `off` from the current buffer.
/// Returns number of bytes read, or -1 if EOF is reached before reading any bytes.
/// If the buffer is empty, it will be refilled via `reBuffer()` once.
/// If EOF is not reached, reads at least one byte.
private int readInternal(byte[] b, int off, int len) throws IOException
{
if (len == 0)
return 0;

int copied = 0;
while (copied < len)
if (!buffer.hasRemaining())
{
int position = buffer.position();
int remaining = buffer.limit() - position;
if (remaining == 0)
{
reBuffer();
position = buffer.position();
remaining = buffer.limit() - position;
if (remaining == 0)
return copied == 0 ? -1 : copied;
}
int toCopy = min(len - copied, remaining);
FastByteOperations.copy(buffer, position, b, off + copied, toCopy);
buffer.position(position + toCopy);
copied += toCopy;
reBuffer();
if (!buffer.hasRemaining())
return -1; // EOF
}

return copied;
int toRead = min(len, buffer.remaining());
assert toRead > 0 : "toRead must be > 0";
FastByteOperations.copy(buffer, buffer.position(), b, off, toRead);
buffer.position(buffer.position() + toRead);
return toRead;
}

/**
Expand Down Expand Up @@ -139,6 +165,8 @@ public void readFully(ByteBuffer dst) throws IOException
buffer.position(position + toCopy);
copied += toCopy;
}

assert copied == len;
}

@DontInline
Expand Down
11 changes: 8 additions & 3 deletions src/java/org/apache/cassandra/net/AsyncStreamingInputPlus.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -109,6 +110,11 @@ public boolean append(ByteBuf buf) throws IllegalStateException
@Override
protected void reBuffer() throws EOFException, InputTimeoutException
{
Preconditions.checkState(!isClosed, "Stream already closed");
Preconditions.checkState(buffer != null, "Stream already closed: buffer is null");
Preconditions.checkState(currentBuf != null, "Stream already closed: currentBuf is null");
Preconditions.checkState(!buffer.hasRemaining(), "Current buffer not exhausted, remaining bytes: %s", buffer.remaining());

if (queue.isEmpty())
channel.read();

Expand All @@ -129,9 +135,6 @@ protected void reBuffer() throws EOFException, InputTimeoutException
if (null == next)
throw new InputTimeoutException();

if (next == Unpooled.EMPTY_BUFFER) // Unpooled.EMPTY_BUFFER is the indicator that the input is closed
throw new EOFException();

currentBuf = next;
buffer = next.nioBuffer();
}
Expand All @@ -150,6 +153,8 @@ public void consume(Consumer consumer, long length) throws IOException
{
if (!buffer.hasRemaining())
reBuffer();
if (!buffer.hasRemaining())
throw new EOFException();

final int position = buffer.position();
final int limit = buffer.limit();
Expand Down
Loading