diff --git a/src/TraceEvent/EventPipe/EventCache.cs b/src/TraceEvent/EventPipe/EventCache.cs index 4ce5d80cf..d5766ccb3 100644 --- a/src/TraceEvent/EventPipe/EventCache.cs +++ b/src/TraceEvent/EventPipe/EventCache.cs @@ -15,7 +15,7 @@ internal class EventCache public event ParseBufferItemFunction OnEvent; public event Action OnEventsDropped; - public unsafe void ProcessEventBlock(byte[] eventBlockData) + public unsafe void ProcessEventBlock(int version, byte[] eventBlockData) { // parse the header if(eventBlockData.Length < 20) @@ -31,15 +31,32 @@ public unsafe void ProcessEventBlock(byte[] eventBlockData) } ushort flags = BitConverter.ToUInt16(eventBlockData, 2); bool useHeaderCompression = (flags & (ushort)EventBlockFlags.HeaderCompression) != 0; + bool isULZCompressed = (flags & (ushort)EventBlockFlags.EventBlockULZCompression) != 0; + + int eventBlockSize = eventBlockData.Length; + if (isULZCompressed && headerSize >= 24) + { + int decompressedSize = (int)BitConverter.ToUInt32(eventBlockData, 20); // Decompressed Size is at offset 20 + ArraySegment retVal = ULZCompression.Decompress(new ArraySegment(eventBlockData, headerSize, eventBlockData.Length - headerSize), decompressedSize); + eventBlockData = retVal.Array; + eventBlockSize = decompressedSize; + } // parse the events PinnedBuffer buffer = new PinnedBuffer(eventBlockData); byte* cursor = (byte*)buffer.PinningHandle.AddrOfPinnedObject(); - byte* end = cursor + eventBlockData.Length; - cursor += headerSize; + byte* end = cursor + eventBlockSize; + + // if we did compression the byte array was replaced and therefore the cursor doesn't need to be advanced + if (!isULZCompressed) + { + cursor += headerSize; + } + EventMarker eventMarker = new EventMarker(buffer); long timestamp = 0; - EventPipeEventHeader.ReadFromFormatV4(cursor, useHeaderCompression, ref eventMarker.Header); + + EventPipeEventHeader.ReadFromFormat(version, cursor, useHeaderCompression, ref eventMarker.Header); if (!_threads.TryGetValue(eventMarker.Header.CaptureThreadId, out EventCacheThread thread)) { thread = new EventCacheThread(); @@ -49,7 +66,7 @@ public unsafe void ProcessEventBlock(byte[] eventBlockData) eventMarker = new EventMarker(buffer); while (cursor < end) { - EventPipeEventHeader.ReadFromFormatV4(cursor, useHeaderCompression, ref eventMarker.Header); + EventPipeEventHeader.ReadFromFormat(version, cursor, useHeaderCompression, ref eventMarker.Header); bool isSortedEvent = eventMarker.Header.IsSorted; timestamp = eventMarker.Header.TimeStamp; int sequenceNumber = eventMarker.Header.SequenceNumber; diff --git a/src/TraceEvent/EventPipe/EventPipeEventSource.cs b/src/TraceEvent/EventPipe/EventPipeEventSource.cs index 56cf22ab6..dc4f0a5b2 100644 --- a/src/TraceEvent/EventPipe/EventPipeEventSource.cs +++ b/src/TraceEvent/EventPipe/EventPipeEventSource.cs @@ -218,10 +218,10 @@ void ReadEventHeader(byte* headerPtr, bool useHeaderCompression, ref EventPipeEv { EventPipeEventHeader.ReadFromFormatV3(headerPtr, ref eventData); } - else // if (FileFormatVersionNumber == 4) + else { - EventPipeEventHeader.ReadFromFormatV4(headerPtr, useHeaderCompression, ref eventData); - if(eventData.MetaDataId != 0 && StackCache.TryGetStack(eventData.StackId, out int stackBytesSize, out IntPtr stackBytes)) + EventPipeEventHeader.ReadFromFormat(FileFormatVersionNumber, headerPtr, useHeaderCompression, ref eventData); + if (eventData.MetaDataId != 0 && StackCache.TryGetStack(eventData.StackId, out int stackBytesSize, out IntPtr stackBytes)) { eventData.StackBytesSize = stackBytesSize; eventData.StackBytes = stackBytes; @@ -887,7 +887,8 @@ public unsafe void FromStream(Deserializer deserializer) internal enum EventBlockFlags : short { Uncompressed = 0, - HeaderCompression = 1 + HeaderCompression = 1, + EventBlockULZCompression = 2 } /// @@ -902,12 +903,13 @@ public EventPipeEventBlock(EventPipeEventSource source) : base(source) { } protected unsafe override void ReadBlockContents(PinnedStreamReader reader) { - if(_source.FileFormatVersionNumber >= 4) + int version = _source.FileFormatVersionNumber; + if (version >= 4) { _source.ResetCompressedHeader(); byte[] eventBlockBytes = new byte[_endEventData.Sub(_startEventData)]; reader.Read(eventBlockBytes, 0, eventBlockBytes.Length); - _source.EventCache.ProcessEventBlock(eventBlockBytes); + _source.EventCache.ProcessEventBlock(version, eventBlockBytes); } else { @@ -1097,6 +1099,7 @@ public EventPipeEventMetaDataHeader(PinnedStreamReader reader, int length, Event // // Note: ThreadId isn't 32 bit on all of our platforms but ETW EVENT_RECORD* only has room for a 32 bit // ID. We'll need to refactor up the stack if we want to expose a bigger ID. + _eventRecord->EventHeader.ProcessId = eventData.ProcessID; _eventRecord->EventHeader.ThreadId = unchecked((int)eventData.ThreadId); if (eventData.ThreadId == eventData.CaptureThreadId && eventData.CaptureProcNumber != -1) { @@ -1363,7 +1366,25 @@ public static void ReadFromFormatV3(byte* headerPtr, ref EventPipeEventHeader he } [StructLayout(LayoutKind.Sequential, Pack = 1)] - struct LayoutV4 + struct LayoutV6 + { + public int EventSize; // Size bytes of this header and the payload and stacks if any. does NOT encode the size of the EventSize field itself. + public int MetaDataId; // a number identifying the description of this event. + public int SequenceNumber; + public long ThreadId; + public long CaptureThreadId; + public int CaptureProcNumber; + public int StackId; + public long TimeStamp; + public Guid ActivityID; + public Guid RelatedActivityID; + public int ProcessID; // process id in a multi-process trace + public int PayloadSize; // size in bytes of the user defined payload data. + public fixed byte Payload[4]; // Actually of variable size. 4 is used to avoid potential alignment issues. This 4 also appears in HeaderSize below. + } + + [StructLayout(LayoutKind.Sequential, Pack = 1)] + struct LayoutV5 { public int EventSize; // Size bytes of this header and the payload and stacks if any. does NOT encode the size of the EventSize field itself. public int MetaDataId; // a number identifying the description of this event. @@ -1388,7 +1409,8 @@ enum CompressedHeaderFlags ActivityId = 1 << 4, RelatedActivityId = 1 << 5, Sorted = 1 << 6, - DataLength = 1 << 7 + DataLength = 1 << 7, + ProcessId = 1 << 8, } static uint ReadVarUInt32(ref byte* pCursor) @@ -1431,11 +1453,108 @@ static ulong ReadVarUInt64(ref byte* pCursor) return val; } - public static void ReadFromFormatV4(byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header) + public static void ReadFromFormat(int version, byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header) + { + switch (version) + { + case 4: + case 5: + ReadFromFormatV5(headerPtr, useHeaderCompresion, ref header); + break; + case 6: + ReadFromFormatV6(headerPtr, useHeaderCompresion, ref header); + break; + default: + throw new Exception("Unexpected version"); + } + } + + private static void ReadFromFormatV6(byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header) + { + if (!useHeaderCompresion) + { + LayoutV6* pLayout = (LayoutV6*)headerPtr; + header.EventSize = pLayout->EventSize; + header.MetaDataId = pLayout->MetaDataId & 0x7FFF_FFFF; + header.IsSorted = ((uint)pLayout->MetaDataId & 0x8000_0000) == 0; + header.SequenceNumber = pLayout->SequenceNumber; + header.ThreadId = pLayout->ThreadId; + header.CaptureThreadId = pLayout->CaptureThreadId; + header.CaptureProcNumber = pLayout->CaptureProcNumber; + header.StackId = pLayout->StackId; + header.TimeStamp = pLayout->TimeStamp; + header.ActivityID = pLayout->ActivityID; + header.RelatedActivityID = pLayout->RelatedActivityID; + header.PayloadSize = pLayout->PayloadSize; + header.ProcessID = pLayout->ProcessID; + header.Payload = (IntPtr)pLayout->Payload; + header.HeaderSize = (sizeof(LayoutV6) - 4); + int totalSize = header.EventSize + 4; + header.TotalNonHeaderSize = totalSize - header.HeaderSize; + } + else + { + byte* headerStart = headerPtr; + byte flags = *headerPtr; + headerPtr++; + if ((flags & (byte)CompressedHeaderFlags.MetadataId) != 0) + { + header.MetaDataId = (int)ReadVarUInt32(ref headerPtr); + } + if ((flags & (byte)CompressedHeaderFlags.CaptureThreadAndSequence) != 0) + { + header.SequenceNumber += (int)ReadVarUInt32(ref headerPtr) + 1; + header.CaptureThreadId = (long)ReadVarUInt64(ref headerPtr); + header.CaptureProcNumber = (int)ReadVarUInt32(ref headerPtr); + } + else + { + if (header.MetaDataId != 0) + { + header.SequenceNumber++; + } + } + if ((flags & (byte)CompressedHeaderFlags.ThreadId) != 0) + { + header.ThreadId = (int)ReadVarUInt64(ref headerPtr); + } + if ((flags & (byte)CompressedHeaderFlags.StackId) != 0) + { + header.StackId = (int)ReadVarUInt32(ref headerPtr); + } + ulong timestampDelta = ReadVarUInt64(ref headerPtr); + header.TimeStamp += (long)timestampDelta; + if ((flags & (byte)CompressedHeaderFlags.ActivityId) != 0) + { + header.ActivityID = *(Guid*)headerPtr; + headerPtr += sizeof(Guid); + } + if ((flags & (byte)CompressedHeaderFlags.RelatedActivityId) != 0) + { + header.RelatedActivityID = *(Guid*)headerPtr; + headerPtr += sizeof(Guid); + } + header.IsSorted = (flags & (byte)CompressedHeaderFlags.Sorted) != 0; + if ((flags & (ushort)CompressedHeaderFlags.ProcessId) != 0) + { + header.ProcessID = (int)ReadVarUInt32(ref headerPtr); + } + if ((flags & (byte)CompressedHeaderFlags.DataLength) != 0) + { + header.PayloadSize = (int)ReadVarUInt32(ref headerPtr); + } + header.Payload = (IntPtr)headerPtr; + + header.HeaderSize = (int)(headerPtr - headerStart); + header.TotalNonHeaderSize = header.PayloadSize; + } + } + + private static void ReadFromFormatV5(byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header) { if (!useHeaderCompresion) { - LayoutV4* pLayout = (LayoutV4*)headerPtr; + LayoutV5* pLayout = (LayoutV5*)headerPtr; header.EventSize = pLayout->EventSize; header.MetaDataId = pLayout->MetaDataId & 0x7FFF_FFFF; header.IsSorted = ((uint)pLayout->MetaDataId & 0x8000_0000) == 0; @@ -1449,7 +1568,7 @@ public static void ReadFromFormatV4(byte* headerPtr, bool useHeaderCompresion, r header.RelatedActivityID = pLayout->RelatedActivityID; header.PayloadSize = pLayout->PayloadSize; header.Payload = (IntPtr)pLayout->Payload; - header.HeaderSize = (sizeof(LayoutV4) - 4); + header.HeaderSize = (sizeof(LayoutV5) - 4); int totalSize = header.EventSize + 4; header.TotalNonHeaderSize = totalSize - header.HeaderSize; } @@ -1517,6 +1636,7 @@ public static void ReadFromFormatV4(byte* headerPtr, bool useHeaderCompresion, r public Guid ActivityID; public Guid RelatedActivityID; public bool IsSorted; + public int ProcessID; // process id in multi-process trace public int PayloadSize; // size in bytes of the user defined payload data. public IntPtr Payload; public int StackId; @@ -1538,9 +1658,14 @@ public static int GetTotalEventSize(byte* headerPtr, int formatVersion) LayoutV3* header = (LayoutV3*)headerPtr; return header->EventSize + sizeof(int); } - else //if(formatVersion == 4) + else if(formatVersion <= 5) + { + LayoutV5* header = (LayoutV5*)headerPtr; + return header->EventSize + sizeof(int); + } + else //if(formatVersion == 6) { - LayoutV4* header = (LayoutV4*)headerPtr; + LayoutV6* header = (LayoutV6*)headerPtr; return header->EventSize + sizeof(int); } } @@ -1554,9 +1679,13 @@ public static int GetHeaderSize(int formatVersion) { return sizeof(LayoutV3) - 4; } - else //if(formatVersion == 4) + else if(formatVersion <= 5) + { + return sizeof(LayoutV5) - 4; + } + else // if (formatVersion == 6) { - return sizeof(LayoutV4) - 4; + return sizeof(LayoutV6) - 4; } } diff --git a/src/TraceEvent/EventPipe/ULZCompression.cs b/src/TraceEvent/EventPipe/ULZCompression.cs new file mode 100644 index 000000000..3f1e1787c --- /dev/null +++ b/src/TraceEvent/EventPipe/ULZCompression.cs @@ -0,0 +1,137 @@ +using System; +using System.Runtime.CompilerServices; + +namespace Microsoft.Diagnostics.Tracing +{ + internal static class ULZCompression + { + private const int MinMatch = 4; + + private const int MaxExcess = 16; + + public static unsafe ArraySegment Decompress(ArraySegment input, int decompressedSize) + { + byte[] output = new byte[decompressedSize + MaxExcess]; + fixed (byte* inputPtr = &input.Array[input.Offset]) + { + fixed (byte* outputPtr = &output[0]) + { + int actualDecompressedSize = Decompress(inputPtr, input.Count, outputPtr, output.Length); + if (decompressedSize != actualDecompressedSize) + { + throw new Exception($"Unexpected decompressed size. Expected: {decompressedSize}, Actual: {actualDecompressedSize}"); + } + + return new ArraySegment(output, 0, decompressedSize); + } + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static unsafe void WildCopy(byte* d, byte* s, int n) + { + Unsafe.WriteUnaligned(d, Unsafe.ReadUnaligned(s)); + + for (int i = 8; i < n; i += 8) + { + Unsafe.WriteUnaligned(d + i, Unsafe.ReadUnaligned(s + i)); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static unsafe uint DecodeMod(ref byte* p) + { + uint x = 0; + + for (int i = 0; i <= 21; i += 7) + { + uint c = *p++; + x += c << i; + if (c < 128) + { + break; + } + } + + return x; + } + + private static unsafe int Decompress(byte* input, int inputLength, byte* output, int outputLength) + { + byte* op = output; + byte* ip = input; + byte* ipEnd = ip + inputLength; + byte* opEnd = op + outputLength; + + while (ip < ipEnd) + { + int token = *ip++; + + if (token >= 32) + { + int run = token >> 5; + + if (run == 7) + { + run += (int)DecodeMod(ref ip); + } + + if ((opEnd - op) < run || (ipEnd - ip) < run) // Overrun check + { + return -1; + } + + WildCopy(op, ip, run); + + op += run; + ip += run; + + if (ip >= ipEnd) + { + break; + } + } + + int len = (token & 15) + MinMatch; + + if (len == 15 + MinMatch) + { + len += (int)DecodeMod(ref ip); + } + + if (opEnd - op < len) // Overrun check + { + return -1; + } + + int dist = ((token & 16) << 12) + Unsafe.ReadUnaligned(ip); + ip += 2; + byte* cp = op - dist; + if (op - output < dist) + { + return -1; + } + + if (dist >= 8) + { + WildCopy(op, cp, len); + op += len; + } + else + { + *op++ = *cp++; + *op++ = *cp++; + *op++ = *cp++; + *op++ = *cp++; + + while (len-- != 4) + { + *op++ = *cp++; + } + } + } + + return ip == ipEnd ? (int)(op - output) : -1; + } + } +}