Issue 122 streaming operation #125

Open · wants to merge 6 commits into base: main
219 changes: 170 additions & 49 deletions MetadataExtractor/IO/IndexedCapturingReader.cs
@@ -33,15 +33,15 @@ namespace MetadataExtractor.IO
/// <author>Drew Noakes https://drewnoakes.com</author>
public sealed class IndexedCapturingReader : IndexedReader
{
private const int DefaultChunkLength = 2 * 1024;
private const int DefaultChunkLength = 4 * 1024;

[NotNull]
private readonly Stream _stream;
private readonly int _chunkLength;
private readonly List<byte[]> _chunks = new List<byte[]>();
private bool _isStreamFinished;
private int _streamLength;
private bool _streamLengthThrewException;
private readonly Dictionary<int, byte[]> _chunks;
private int _maxChunkLoaded = -1;
private int _streamLength = -1;
private readonly bool _contiguousBufferMode;

public IndexedCapturingReader([NotNull] Stream stream, int chunkLength = DefaultChunkLength, bool isMotorolaByteOrder = true)
: base(isMotorolaByteOrder)
@@ -51,6 +51,40 @@ public IndexedCapturingReader([NotNull] Stream stream, int chunkLength = Default

_chunkLength = chunkLength;
_stream = stream ?? throw new ArgumentNullException(nameof(stream));

try
{
// For some reason, FileStreams are faster in contiguous mode. Since this is such a common case, we
// specifically check for it.
Owner

I think the underlying OS pre-fetches data. Many read operations are sequential. In fact, when you create a FileStream there's a ctor overload that lets you specify Sequential vs RandomAccess (IIRC), which presumably hints to the platform how the reader will behave. For the places where we create a FileStream we could experiment to see whether it matters.
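For reference, a minimal sketch of the kind of overload being described; the file path and buffer size below are placeholders for illustration, not values taken from this PR:

```csharp
using System.IO;

// Hypothetical illustration only: passing a FileOptions hint about the expected
// access pattern. SequentialScan hints the file will be read front to back;
// RandomAccess hints that reads may jump around.
var sequential = new FileStream(
    "image.jpg", FileMode.Open, FileAccess.Read, FileShare.Read,
    bufferSize: 4096, options: FileOptions.SequentialScan);

var randomAccess = new FileStream(
    "image.jpg", FileMode.Open, FileAccess.Read, FileShare.Read,
    bufferSize: 4096, options: FileOptions.RandomAccess);
```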

Contributor Author

Right, so what I didn't understand is why the performance would have changed: in the case where it previously pre-loaded the whole file by reading sequentially (on FileStreams), we still do that. Likewise, the cost of the lookup was all but eliminated by memoizing the last looked-up chunk (only something like 5% of calls ended up requesting a chunk other than the last one it returned). But the new code is still ~50% slower at running the test suite.

Alas I am developing on a Mac, and the Xamarin-based toolchain here doesn't have a robust profiler (nor do I have VS Ultimate (or whatever it's called) on my Windows machine). If you do have access to a profiler, I'd be very interested to know what you see.

if (_stream is FileStream)
{
_contiguousBufferMode = true;
}
else
{
// If the stream is both seekable and has a length, switch to non-contiguous buffering mode. This
// will use Seek operations to access data that is far beyond the reach of what has been buffered,
// rather than reading the entire file into memory in this case.
_contiguousBufferMode = !(_stream.Length > 0 && _stream.CanSeek);
}
}
catch (NotSupportedException)
{
// Streams that don't support the Length property have to be handled in contiguous mode.
_contiguousBufferMode = true;
}

if (!_contiguousBufferMode)
{
// If we know the length of the stream ahead of time, we can allocate a Dictionary with enough slots
// for all the chunks. We 2X it to try to avoid hash collisions.
var chunksCapacity = 2 * (_stream.Length / chunkLength);
_chunks = new Dictionary<int, byte[]>((int) chunksCapacity);
Owner

For most data streams we would be unlikely to load all the chunks in the file, so perhaps preallocating enough buckets for the entire file is unnecessary and may even hurt performance by spreading entries across more cache lines than are strictly needed. We'd need to measure performance to be sure one way or another.

Contributor Author

I only added this after doing a series of performance tests to ensure it would be a win. It's relatively small (~5%) but it was measurable and repeatable.
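As an aside, a rough sketch of how pre-sizing could be compared in isolation; the ~5% figure above came from the library's own performance tests, not from this toy harness, and the names and sizes here are purely illustrative:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

internal static class DictionaryCapacityBenchmark
{
    private static long TimeFill(int? capacity, int chunkCount)
    {
        var sw = Stopwatch.StartNew();
        var dict = capacity.HasValue
            ? new Dictionary<int, byte[]>(capacity.Value)
            : new Dictionary<int, byte[]>();
        for (var i = 0; i < chunkCount; i++)
            dict[i] = new byte[4096]; // one 4 KB chunk per entry, like the reader
        sw.Stop();
        return sw.ElapsedMilliseconds;
    }

    public static void Main()
    {
        const int chunkCount = 10_240; // e.g. a 40 MB file split into 4 KB chunks
        Console.WriteLine($"no capacity hint: {TimeFill(null, chunkCount)} ms");
        Console.WriteLine($"2x capacity hint: {TimeFill(2 * chunkCount, chunkCount)} ms");
    }
}
```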

}
else
{
_chunks = new Dictionary<int, byte[]>();
}
}
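To make the mode selection above concrete, a hypothetical usage sketch (not part of this diff): the reader chooses its buffering strategy from the capabilities of the stream it is given. The file name is a placeholder.

```csharp
using System.IO;
using MetadataExtractor.IO;

// A seekable stream with a known Length -> non-contiguous mode; far-away chunks
// are fetched with Seek rather than buffering everything before them.
var nonContiguous = new IndexedCapturingReader(new MemoryStream(new byte[64 * 1024]));

// A FileStream (or any stream whose Length throws NotSupportedException) -> contiguous mode.
var contiguous = new IndexedCapturingReader(File.OpenRead("image.jpg"));
```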

/// <summary>
@@ -68,21 +102,16 @@ public override long Length
{
get
{
if (!_streamLengthThrewException)
if (_contiguousBufferMode)
{
try
if (_streamLength != -1)
{
return _stream.Length;
}
catch (NotSupportedException)
{
_streamLengthThrewException = true;
IsValidIndex(int.MaxValue, 1);
}
return _streamLength;
}

IsValidIndex(int.MaxValue, 1);
Debug.Assert(_isStreamFinished);
return _streamLength;
return _stream.Length;
}
}

@@ -103,12 +132,126 @@ protected override void ValidateIndex(int index, int bytesRequested)
if ((long)index + bytesRequested - 1 > int.MaxValue)
throw new BufferBoundsException($"Number of requested bytes summed with starting index exceed maximum range of signed 32 bit integers (requested index: {index}, requested count: {bytesRequested})");

Debug.Assert(_isStreamFinished);
// TODO test that can continue using an instance of this type after this exception
throw new BufferBoundsException(ToUnshiftedOffset(index), bytesRequested, _streamLength);
}
}

/// <summary>
/// Helper method for GetChunk. This will load the next chunk of data from the input stream. If non-contiguous
/// buffering mode is being used, this method relies on the caller (GetChunk) to set the stream's position
/// correctly. In contiguous buffer mode, this will simply be the next chunk in sequence (the stream's Position
/// property will just advance monotonically).
/// </summary>
/// <returns>The chunk that was read (possibly shorter than the configured chunk length at the end of the stream), or <c>null</c> if the stream was already at its end.</returns>
private byte[] LoadNextChunk()
{
var chunk = new byte[_chunkLength];
var totalBytesRead = 0;

while (totalBytesRead != _chunkLength)
{
var bytesRead = _stream.Read(chunk, totalBytesRead, _chunkLength - totalBytesRead);
totalBytesRead += bytesRead;

// if no bytes were read at all, we've reached the end of the file.
if (bytesRead == 0 && totalBytesRead == 0)
{
return null;
}

// If this read didn't produce any bytes, but a previous read did, we've hit the end of the file, so
// shrink the chunk down to the number of bytes we actually have.
if (bytesRead == 0)
{
var shrunkChunk = new byte[totalBytesRead];
Buffer.BlockCopy(chunk, 0, shrunkChunk, 0, totalBytesRead);
return shrunkChunk;
}
}

return chunk;
}

// GetChunk is substantially slower for random accesses owing to needing to use a Dictionary, rather than a
// List. However, the typical access pattern isn't very random at all -- you generally read a whole series of
// bytes from the same chunk. So we just cache the last chunk that was read and return that directly if it's
// requested again. This is about 15% faster than going straight to the Dictionary.
private int _lastChunkIdx = -1;
private byte[] _lastChunkData = null;

/// <summary>
/// Load the data for the given chunk (if necessary), and return it. Chunks are identified by their index,
/// which is their start offset divided by the chunk length, e.g. offset 10 will typically refer to chunk
/// index 0. See DoGetChunk() for the implementation; this function adds simple memoization.
/// </summary>
/// <param name="chunkIndex">The index of the chunk to get</param>
private byte[] GetChunk(int chunkIndex)
{
if (chunkIndex == _lastChunkIdx)
{
return _lastChunkData;
}

var result = DoGetChunk(chunkIndex);
_lastChunkIdx = chunkIndex;
_lastChunkData = result;

return result;
}

private byte[] DoGetChunk(int chunkIndex)
{
byte[] result;
if (_chunks.TryGetValue(chunkIndex, out result))
{
return result;
}

if (!_contiguousBufferMode)
{
var chunkStart = chunkIndex * _chunkLength;

// Often we will be reading long contiguous blocks, even in non-contiguous mode. Don't issue Seeks in
// that case, so as to avoid unnecessary syscalls.
if (chunkStart != _stream.Position)
{
_stream.Seek(chunkStart, SeekOrigin.Begin);
}

var nextChunk = LoadNextChunk();
if (nextChunk != null)
{
_chunks[chunkIndex] = nextChunk;
var newStreamLen = (chunkIndex * _chunkLength) + nextChunk.Length;
_streamLength = newStreamLen > _streamLength ? newStreamLen : _streamLength;
}

return nextChunk;
}

byte[] curChunk = null;
while (_maxChunkLoaded < chunkIndex)
{
var curChunkIdx = _maxChunkLoaded + 1;
curChunk = LoadNextChunk();
if (curChunk != null)
{
_chunks[curChunkIdx] = curChunk;
var newStreamLen = (curChunkIdx * _chunkLength) + curChunk.Length;
_streamLength = newStreamLen > _streamLength ? newStreamLen : _streamLength;
}
else
{
return null;
}

_maxChunkLoaded = curChunkIdx;
}

return curChunk;
}

protected override bool IsValidIndex(int index, int bytesRequested)
{
if (index < 0 || bytesRequested < 0)
@@ -118,44 +261,22 @@ protected override bool IsValidIndex(int index, int bytesRequested)
if (endIndexLong > int.MaxValue)
return false;

if (!_contiguousBufferMode)
{
return endIndexLong < _stream.Length;
}

var endIndex = (int)endIndexLong;
if (_isStreamFinished)
return endIndex < _streamLength;

var chunkIndex = endIndex / _chunkLength;

while (chunkIndex >= _chunks.Count)
var endChunk = GetChunk(chunkIndex);
if (endChunk == null)
{
Debug.Assert(!_isStreamFinished);

var chunk = new byte[_chunkLength];
var totalBytesRead = 0;
while (!_isStreamFinished && totalBytesRead != _chunkLength)
{
var bytesRead = _stream.Read(chunk, totalBytesRead, _chunkLength - totalBytesRead);

if (bytesRead == 0)
{
// the stream has ended, which may be ok
_isStreamFinished = true;
_streamLength = _chunks.Count * _chunkLength + totalBytesRead;
// check we have enough bytes for the requested index
if (endIndex >= _streamLength)
{
_chunks.Add(chunk);
return false;
}
}
else
{
totalBytesRead += bytesRead;
}
}

_chunks.Add(chunk);
return false;
}

return true;
return endChunk.Length > (endIndex % _chunkLength);
}

public override int ToUnshiftedOffset(int localOffset) => localOffset;
@@ -166,7 +287,7 @@ public override byte GetByte(int index)

var chunkIndex = index / _chunkLength;
var innerIndex = index % _chunkLength;
var chunk = _chunks[chunkIndex];
var chunk = GetChunk(chunkIndex);
return chunk[innerIndex];
}

@@ -183,7 +304,7 @@ public override byte[] GetBytes(int index, int count)
var fromChunkIndex = fromIndex / _chunkLength;
var fromInnerIndex = fromIndex % _chunkLength;
var length = Math.Min(remaining, _chunkLength - fromInnerIndex);
var chunk = _chunks[fromChunkIndex];
var chunk = GetChunk(fromChunkIndex);
Array.Copy(chunk, fromInnerIndex, bytes, toIndex, length);
remaining -= length;
fromIndex += length;