From 14723a914443925deb02d5696a7cc4a23ce46dd2 Mon Sep 17 00:00:00 2001 From: Mukul Sabharwal Date: Sat, 5 Sep 2020 16:29:02 -0700 Subject: [PATCH 1/2] Add Nettrace compression support --- src/TraceEvent/Compression/ULZCompression.cs | 356 ++++++++++++++++++ src/TraceEvent/EventPipe/EventCache.cs | 20 +- .../EventPipe/EventPipeEventSource.cs | 3 +- 3 files changed, 376 insertions(+), 3 deletions(-) create mode 100644 src/TraceEvent/Compression/ULZCompression.cs diff --git a/src/TraceEvent/Compression/ULZCompression.cs b/src/TraceEvent/Compression/ULZCompression.cs new file mode 100644 index 000000000..1680def86 --- /dev/null +++ b/src/TraceEvent/Compression/ULZCompression.cs @@ -0,0 +1,356 @@ +using System; +using System.Runtime.CompilerServices; + +namespace Microsoft.Diagnostics.Tracing +{ + internal static class ULZCompression + { + private const int WindowBits = 17; + private const int WindowSize = 1 << WindowBits; + private const int WindowMask = WindowSize - 1; + private const int MinMatch = 4; + private const int HashBits = 19; + private const int HashSize = 1 << HashBits; + private const int Nil = -1; + + public static unsafe int Compress(ArraySegment input, int level) + { + var hashTable = new int[HashSize]; + var prev = new int[WindowSize]; + var output = new byte[input.Count]; + + fixed (int* hashTablePtr = &hashTable[0]) + { + fixed (int* prevPtr = &prev[0]) + { + fixed (byte* inputPtr = &input.Array[input.Offset]) + { + fixed (byte* outputPtr = &output[0]) + { + return Compress(hashTablePtr, prevPtr, inputPtr, input.Count, outputPtr, level); + } + } + } + } + } + + public static unsafe ArraySegment Decompress(ArraySegment input, int decompressedSize) + { + byte[] output = new byte[decompressedSize * 2]; + fixed (byte* inputPtr = &input.Array[input.Offset]) + { + fixed (byte* outputPtr = &output[0]) + { + int actualDecompressedSize = Decompress(inputPtr, input.Count, outputPtr, output.Length); + if (decompressedSize != actualDecompressedSize) + { + throw new Exception($"Unexpected decompressed size. Expected: {decompressedSize}, Actual: {actualDecompressedSize}"); + } + + return new ArraySegment(output, 0, decompressedSize); + } + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static unsafe uint Hash32(byte* p) + { + return (Unsafe.ReadUnaligned(p) * 0x9E3779B9) >> (32 - HashBits); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static unsafe void WildCopy(byte* d, byte* s, int n) + { + Unsafe.WriteUnaligned(d, Unsafe.ReadUnaligned(s)); + + for (int i = 8; i < n; i += 8) + { + Unsafe.WriteUnaligned(d + i, Unsafe.ReadUnaligned(s + i)); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static unsafe void EncodeMod(ref byte* p, uint x) + { + while (x >= 128) + { + x -= 128; + *p++ = (byte)(128 + (x & 127)); + x >>= 7; + } + + *p++ = (byte)x; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static unsafe uint DecodeMod(ref byte* p) + { + uint x = 0; + + for (int i = 0; i <= 21; i += 7) + { + uint c = *p++; + x += c << i; + if (c < 128) + { + break; + } + } + + return x; + } + + private static unsafe int Decompress(byte* input, int inputLength, byte* output, int outputLength) + { + byte* op = output; + byte* ip = input; + byte* ipEnd = ip + inputLength; + byte* opEnd = op + outputLength; + + while (ip < ipEnd) + { + int token = *ip++; + + if (token >= 32) + { + int run = token >> 5; + + if (run == 7) + { + run += (int)DecodeMod(ref ip); + } + + if ((opEnd - op) < run || (ipEnd - ip) < run) // Overrun check + { + return -1; + } + + WildCopy(op, ip, run); + + op += run; + ip += run; + + if (ip >= ipEnd) + { + break; + } + } + + int len = (token & 15) + MinMatch; + + if (len == 15 + MinMatch) + { + len += (int)DecodeMod(ref ip); + } + + if (opEnd - op < len) // Overrun check + { + return -1; + } + + int dist = ((token & 16) << 12) + Unsafe.ReadUnaligned(ip); + ip += 2; + byte* cp = op - dist; + if (op - output < dist) + { + return -1; + } + + if (dist >= 8) + { + WildCopy(op, cp, len); + op += len; + } + else + { + *op++ = *cp++; + *op++ = *cp++; + *op++ = *cp++; + *op++ = *cp++; + + while (len-- != 4) + { + *op++ = *cp++; + } + } + } + + return ip == ipEnd ? (int)(op - output) : -1; + } + + private static unsafe int Compress(int* hashTable, int* prev, byte* input, int inputLength, byte* output, int level) + { + if (level < 1 || level > 9) + { + return -1; + } + + int maxChain = level < 9 ? 1 << level : 1 << 13; + + for (int i = 0; i < HashSize; ++i) + { + hashTable[i] = Nil; + } + + byte* op = output; + int anchor = 0; + + int p = 0; + while (p < inputLength) + { + int bestLen = 0; + int dist = 0; + + int maxMatch = inputLength - p; + if (maxMatch >= MinMatch) + { + int limit = Math.Max(p - WindowSize, Nil); + int chainLen = maxChain; + + int s = hashTable[Hash32(&input[p])]; + while (s > limit) + { + if (input[s + bestLen] == input[p + bestLen] && Unsafe.ReadUnaligned(&input[s]) == Unsafe.ReadUnaligned(&input[p])) + { + int len = MinMatch; + while (len < maxMatch && input[s + len] == input[p + len]) + { + ++len; + } + + if (len > bestLen) + { + bestLen = len; + dist = p - s; + + if (len == maxMatch) + { + break; + } + } + } + + if (--chainLen == 0) + { + break; + } + + s = prev[s & WindowMask]; + } + } + + if (bestLen == MinMatch && (p - anchor) >= (7 + 128)) + { + bestLen = 0; + } + + if (level >= 5 && bestLen >= MinMatch && bestLen < maxMatch && (p - anchor) != 6) + { + int x = p + 1; + int targetLen = bestLen + 1; + + int limit = Math.Max(x - WindowSize, Nil); + int chainLen = maxChain; + + int s = hashTable[Hash32(&input[x])]; + while (s > limit) + { + if (input[s + bestLen] == input[x + bestLen] && Unsafe.ReadUnaligned(&input[s]) == Unsafe.ReadUnaligned(&input[x])) + { + int len = MinMatch; + + while (len < targetLen && input[s + len] == input[x + len]) + { + ++len; + } + + if (len == targetLen) + { + bestLen = 0; + break; + } + } + + if (--chainLen == 0) + { + break; + } + + s = prev[s & WindowMask]; + } + } + + if (bestLen >= MinMatch) + { + int len = bestLen - MinMatch; + int token = ((dist >> 12) & 16) + Math.Min(len, 15); + + if (anchor != p) + { + int run = p - anchor; + + if (run >= 7) + { + *op++ = (byte)((7 << 5) + token); + EncodeMod(ref op, (uint)(run - 7)); + } + else + { + *op++ = (byte)((run << 5) + token); + } + + WildCopy(op, &input[anchor], run); + op += run; + } + else + { + *op++ = (byte)token; + } + + if (len >= 15) + { + EncodeMod(ref op, (uint)(len - 15)); + } + + Unsafe.WriteUnaligned(op, (ushort)dist); + op += 2; + + while (bestLen-- != 0) + { + uint h = Hash32(&input[p]); + prev[p & WindowMask] = hashTable[h]; + hashTable[h] = p++; + } + + anchor = p; + } + else + { + uint h = Hash32(&input[p]); + prev[p & WindowMask] = hashTable[h]; + hashTable[h] = p++; + } + } + + if (anchor != p) + { + int run = p - anchor; + + if (run >= 7) + { + *op++ = 7 << 5; + EncodeMod(ref op, (uint)(run - 7)); + } + else + { + *op++ = (byte)(run << 5); + } + + WildCopy(op, &input[anchor], run); + op += run; + } + + return (int)(op - output); + } + } +} diff --git a/src/TraceEvent/EventPipe/EventCache.cs b/src/TraceEvent/EventPipe/EventCache.cs index 4ce5d80cf..ea737c47d 100644 --- a/src/TraceEvent/EventPipe/EventCache.cs +++ b/src/TraceEvent/EventPipe/EventCache.cs @@ -31,12 +31,28 @@ public unsafe void ProcessEventBlock(byte[] eventBlockData) } ushort flags = BitConverter.ToUInt16(eventBlockData, 2); bool useHeaderCompression = (flags & (ushort)EventBlockFlags.HeaderCompression) != 0; + bool isULZCompressed = (flags & (ushort)EventBlockFlags.EventBlockULZCompression) != 0; + + int eventBlockSize = eventBlockData.Length; + if (isULZCompressed) + { + int decompressedSize = (int)BitConverter.ToUInt32(eventBlockData, 20); // Decompressed Size is at offset 20 + var retVal = ULZCompression.Decompress(new ArraySegment(eventBlockData, headerSize, eventBlockData.Length - headerSize), decompressedSize); + eventBlockData = retVal.Array; + eventBlockSize = decompressedSize; + } // parse the events PinnedBuffer buffer = new PinnedBuffer(eventBlockData); byte* cursor = (byte*)buffer.PinningHandle.AddrOfPinnedObject(); - byte* end = cursor + eventBlockData.Length; - cursor += headerSize; + byte* end = cursor + eventBlockSize; + + // if we did compression the byte array was replaced and therefore the cursor doesn't need to be advanced + if (!isULZCompressed) + { + cursor += headerSize; + } + EventMarker eventMarker = new EventMarker(buffer); long timestamp = 0; EventPipeEventHeader.ReadFromFormatV4(cursor, useHeaderCompression, ref eventMarker.Header); diff --git a/src/TraceEvent/EventPipe/EventPipeEventSource.cs b/src/TraceEvent/EventPipe/EventPipeEventSource.cs index 56cf22ab6..c0ec618cf 100644 --- a/src/TraceEvent/EventPipe/EventPipeEventSource.cs +++ b/src/TraceEvent/EventPipe/EventPipeEventSource.cs @@ -887,7 +887,8 @@ public unsafe void FromStream(Deserializer deserializer) internal enum EventBlockFlags : short { Uncompressed = 0, - HeaderCompression = 1 + HeaderCompression = 1, + EventBlockULZCompression = 2 } /// From cce19e7f9882685d6b996af92f1a921995b55169 Mon Sep 17 00:00:00 2001 From: Mukul Sabharwal Date: Thu, 10 Sep 2020 00:24:47 -0700 Subject: [PATCH 2/2] Add multi process support as well --- src/TraceEvent/Compression/ULZCompression.cs | 356 ------------------ src/TraceEvent/EventPipe/EventCache.cs | 11 +- .../EventPipe/EventPipeEventSource.cs | 156 +++++++- src/TraceEvent/EventPipe/ULZCompression.cs | 137 +++++++ 4 files changed, 285 insertions(+), 375 deletions(-) delete mode 100644 src/TraceEvent/Compression/ULZCompression.cs create mode 100644 src/TraceEvent/EventPipe/ULZCompression.cs diff --git a/src/TraceEvent/Compression/ULZCompression.cs b/src/TraceEvent/Compression/ULZCompression.cs deleted file mode 100644 index 1680def86..000000000 --- a/src/TraceEvent/Compression/ULZCompression.cs +++ /dev/null @@ -1,356 +0,0 @@ -using System; -using System.Runtime.CompilerServices; - -namespace Microsoft.Diagnostics.Tracing -{ - internal static class ULZCompression - { - private const int WindowBits = 17; - private const int WindowSize = 1 << WindowBits; - private const int WindowMask = WindowSize - 1; - private const int MinMatch = 4; - private const int HashBits = 19; - private const int HashSize = 1 << HashBits; - private const int Nil = -1; - - public static unsafe int Compress(ArraySegment input, int level) - { - var hashTable = new int[HashSize]; - var prev = new int[WindowSize]; - var output = new byte[input.Count]; - - fixed (int* hashTablePtr = &hashTable[0]) - { - fixed (int* prevPtr = &prev[0]) - { - fixed (byte* inputPtr = &input.Array[input.Offset]) - { - fixed (byte* outputPtr = &output[0]) - { - return Compress(hashTablePtr, prevPtr, inputPtr, input.Count, outputPtr, level); - } - } - } - } - } - - public static unsafe ArraySegment Decompress(ArraySegment input, int decompressedSize) - { - byte[] output = new byte[decompressedSize * 2]; - fixed (byte* inputPtr = &input.Array[input.Offset]) - { - fixed (byte* outputPtr = &output[0]) - { - int actualDecompressedSize = Decompress(inputPtr, input.Count, outputPtr, output.Length); - if (decompressedSize != actualDecompressedSize) - { - throw new Exception($"Unexpected decompressed size. Expected: {decompressedSize}, Actual: {actualDecompressedSize}"); - } - - return new ArraySegment(output, 0, decompressedSize); - } - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static unsafe uint Hash32(byte* p) - { - return (Unsafe.ReadUnaligned(p) * 0x9E3779B9) >> (32 - HashBits); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static unsafe void WildCopy(byte* d, byte* s, int n) - { - Unsafe.WriteUnaligned(d, Unsafe.ReadUnaligned(s)); - - for (int i = 8; i < n; i += 8) - { - Unsafe.WriteUnaligned(d + i, Unsafe.ReadUnaligned(s + i)); - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static unsafe void EncodeMod(ref byte* p, uint x) - { - while (x >= 128) - { - x -= 128; - *p++ = (byte)(128 + (x & 127)); - x >>= 7; - } - - *p++ = (byte)x; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static unsafe uint DecodeMod(ref byte* p) - { - uint x = 0; - - for (int i = 0; i <= 21; i += 7) - { - uint c = *p++; - x += c << i; - if (c < 128) - { - break; - } - } - - return x; - } - - private static unsafe int Decompress(byte* input, int inputLength, byte* output, int outputLength) - { - byte* op = output; - byte* ip = input; - byte* ipEnd = ip + inputLength; - byte* opEnd = op + outputLength; - - while (ip < ipEnd) - { - int token = *ip++; - - if (token >= 32) - { - int run = token >> 5; - - if (run == 7) - { - run += (int)DecodeMod(ref ip); - } - - if ((opEnd - op) < run || (ipEnd - ip) < run) // Overrun check - { - return -1; - } - - WildCopy(op, ip, run); - - op += run; - ip += run; - - if (ip >= ipEnd) - { - break; - } - } - - int len = (token & 15) + MinMatch; - - if (len == 15 + MinMatch) - { - len += (int)DecodeMod(ref ip); - } - - if (opEnd - op < len) // Overrun check - { - return -1; - } - - int dist = ((token & 16) << 12) + Unsafe.ReadUnaligned(ip); - ip += 2; - byte* cp = op - dist; - if (op - output < dist) - { - return -1; - } - - if (dist >= 8) - { - WildCopy(op, cp, len); - op += len; - } - else - { - *op++ = *cp++; - *op++ = *cp++; - *op++ = *cp++; - *op++ = *cp++; - - while (len-- != 4) - { - *op++ = *cp++; - } - } - } - - return ip == ipEnd ? (int)(op - output) : -1; - } - - private static unsafe int Compress(int* hashTable, int* prev, byte* input, int inputLength, byte* output, int level) - { - if (level < 1 || level > 9) - { - return -1; - } - - int maxChain = level < 9 ? 1 << level : 1 << 13; - - for (int i = 0; i < HashSize; ++i) - { - hashTable[i] = Nil; - } - - byte* op = output; - int anchor = 0; - - int p = 0; - while (p < inputLength) - { - int bestLen = 0; - int dist = 0; - - int maxMatch = inputLength - p; - if (maxMatch >= MinMatch) - { - int limit = Math.Max(p - WindowSize, Nil); - int chainLen = maxChain; - - int s = hashTable[Hash32(&input[p])]; - while (s > limit) - { - if (input[s + bestLen] == input[p + bestLen] && Unsafe.ReadUnaligned(&input[s]) == Unsafe.ReadUnaligned(&input[p])) - { - int len = MinMatch; - while (len < maxMatch && input[s + len] == input[p + len]) - { - ++len; - } - - if (len > bestLen) - { - bestLen = len; - dist = p - s; - - if (len == maxMatch) - { - break; - } - } - } - - if (--chainLen == 0) - { - break; - } - - s = prev[s & WindowMask]; - } - } - - if (bestLen == MinMatch && (p - anchor) >= (7 + 128)) - { - bestLen = 0; - } - - if (level >= 5 && bestLen >= MinMatch && bestLen < maxMatch && (p - anchor) != 6) - { - int x = p + 1; - int targetLen = bestLen + 1; - - int limit = Math.Max(x - WindowSize, Nil); - int chainLen = maxChain; - - int s = hashTable[Hash32(&input[x])]; - while (s > limit) - { - if (input[s + bestLen] == input[x + bestLen] && Unsafe.ReadUnaligned(&input[s]) == Unsafe.ReadUnaligned(&input[x])) - { - int len = MinMatch; - - while (len < targetLen && input[s + len] == input[x + len]) - { - ++len; - } - - if (len == targetLen) - { - bestLen = 0; - break; - } - } - - if (--chainLen == 0) - { - break; - } - - s = prev[s & WindowMask]; - } - } - - if (bestLen >= MinMatch) - { - int len = bestLen - MinMatch; - int token = ((dist >> 12) & 16) + Math.Min(len, 15); - - if (anchor != p) - { - int run = p - anchor; - - if (run >= 7) - { - *op++ = (byte)((7 << 5) + token); - EncodeMod(ref op, (uint)(run - 7)); - } - else - { - *op++ = (byte)((run << 5) + token); - } - - WildCopy(op, &input[anchor], run); - op += run; - } - else - { - *op++ = (byte)token; - } - - if (len >= 15) - { - EncodeMod(ref op, (uint)(len - 15)); - } - - Unsafe.WriteUnaligned(op, (ushort)dist); - op += 2; - - while (bestLen-- != 0) - { - uint h = Hash32(&input[p]); - prev[p & WindowMask] = hashTable[h]; - hashTable[h] = p++; - } - - anchor = p; - } - else - { - uint h = Hash32(&input[p]); - prev[p & WindowMask] = hashTable[h]; - hashTable[h] = p++; - } - } - - if (anchor != p) - { - int run = p - anchor; - - if (run >= 7) - { - *op++ = 7 << 5; - EncodeMod(ref op, (uint)(run - 7)); - } - else - { - *op++ = (byte)(run << 5); - } - - WildCopy(op, &input[anchor], run); - op += run; - } - - return (int)(op - output); - } - } -} diff --git a/src/TraceEvent/EventPipe/EventCache.cs b/src/TraceEvent/EventPipe/EventCache.cs index ea737c47d..d5766ccb3 100644 --- a/src/TraceEvent/EventPipe/EventCache.cs +++ b/src/TraceEvent/EventPipe/EventCache.cs @@ -15,7 +15,7 @@ internal class EventCache public event ParseBufferItemFunction OnEvent; public event Action OnEventsDropped; - public unsafe void ProcessEventBlock(byte[] eventBlockData) + public unsafe void ProcessEventBlock(int version, byte[] eventBlockData) { // parse the header if(eventBlockData.Length < 20) @@ -34,10 +34,10 @@ public unsafe void ProcessEventBlock(byte[] eventBlockData) bool isULZCompressed = (flags & (ushort)EventBlockFlags.EventBlockULZCompression) != 0; int eventBlockSize = eventBlockData.Length; - if (isULZCompressed) + if (isULZCompressed && headerSize >= 24) { int decompressedSize = (int)BitConverter.ToUInt32(eventBlockData, 20); // Decompressed Size is at offset 20 - var retVal = ULZCompression.Decompress(new ArraySegment(eventBlockData, headerSize, eventBlockData.Length - headerSize), decompressedSize); + ArraySegment retVal = ULZCompression.Decompress(new ArraySegment(eventBlockData, headerSize, eventBlockData.Length - headerSize), decompressedSize); eventBlockData = retVal.Array; eventBlockSize = decompressedSize; } @@ -55,7 +55,8 @@ public unsafe void ProcessEventBlock(byte[] eventBlockData) EventMarker eventMarker = new EventMarker(buffer); long timestamp = 0; - EventPipeEventHeader.ReadFromFormatV4(cursor, useHeaderCompression, ref eventMarker.Header); + + EventPipeEventHeader.ReadFromFormat(version, cursor, useHeaderCompression, ref eventMarker.Header); if (!_threads.TryGetValue(eventMarker.Header.CaptureThreadId, out EventCacheThread thread)) { thread = new EventCacheThread(); @@ -65,7 +66,7 @@ public unsafe void ProcessEventBlock(byte[] eventBlockData) eventMarker = new EventMarker(buffer); while (cursor < end) { - EventPipeEventHeader.ReadFromFormatV4(cursor, useHeaderCompression, ref eventMarker.Header); + EventPipeEventHeader.ReadFromFormat(version, cursor, useHeaderCompression, ref eventMarker.Header); bool isSortedEvent = eventMarker.Header.IsSorted; timestamp = eventMarker.Header.TimeStamp; int sequenceNumber = eventMarker.Header.SequenceNumber; diff --git a/src/TraceEvent/EventPipe/EventPipeEventSource.cs b/src/TraceEvent/EventPipe/EventPipeEventSource.cs index c0ec618cf..dc4f0a5b2 100644 --- a/src/TraceEvent/EventPipe/EventPipeEventSource.cs +++ b/src/TraceEvent/EventPipe/EventPipeEventSource.cs @@ -218,10 +218,10 @@ void ReadEventHeader(byte* headerPtr, bool useHeaderCompression, ref EventPipeEv { EventPipeEventHeader.ReadFromFormatV3(headerPtr, ref eventData); } - else // if (FileFormatVersionNumber == 4) + else { - EventPipeEventHeader.ReadFromFormatV4(headerPtr, useHeaderCompression, ref eventData); - if(eventData.MetaDataId != 0 && StackCache.TryGetStack(eventData.StackId, out int stackBytesSize, out IntPtr stackBytes)) + EventPipeEventHeader.ReadFromFormat(FileFormatVersionNumber, headerPtr, useHeaderCompression, ref eventData); + if (eventData.MetaDataId != 0 && StackCache.TryGetStack(eventData.StackId, out int stackBytesSize, out IntPtr stackBytes)) { eventData.StackBytesSize = stackBytesSize; eventData.StackBytes = stackBytes; @@ -903,12 +903,13 @@ public EventPipeEventBlock(EventPipeEventSource source) : base(source) { } protected unsafe override void ReadBlockContents(PinnedStreamReader reader) { - if(_source.FileFormatVersionNumber >= 4) + int version = _source.FileFormatVersionNumber; + if (version >= 4) { _source.ResetCompressedHeader(); byte[] eventBlockBytes = new byte[_endEventData.Sub(_startEventData)]; reader.Read(eventBlockBytes, 0, eventBlockBytes.Length); - _source.EventCache.ProcessEventBlock(eventBlockBytes); + _source.EventCache.ProcessEventBlock(version, eventBlockBytes); } else { @@ -1098,6 +1099,7 @@ public EventPipeEventMetaDataHeader(PinnedStreamReader reader, int length, Event // // Note: ThreadId isn't 32 bit on all of our platforms but ETW EVENT_RECORD* only has room for a 32 bit // ID. We'll need to refactor up the stack if we want to expose a bigger ID. + _eventRecord->EventHeader.ProcessId = eventData.ProcessID; _eventRecord->EventHeader.ThreadId = unchecked((int)eventData.ThreadId); if (eventData.ThreadId == eventData.CaptureThreadId && eventData.CaptureProcNumber != -1) { @@ -1364,7 +1366,25 @@ public static void ReadFromFormatV3(byte* headerPtr, ref EventPipeEventHeader he } [StructLayout(LayoutKind.Sequential, Pack = 1)] - struct LayoutV4 + struct LayoutV6 + { + public int EventSize; // Size bytes of this header and the payload and stacks if any. does NOT encode the size of the EventSize field itself. + public int MetaDataId; // a number identifying the description of this event. + public int SequenceNumber; + public long ThreadId; + public long CaptureThreadId; + public int CaptureProcNumber; + public int StackId; + public long TimeStamp; + public Guid ActivityID; + public Guid RelatedActivityID; + public int ProcessID; // process id in a multi-process trace + public int PayloadSize; // size in bytes of the user defined payload data. + public fixed byte Payload[4]; // Actually of variable size. 4 is used to avoid potential alignment issues. This 4 also appears in HeaderSize below. + } + + [StructLayout(LayoutKind.Sequential, Pack = 1)] + struct LayoutV5 { public int EventSize; // Size bytes of this header and the payload and stacks if any. does NOT encode the size of the EventSize field itself. public int MetaDataId; // a number identifying the description of this event. @@ -1389,7 +1409,8 @@ enum CompressedHeaderFlags ActivityId = 1 << 4, RelatedActivityId = 1 << 5, Sorted = 1 << 6, - DataLength = 1 << 7 + DataLength = 1 << 7, + ProcessId = 1 << 8, } static uint ReadVarUInt32(ref byte* pCursor) @@ -1432,11 +1453,108 @@ static ulong ReadVarUInt64(ref byte* pCursor) return val; } - public static void ReadFromFormatV4(byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header) + public static void ReadFromFormat(int version, byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header) + { + switch (version) + { + case 4: + case 5: + ReadFromFormatV5(headerPtr, useHeaderCompresion, ref header); + break; + case 6: + ReadFromFormatV6(headerPtr, useHeaderCompresion, ref header); + break; + default: + throw new Exception("Unexpected version"); + } + } + + private static void ReadFromFormatV6(byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header) + { + if (!useHeaderCompresion) + { + LayoutV6* pLayout = (LayoutV6*)headerPtr; + header.EventSize = pLayout->EventSize; + header.MetaDataId = pLayout->MetaDataId & 0x7FFF_FFFF; + header.IsSorted = ((uint)pLayout->MetaDataId & 0x8000_0000) == 0; + header.SequenceNumber = pLayout->SequenceNumber; + header.ThreadId = pLayout->ThreadId; + header.CaptureThreadId = pLayout->CaptureThreadId; + header.CaptureProcNumber = pLayout->CaptureProcNumber; + header.StackId = pLayout->StackId; + header.TimeStamp = pLayout->TimeStamp; + header.ActivityID = pLayout->ActivityID; + header.RelatedActivityID = pLayout->RelatedActivityID; + header.PayloadSize = pLayout->PayloadSize; + header.ProcessID = pLayout->ProcessID; + header.Payload = (IntPtr)pLayout->Payload; + header.HeaderSize = (sizeof(LayoutV6) - 4); + int totalSize = header.EventSize + 4; + header.TotalNonHeaderSize = totalSize - header.HeaderSize; + } + else + { + byte* headerStart = headerPtr; + byte flags = *headerPtr; + headerPtr++; + if ((flags & (byte)CompressedHeaderFlags.MetadataId) != 0) + { + header.MetaDataId = (int)ReadVarUInt32(ref headerPtr); + } + if ((flags & (byte)CompressedHeaderFlags.CaptureThreadAndSequence) != 0) + { + header.SequenceNumber += (int)ReadVarUInt32(ref headerPtr) + 1; + header.CaptureThreadId = (long)ReadVarUInt64(ref headerPtr); + header.CaptureProcNumber = (int)ReadVarUInt32(ref headerPtr); + } + else + { + if (header.MetaDataId != 0) + { + header.SequenceNumber++; + } + } + if ((flags & (byte)CompressedHeaderFlags.ThreadId) != 0) + { + header.ThreadId = (int)ReadVarUInt64(ref headerPtr); + } + if ((flags & (byte)CompressedHeaderFlags.StackId) != 0) + { + header.StackId = (int)ReadVarUInt32(ref headerPtr); + } + ulong timestampDelta = ReadVarUInt64(ref headerPtr); + header.TimeStamp += (long)timestampDelta; + if ((flags & (byte)CompressedHeaderFlags.ActivityId) != 0) + { + header.ActivityID = *(Guid*)headerPtr; + headerPtr += sizeof(Guid); + } + if ((flags & (byte)CompressedHeaderFlags.RelatedActivityId) != 0) + { + header.RelatedActivityID = *(Guid*)headerPtr; + headerPtr += sizeof(Guid); + } + header.IsSorted = (flags & (byte)CompressedHeaderFlags.Sorted) != 0; + if ((flags & (ushort)CompressedHeaderFlags.ProcessId) != 0) + { + header.ProcessID = (int)ReadVarUInt32(ref headerPtr); + } + if ((flags & (byte)CompressedHeaderFlags.DataLength) != 0) + { + header.PayloadSize = (int)ReadVarUInt32(ref headerPtr); + } + header.Payload = (IntPtr)headerPtr; + + header.HeaderSize = (int)(headerPtr - headerStart); + header.TotalNonHeaderSize = header.PayloadSize; + } + } + + private static void ReadFromFormatV5(byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header) { if (!useHeaderCompresion) { - LayoutV4* pLayout = (LayoutV4*)headerPtr; + LayoutV5* pLayout = (LayoutV5*)headerPtr; header.EventSize = pLayout->EventSize; header.MetaDataId = pLayout->MetaDataId & 0x7FFF_FFFF; header.IsSorted = ((uint)pLayout->MetaDataId & 0x8000_0000) == 0; @@ -1450,7 +1568,7 @@ public static void ReadFromFormatV4(byte* headerPtr, bool useHeaderCompresion, r header.RelatedActivityID = pLayout->RelatedActivityID; header.PayloadSize = pLayout->PayloadSize; header.Payload = (IntPtr)pLayout->Payload; - header.HeaderSize = (sizeof(LayoutV4) - 4); + header.HeaderSize = (sizeof(LayoutV5) - 4); int totalSize = header.EventSize + 4; header.TotalNonHeaderSize = totalSize - header.HeaderSize; } @@ -1518,6 +1636,7 @@ public static void ReadFromFormatV4(byte* headerPtr, bool useHeaderCompresion, r public Guid ActivityID; public Guid RelatedActivityID; public bool IsSorted; + public int ProcessID; // process id in multi-process trace public int PayloadSize; // size in bytes of the user defined payload data. public IntPtr Payload; public int StackId; @@ -1539,9 +1658,14 @@ public static int GetTotalEventSize(byte* headerPtr, int formatVersion) LayoutV3* header = (LayoutV3*)headerPtr; return header->EventSize + sizeof(int); } - else //if(formatVersion == 4) + else if(formatVersion <= 5) + { + LayoutV5* header = (LayoutV5*)headerPtr; + return header->EventSize + sizeof(int); + } + else //if(formatVersion == 6) { - LayoutV4* header = (LayoutV4*)headerPtr; + LayoutV6* header = (LayoutV6*)headerPtr; return header->EventSize + sizeof(int); } } @@ -1555,9 +1679,13 @@ public static int GetHeaderSize(int formatVersion) { return sizeof(LayoutV3) - 4; } - else //if(formatVersion == 4) + else if(formatVersion <= 5) + { + return sizeof(LayoutV5) - 4; + } + else // if (formatVersion == 6) { - return sizeof(LayoutV4) - 4; + return sizeof(LayoutV6) - 4; } } diff --git a/src/TraceEvent/EventPipe/ULZCompression.cs b/src/TraceEvent/EventPipe/ULZCompression.cs new file mode 100644 index 000000000..3f1e1787c --- /dev/null +++ b/src/TraceEvent/EventPipe/ULZCompression.cs @@ -0,0 +1,137 @@ +using System; +using System.Runtime.CompilerServices; + +namespace Microsoft.Diagnostics.Tracing +{ + internal static class ULZCompression + { + private const int MinMatch = 4; + + private const int MaxExcess = 16; + + public static unsafe ArraySegment Decompress(ArraySegment input, int decompressedSize) + { + byte[] output = new byte[decompressedSize + MaxExcess]; + fixed (byte* inputPtr = &input.Array[input.Offset]) + { + fixed (byte* outputPtr = &output[0]) + { + int actualDecompressedSize = Decompress(inputPtr, input.Count, outputPtr, output.Length); + if (decompressedSize != actualDecompressedSize) + { + throw new Exception($"Unexpected decompressed size. Expected: {decompressedSize}, Actual: {actualDecompressedSize}"); + } + + return new ArraySegment(output, 0, decompressedSize); + } + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static unsafe void WildCopy(byte* d, byte* s, int n) + { + Unsafe.WriteUnaligned(d, Unsafe.ReadUnaligned(s)); + + for (int i = 8; i < n; i += 8) + { + Unsafe.WriteUnaligned(d + i, Unsafe.ReadUnaligned(s + i)); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static unsafe uint DecodeMod(ref byte* p) + { + uint x = 0; + + for (int i = 0; i <= 21; i += 7) + { + uint c = *p++; + x += c << i; + if (c < 128) + { + break; + } + } + + return x; + } + + private static unsafe int Decompress(byte* input, int inputLength, byte* output, int outputLength) + { + byte* op = output; + byte* ip = input; + byte* ipEnd = ip + inputLength; + byte* opEnd = op + outputLength; + + while (ip < ipEnd) + { + int token = *ip++; + + if (token >= 32) + { + int run = token >> 5; + + if (run == 7) + { + run += (int)DecodeMod(ref ip); + } + + if ((opEnd - op) < run || (ipEnd - ip) < run) // Overrun check + { + return -1; + } + + WildCopy(op, ip, run); + + op += run; + ip += run; + + if (ip >= ipEnd) + { + break; + } + } + + int len = (token & 15) + MinMatch; + + if (len == 15 + MinMatch) + { + len += (int)DecodeMod(ref ip); + } + + if (opEnd - op < len) // Overrun check + { + return -1; + } + + int dist = ((token & 16) << 12) + Unsafe.ReadUnaligned(ip); + ip += 2; + byte* cp = op - dist; + if (op - output < dist) + { + return -1; + } + + if (dist >= 8) + { + WildCopy(op, cp, len); + op += len; + } + else + { + *op++ = *cp++; + *op++ = *cp++; + *op++ = *cp++; + *op++ = *cp++; + + while (len-- != 4) + { + *op++ = *cp++; + } + } + } + + return ip == ipEnd ? (int)(op - output) : -1; + } + } +}