Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Nettrace compression and multi-process support #1258

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions src/TraceEvent/EventPipe/EventCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal class EventCache
public event ParseBufferItemFunction OnEvent;
public event Action<int> OnEventsDropped;

public unsafe void ProcessEventBlock(byte[] eventBlockData)
public unsafe void ProcessEventBlock(int version, byte[] eventBlockData)
{
// parse the header
if(eventBlockData.Length < 20)
Expand All @@ -31,15 +31,32 @@ public unsafe void ProcessEventBlock(byte[] eventBlockData)
}
ushort flags = BitConverter.ToUInt16(eventBlockData, 2);
bool useHeaderCompression = (flags & (ushort)EventBlockFlags.HeaderCompression) != 0;
bool isULZCompressed = (flags & (ushort)EventBlockFlags.EventBlockULZCompression) != 0;
mjsabby marked this conversation as resolved.
Show resolved Hide resolved

int eventBlockSize = eventBlockData.Length;
if (isULZCompressed && headerSize >= 24)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the isULZCompressed flag and headerSize don't match I would error similar to the checks above (Assert + return). We should probably have a better error handling scheme, but it at least marks where the errors are detected in the code and prevents continued parsing.

At the moment this if block would not run but also the if(!isULZCompressed) block below would not run, presumably leaving the parser in a broken state.

{
int decompressedSize = (int)BitConverter.ToUInt32(eventBlockData, 20); // Decompressed Size is at offset 20
mjsabby marked this conversation as resolved.
Show resolved Hide resolved
ArraySegment<byte> retVal = ULZCompression.Decompress(new ArraySegment<byte>(eventBlockData, headerSize, eventBlockData.Length - headerSize), decompressedSize);
eventBlockData = retVal.Array;
eventBlockSize = decompressedSize;
}

// parse the events
PinnedBuffer buffer = new PinnedBuffer(eventBlockData);
byte* cursor = (byte*)buffer.PinningHandle.AddrOfPinnedObject();
byte* end = cursor + eventBlockData.Length;
cursor += headerSize;
byte* end = cursor + eventBlockSize;

// if we did compression the byte array was replaced and therefore the cursor doesn't need to be advanced
if (!isULZCompressed)
{
cursor += headerSize;
}

EventMarker eventMarker = new EventMarker(buffer);
long timestamp = 0;
EventPipeEventHeader.ReadFromFormatV4(cursor, useHeaderCompression, ref eventMarker.Header);

EventPipeEventHeader.ReadFromFormat(version, cursor, useHeaderCompression, ref eventMarker.Header);
if (!_threads.TryGetValue(eventMarker.Header.CaptureThreadId, out EventCacheThread thread))
{
thread = new EventCacheThread();
Expand All @@ -49,7 +66,7 @@ public unsafe void ProcessEventBlock(byte[] eventBlockData)
eventMarker = new EventMarker(buffer);
while (cursor < end)
{
EventPipeEventHeader.ReadFromFormatV4(cursor, useHeaderCompression, ref eventMarker.Header);
EventPipeEventHeader.ReadFromFormat(version, cursor, useHeaderCompression, ref eventMarker.Header);
bool isSortedEvent = eventMarker.Header.IsSorted;
timestamp = eventMarker.Header.TimeStamp;
int sequenceNumber = eventMarker.Header.SequenceNumber;
Expand Down
159 changes: 144 additions & 15 deletions src/TraceEvent/EventPipe/EventPipeEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ void ReadEventHeader(byte* headerPtr, bool useHeaderCompression, ref EventPipeEv
{
EventPipeEventHeader.ReadFromFormatV3(headerPtr, ref eventData);
}
else // if (FileFormatVersionNumber == 4)
else
{
EventPipeEventHeader.ReadFromFormatV4(headerPtr, useHeaderCompression, ref eventData);
if(eventData.MetaDataId != 0 && StackCache.TryGetStack(eventData.StackId, out int stackBytesSize, out IntPtr stackBytes))
EventPipeEventHeader.ReadFromFormat(FileFormatVersionNumber, headerPtr, useHeaderCompression, ref eventData);
if (eventData.MetaDataId != 0 && StackCache.TryGetStack(eventData.StackId, out int stackBytesSize, out IntPtr stackBytes))
{
eventData.StackBytesSize = stackBytesSize;
eventData.StackBytes = stackBytes;
Expand Down Expand Up @@ -887,7 +887,8 @@ public unsafe void FromStream(Deserializer deserializer)
internal enum EventBlockFlags : short
{
Uncompressed = 0,
HeaderCompression = 1
HeaderCompression = 1,
EventBlockULZCompression = 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR should also update the spec and add tests

}

/// <summary>
Expand All @@ -902,12 +903,13 @@ public EventPipeEventBlock(EventPipeEventSource source) : base(source) { }

protected unsafe override void ReadBlockContents(PinnedStreamReader reader)
{
if(_source.FileFormatVersionNumber >= 4)
int version = _source.FileFormatVersionNumber;
if (version >= 4)
{
_source.ResetCompressedHeader();
byte[] eventBlockBytes = new byte[_endEventData.Sub(_startEventData)];
reader.Read(eventBlockBytes, 0, eventBlockBytes.Length);
_source.EventCache.ProcessEventBlock(eventBlockBytes);
_source.EventCache.ProcessEventBlock(version, eventBlockBytes);
}
else
{
Expand Down Expand Up @@ -1097,6 +1099,7 @@ public EventPipeEventMetaDataHeader(PinnedStreamReader reader, int length, Event
//
// Note: ThreadId isn't 32 bit on all of our platforms but ETW EVENT_RECORD* only has room for a 32 bit
// ID. We'll need to refactor up the stack if we want to expose a bigger ID.
_eventRecord->EventHeader.ProcessId = eventData.ProcessID;
_eventRecord->EventHeader.ThreadId = unchecked((int)eventData.ThreadId);
if (eventData.ThreadId == eventData.CaptureThreadId && eventData.CaptureProcNumber != -1)
{
Expand Down Expand Up @@ -1363,7 +1366,25 @@ public static void ReadFromFormatV3(byte* headerPtr, ref EventPipeEventHeader he
}

[StructLayout(LayoutKind.Sequential, Pack = 1)]
struct LayoutV4
struct LayoutV6
{
public int EventSize; // Size bytes of this header and the payload and stacks if any. does NOT encode the size of the EventSize field itself.
public int MetaDataId; // a number identifying the description of this event.
public int SequenceNumber;
public long ThreadId;
public long CaptureThreadId;
public int CaptureProcNumber;
public int StackId;
public long TimeStamp;
public Guid ActivityID;
public Guid RelatedActivityID;
public int ProcessID; // process id in a multi-process trace
public int PayloadSize; // size in bytes of the user defined payload data.
public fixed byte Payload[4]; // Actually of variable size. 4 is used to avoid potential alignment issues. This 4 also appears in HeaderSize below.
}

[StructLayout(LayoutKind.Sequential, Pack = 1)]
struct LayoutV5
{
public int EventSize; // Size bytes of this header and the payload and stacks if any. does NOT encode the size of the EventSize field itself.
public int MetaDataId; // a number identifying the description of this event.
Expand All @@ -1388,7 +1409,8 @@ enum CompressedHeaderFlags
ActivityId = 1 << 4,
RelatedActivityId = 1 << 5,
Sorted = 1 << 6,
DataLength = 1 << 7
DataLength = 1 << 7,
ProcessId = 1 << 8,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flags field is a single byte, no room to set the 9th bit : ) I'd suggest changing bit 2 into CaptureThreadPidAndSequence and encoding the process id as the VarInt64(current_event_proc_id - previous_event_proc_id). This means:

Bit 2 is clear (probably most events) -> proc id is unchanged from last event, no additional data encoded in the header
Bit 2 is set, encoded process id field is single byte 0 -> process id is unchanged from last event, 1 additional byte used in header. This case happens every time two adjacent events are logged from different threads in the same process.
Bit 2 is set, encoded process id field is non-zero -> process_id = prev_event_process_id + ReadVarInt64(encoded_proc_id_field). This occurs whenever adjacent events have different PID. Encoding size is variable depending on magnitude of proc id, probably 2 bytes.

We may also want an optimization that single-proc traces never encode a process id regardless if bit 2 is set. This ensures the runtime produced traces don't regress in size.

}

static uint ReadVarUInt32(ref byte* pCursor)
Expand Down Expand Up @@ -1431,11 +1453,108 @@ static ulong ReadVarUInt64(ref byte* pCursor)
return val;
}

public static void ReadFromFormatV4(byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header)
public static void ReadFromFormat(int version, byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header)
{
switch (version)
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only need to add one new major version? The current shipped version of the format is 4 and the new one would be 5.

Copy link
Contributor

@josalem josalem Sep 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit wary of mixing v4 and v5 functionality and having a single implementation for both. I realize this might make for a little code duplication. Presumably any feature work we do in the runtime during .NET 6.0 that would also necessitate a version increase will get rolled into v5 as well. This could mean that we need to bring back the v4 version of the code later anyway if the delta between v4 and v5 becomes large enough.

case 4:
case 5:
ReadFromFormatV5(headerPtr, useHeaderCompresion, ref header);
break;
case 6:
ReadFromFormatV6(headerPtr, useHeaderCompresion, ref header);
break;
default:
throw new Exception("Unexpected version");
}
}

private static void ReadFromFormatV6(byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header)
{
if (!useHeaderCompresion)
{
LayoutV6* pLayout = (LayoutV6*)headerPtr;
header.EventSize = pLayout->EventSize;
header.MetaDataId = pLayout->MetaDataId & 0x7FFF_FFFF;
header.IsSorted = ((uint)pLayout->MetaDataId & 0x8000_0000) == 0;
header.SequenceNumber = pLayout->SequenceNumber;
header.ThreadId = pLayout->ThreadId;
header.CaptureThreadId = pLayout->CaptureThreadId;
header.CaptureProcNumber = pLayout->CaptureProcNumber;
header.StackId = pLayout->StackId;
header.TimeStamp = pLayout->TimeStamp;
header.ActivityID = pLayout->ActivityID;
header.RelatedActivityID = pLayout->RelatedActivityID;
header.PayloadSize = pLayout->PayloadSize;
header.ProcessID = pLayout->ProcessID;
header.Payload = (IntPtr)pLayout->Payload;
header.HeaderSize = (sizeof(LayoutV6) - 4);
int totalSize = header.EventSize + 4;
header.TotalNonHeaderSize = totalSize - header.HeaderSize;
}
else
{
byte* headerStart = headerPtr;
byte flags = *headerPtr;
headerPtr++;
if ((flags & (byte)CompressedHeaderFlags.MetadataId) != 0)
{
header.MetaDataId = (int)ReadVarUInt32(ref headerPtr);
}
if ((flags & (byte)CompressedHeaderFlags.CaptureThreadAndSequence) != 0)
{
header.SequenceNumber += (int)ReadVarUInt32(ref headerPtr) + 1;
header.CaptureThreadId = (long)ReadVarUInt64(ref headerPtr);
header.CaptureProcNumber = (int)ReadVarUInt32(ref headerPtr);
}
else
{
if (header.MetaDataId != 0)
{
header.SequenceNumber++;
}
}
if ((flags & (byte)CompressedHeaderFlags.ThreadId) != 0)
{
header.ThreadId = (int)ReadVarUInt64(ref headerPtr);
}
if ((flags & (byte)CompressedHeaderFlags.StackId) != 0)
{
header.StackId = (int)ReadVarUInt32(ref headerPtr);
}
ulong timestampDelta = ReadVarUInt64(ref headerPtr);
header.TimeStamp += (long)timestampDelta;
if ((flags & (byte)CompressedHeaderFlags.ActivityId) != 0)
{
header.ActivityID = *(Guid*)headerPtr;
headerPtr += sizeof(Guid);
}
if ((flags & (byte)CompressedHeaderFlags.RelatedActivityId) != 0)
{
header.RelatedActivityID = *(Guid*)headerPtr;
headerPtr += sizeof(Guid);
}
header.IsSorted = (flags & (byte)CompressedHeaderFlags.Sorted) != 0;
if ((flags & (ushort)CompressedHeaderFlags.ProcessId) != 0)
{
header.ProcessID = (int)ReadVarUInt32(ref headerPtr);
}
if ((flags & (byte)CompressedHeaderFlags.DataLength) != 0)
{
header.PayloadSize = (int)ReadVarUInt32(ref headerPtr);
}
header.Payload = (IntPtr)headerPtr;

header.HeaderSize = (int)(headerPtr - headerStart);
header.TotalNonHeaderSize = header.PayloadSize;
}
}

private static void ReadFromFormatV5(byte* headerPtr, bool useHeaderCompresion, ref EventPipeEventHeader header)
{
if (!useHeaderCompresion)
{
LayoutV4* pLayout = (LayoutV4*)headerPtr;
LayoutV5* pLayout = (LayoutV5*)headerPtr;
header.EventSize = pLayout->EventSize;
header.MetaDataId = pLayout->MetaDataId & 0x7FFF_FFFF;
header.IsSorted = ((uint)pLayout->MetaDataId & 0x8000_0000) == 0;
Expand All @@ -1449,7 +1568,7 @@ public static void ReadFromFormatV4(byte* headerPtr, bool useHeaderCompresion, r
header.RelatedActivityID = pLayout->RelatedActivityID;
header.PayloadSize = pLayout->PayloadSize;
header.Payload = (IntPtr)pLayout->Payload;
header.HeaderSize = (sizeof(LayoutV4) - 4);
header.HeaderSize = (sizeof(LayoutV5) - 4);
int totalSize = header.EventSize + 4;
header.TotalNonHeaderSize = totalSize - header.HeaderSize;
}
Expand Down Expand Up @@ -1517,6 +1636,7 @@ public static void ReadFromFormatV4(byte* headerPtr, bool useHeaderCompresion, r
public Guid ActivityID;
public Guid RelatedActivityID;
public bool IsSorted;
public int ProcessID; // process id in multi-process trace
public int PayloadSize; // size in bytes of the user defined payload data.
public IntPtr Payload;
public int StackId;
Expand All @@ -1538,9 +1658,14 @@ public static int GetTotalEventSize(byte* headerPtr, int formatVersion)
LayoutV3* header = (LayoutV3*)headerPtr;
return header->EventSize + sizeof(int);
}
else //if(formatVersion == 4)
else if(formatVersion <= 5)
{
LayoutV5* header = (LayoutV5*)headerPtr;
return header->EventSize + sizeof(int);
}
else //if(formatVersion == 6)
{
LayoutV4* header = (LayoutV4*)headerPtr;
LayoutV6* header = (LayoutV6*)headerPtr;
return header->EventSize + sizeof(int);
}
}
Expand All @@ -1554,9 +1679,13 @@ public static int GetHeaderSize(int formatVersion)
{
return sizeof(LayoutV3) - 4;
}
else //if(formatVersion == 4)
else if(formatVersion <= 5)
{
return sizeof(LayoutV5) - 4;
}
else // if (formatVersion == 6)
{
return sizeof(LayoutV4) - 4;
return sizeof(LayoutV6) - 4;
}
}

Expand Down
Loading