@@ -114,7 +114,8 @@ EventPipeBuffer *
114114buffer_manager_advance_to_non_empty_buffer (
115115 EventPipeBufferManager * buffer_manager ,
116116 EventPipeThreadSessionState * thread_session_state ,
117- ep_timestamp_t before_timestamp );
117+ ep_timestamp_t before_timestamp ,
118+ bool * thread_session_state_exhausted );
118119
119120// Detaches this buffer from an active writer thread and marks it read-only so that the reader
120121// thread can use it.
@@ -124,6 +125,12 @@ buffer_manager_convert_buffer_to_read_only (
124125 EventPipeBufferManager * buffer_manager ,
125126 EventPipeBuffer * new_read_buffer );
126127
128+ static
129+ void
130+ buffer_manager_remove_and_delete_thread_session_state (
131+ EventPipeBufferManager * buffer_manager ,
132+ EventPipeThreadSessionState * thread_session_state );
133+
127134/*
128135 * EventPipeBufferList.
129136 */
@@ -593,7 +600,8 @@ buffer_manager_move_next_event_any_thread (
593600 EventPipeBuffer * buffer ;
594601 EventPipeEventInstance * next_event ;
595602 DN_LIST_FOREACH_BEGIN (EventPipeThreadSessionState * , thread_session_state , cached_thread_session_state_list ) {
596- buffer = buffer_manager_advance_to_non_empty_buffer (buffer_manager , thread_session_state , stop_timestamp );
603+ bool thread_session_state_exhausted = false;
604+ buffer = buffer_manager_advance_to_non_empty_buffer (buffer_manager , thread_session_state , stop_timestamp , & thread_session_state_exhausted );
597605 if (buffer ) {
598606 // Peek the next event out of the buffer.
599607 next_event = ep_buffer_get_current_read_event (buffer );
@@ -605,8 +613,20 @@ buffer_manager_move_next_event_any_thread (
605613 oldest_timestamp = ep_event_instance_get_timestamp (buffer_manager -> current_event );
606614 }
607615 }
616+ if (thread_session_state_exhausted ) {
617+ EP_SPIN_LOCK_ENTER (& buffer_manager -> rt_lock , section2 )
618+ buffer_manager_remove_and_delete_thread_session_state (buffer_manager , thread_session_state );
619+ EP_SPIN_LOCK_EXIT (& buffer_manager -> rt_lock , section2 )
620+ }
608621 } DN_LIST_FOREACH_END ;
609622
623+ // The current event will be read, so update the last read sequence number for that thread session state.
624+ if (buffer_manager -> current_event != NULL )
625+ {
626+ uint32_t sequence_number = ep_buffer_get_current_sequence_number (buffer_manager -> current_buffer );
627+ ep_thread_session_state_set_last_read_sequence_number (buffer_manager -> current_thread_session_state , sequence_number );
628+ }
629+
610630ep_on_exit :
611631 ep_buffer_manager_requires_lock_not_held (buffer_manager );
612632 dn_list_free (cached_thread_session_state_list );
@@ -634,7 +654,9 @@ buffer_manager_move_next_event_same_thread (
634654 ep_buffer_move_next_read_event (buffer_manager -> current_buffer );
635655
636656 // Find the first buffer in the list, if any, which has an event in it
637- buffer_manager -> current_buffer = buffer_manager_advance_to_non_empty_buffer (buffer_manager , buffer_manager -> current_thread_session_state , stop_timestamp );
657+ EventPipeThreadSessionState * current_thread_session_state = buffer_manager -> current_thread_session_state ;
658+ bool thread_session_state_exhausted = false;
659+ buffer_manager -> current_buffer = buffer_manager_advance_to_non_empty_buffer (buffer_manager , current_thread_session_state , stop_timestamp , & thread_session_state_exhausted );
638660
639661 if (buffer_manager -> current_buffer ) {
640662 // get the event from that buffer
@@ -650,21 +672,37 @@ buffer_manager_move_next_event_same_thread (
650672 buffer_manager -> current_event = next_event ;
651673 EP_ASSERT (buffer_manager -> current_buffer != NULL );
652674 EP_ASSERT (buffer_manager -> current_thread_session_state != NULL );
675+ // The current event will be read, so update the last read sequence number for that thread session state.
676+ uint32_t sequence_number = ep_buffer_get_current_sequence_number (buffer_manager -> current_buffer );
677+ ep_thread_session_state_set_last_read_sequence_number (current_thread_session_state , sequence_number );
653678 }
654679 } else {
655680 // no more buffers prior to before_timestamp
656681 EP_ASSERT (buffer_manager -> current_event == NULL );
657682 EP_ASSERT (buffer_manager -> current_buffer == NULL );
658683 buffer_manager -> current_thread_session_state = NULL ;
659684 }
685+
686+ if (thread_session_state_exhausted ) {
687+ EP_SPIN_LOCK_ENTER (& buffer_manager -> rt_lock , section1 )
688+ buffer_manager_remove_and_delete_thread_session_state (buffer_manager , current_thread_session_state );
689+ EP_SPIN_LOCK_EXIT (& buffer_manager -> rt_lock , section1 )
690+ }
691+
692+ ep_on_exit :
693+ return ;
694+
695+ ep_on_error :
696+ ep_exit_error_handler ();
660697}
661698
662699static
663700EventPipeBuffer *
664701buffer_manager_advance_to_non_empty_buffer (
665702 EventPipeBufferManager * buffer_manager ,
666703 EventPipeThreadSessionState * thread_session_state ,
667- ep_timestamp_t before_timestamp )
704+ ep_timestamp_t before_timestamp ,
705+ bool * thread_session_state_exhausted )
668706{
669707 EP_ASSERT (buffer_manager != NULL );
670708 EP_ASSERT (thread_session_state != NULL );
@@ -675,6 +713,8 @@ buffer_manager_advance_to_non_empty_buffer (
675713 EP_ASSERT (buffer_list != NULL );
676714 EventPipeBuffer * current_buffer = buffer_list -> head_buffer ;
677715 EP_ASSERT (current_buffer != NULL );
716+ * thread_session_state_exhausted = false;
717+ bool done = false;
678718 while (!done ) {
679719 buffer_manager_convert_buffer_to_read_only (buffer_manager , current_buffer );
680720 if (ep_buffer_get_current_read_event (current_buffer ) != NULL ) {
@@ -689,6 +729,10 @@ buffer_manager_advance_to_non_empty_buffer (
689729
690730 // get the next buffer
691731 current_buffer = buffer_list -> head_buffer ;
732+
733+ if (!current_buffer && (ep_rt_volatile_load_uint32_t_without_barrier (ep_thread_get_unregistered_ref (ep_thread_session_state_get_thread (thread_session_state ))) > 0 ))
734+ * thread_session_state_exhausted = true;
735+
692736 if (!current_buffer || ep_buffer_get_creation_timestamp (current_buffer ) >= before_timestamp ) {
693737 // no more buffers in the list before this timestamp, we're done
694738 current_buffer = NULL ;
@@ -1131,8 +1175,6 @@ ep_buffer_manager_write_all_buffers_to_file_v4 (
11311175
11321176 uint64_t capture_thread_id = ep_thread_get_os_thread_id (ep_buffer_get_writer_thread (buffer_manager -> current_buffer ));
11331177
1134- EventPipeThreadSessionState * current_thread_session_state = buffer_manager -> current_thread_session_state ;
1135-
11361178 // loop across events on this thread
11371179 bool events_written_for_thread = false;
11381180
@@ -1148,7 +1190,6 @@ ep_buffer_manager_write_all_buffers_to_file_v4 (
11481190 buffer_manager_move_next_event_same_thread (buffer_manager , current_timestamp_boundary );
11491191 }
11501192
1151- ep_thread_session_state_set_last_read_sequence_number (current_thread_session_state , sequence_number );
11521193 // Have we written events in any sequence point?
11531194 * events_written = events_written_for_thread || * events_written ;
11541195 }
@@ -1166,8 +1207,7 @@ ep_buffer_manager_write_all_buffers_to_file_v4 (
11661207 // through the events we may have observed that a higher numbered event was recorded. If so we
11671208 // should adjust the sequence numbers upwards to ensure the data in the stream is consistent.
11681209 EP_SPIN_LOCK_ENTER (& buffer_manager -> rt_lock , section2 )
1169- for (dn_list_it_t it = dn_list_begin (buffer_manager -> thread_session_state_list ); !dn_list_it_end (it ); ) {
1170- EventPipeThreadSessionState * session_state = * dn_list_it_data_t (it , EventPipeThreadSessionState * );
1210+ DN_LIST_FOREACH_BEGIN (EventPipeThreadSessionState * , session_state , buffer_manager -> thread_session_state_list ) {
11711211 EventPipeThread * thread = ep_thread_session_state_get_thread (session_state );
11721212 dn_umap_it_t found = dn_umap_ptr_uint32_find (ep_sequence_point_get_thread_sequence_numbers (sequence_point ), thread );
11731213 uint32_t thread_sequence_number = !dn_umap_it_end (found ) ? dn_umap_it_value_uint32_t (found ) : 0 ;
@@ -1181,23 +1221,7 @@ ep_buffer_manager_write_all_buffers_to_file_v4 (
11811221 if (dn_umap_it_end (found ))
11821222 ep_thread_addref (thread );
11831223 }
1184-
1185- it = dn_list_it_next (it );
1186-
1187- // if a session_state was exhausted during this sequence point, mark it for deletion
1188- if (ep_thread_session_state_get_buffer_list (session_state )-> head_buffer == NULL ) {
1189- // We don't hold the thread lock here, so it technically races with a thread getting unregistered. This is okay,
1190- // because we will either not have passed the above if statement (there were events still in the buffers) or we
1191- // will catch it at the next sequence point.
1192- EventPipeThread * thread = ep_thread_session_state_get_thread (session_state );
1193- EventPipeSession * session = ep_thread_session_state_get_session (session_state );
1194- if (ep_rt_volatile_load_uint32_t_without_barrier (ep_thread_get_unregistered_ref (thread )) > 0 ) {
1195- dn_vector_ptr_push_back (& session_states_to_delete , session_state );
1196- dn_list_remove (buffer_manager -> thread_session_state_list , session_state );
1197- ep_thread_set_session_state (thread , session , NULL );
1198- }
1199- }
1200- }
1224+ } DN_LIST_FOREACH_END ;
12011225 EP_SPIN_LOCK_EXIT (& buffer_manager -> rt_lock , section2 )
12021226
12031227 // emit the sequence point into the file
@@ -1209,34 +1233,6 @@ ep_buffer_manager_write_all_buffers_to_file_v4 (
12091233 EP_SPIN_LOCK_EXIT (& buffer_manager -> rt_lock , section3 )
12101234 }
12111235
1212- // There are sequence points created during this flush and we've marked session states for deletion.
1213- // We need to remove these from the internal maps of the subsequent Sequence Points
1214- if (dn_vector_ptr_size (& session_states_to_delete ) > 0 ) {
1215- EP_SPIN_LOCK_ENTER (& buffer_manager -> rt_lock , section4 )
1216- if (buffer_manager_try_peek_sequence_point (buffer_manager , & sequence_point )) {
1217- DN_LIST_FOREACH_BEGIN (EventPipeSequencePoint * , current_sequence_point , buffer_manager -> sequence_points ) {
1218- // foreach (session_state in session_states_to_delete)
1219- DN_VECTOR_PTR_FOREACH_BEGIN (EventPipeThreadSessionState * , thread_session_state , & session_states_to_delete ) {
1220- EventPipeThread * thread = ep_thread_session_state_get_thread (thread_session_state );
1221- dn_umap_it_t found = dn_umap_ptr_uint32_find (ep_sequence_point_get_thread_sequence_numbers (current_sequence_point ), thread );
1222- if (!dn_umap_it_end (found )) {
1223- dn_umap_erase (found );
1224- // every entry of this map was holding an extra ref to the thread (see: ep-event-instance.{h|c})
1225- ep_thread_release (thread );
1226- }
1227- } DN_VECTOR_PTR_FOREACH_END ;
1228- } DN_LIST_FOREACH_END ;
1229- }
1230-
1231- // foreach (thread_session_state in session_states_to_delete)
1232- DN_VECTOR_PTR_FOREACH_BEGIN (EventPipeThreadSessionState * , thread_session_state , & session_states_to_delete ) {
1233- EP_ASSERT (thread_session_state != NULL );
1234- ep_thread_session_state_free (thread_session_state );
1235- } DN_VECTOR_PTR_FOREACH_END ;
1236-
1237- EP_SPIN_LOCK_EXIT (& buffer_manager -> rt_lock , section4 )
1238- }
1239-
12401236ep_on_exit :
12411237 dn_vector_ptr_dispose (& session_states_to_delete );
12421238 return ;
@@ -1318,6 +1314,53 @@ ep_buffer_manager_ensure_consistency (EventPipeBufferManager *buffer_manager)
13181314}
13191315#endif
13201316
1317+ // While deleting a thread session state, this is the last chance to update sequence points
1318+ // mapped thread sequence numbers. The current sequence point needs to have the latest read
1319+ // sequence number, where as future sequence points can drop the thread entry.
1320+ static
1321+ void
1322+ buffer_manager_remove_and_delete_thread_session_state (
1323+ EventPipeBufferManager * buffer_manager ,
1324+ EventPipeThreadSessionState * thread_session_state )
1325+ {
1326+ EP_ASSERT (buffer_manager != NULL );
1327+ EP_ASSERT (thread_session_state != NULL );
1328+
1329+ ep_buffer_manager_requires_lock_held (buffer_manager );
1330+
1331+ EventPipeSequencePoint * current_sequence_point = NULL ;
1332+ if (buffer_manager_try_peek_sequence_point (buffer_manager , & current_sequence_point )) {
1333+ DN_LIST_FOREACH_BEGIN (EventPipeSequencePoint * , sequence_point , buffer_manager -> sequence_points ) {
1334+ EventPipeThread * thread = ep_thread_session_state_get_thread (thread_session_state );
1335+ dn_umap_t * thread_sequence_numbers = ep_sequence_point_get_thread_sequence_numbers (sequence_point );
1336+ dn_umap_it_t found = dn_umap_ptr_uint32_find (thread_sequence_numbers , thread );
1337+ if (current_sequence_point != sequence_point ) {
1338+ if (!dn_umap_it_end (found )) {
1339+ ep_thread_release (thread );
1340+ dn_umap_erase_key (thread_sequence_numbers , thread );
1341+ }
1342+ continue ;
1343+ }
1344+
1345+ uint32_t thread_sequence_number = !dn_umap_it_end (found ) ? dn_umap_it_value_uint32_t (found ) : 0 ;
1346+ // Sequence numbers can overflow so we can't use a direct last_read > sequence_number comparison
1347+ // If a thread is able to drop more than 0x80000000 events in between sequence points then we will
1348+ // miscategorize it, but that seems unlikely.
1349+ uint32_t sequence_number = ep_thread_session_state_get_last_read_sequence_number (thread_session_state );
1350+ uint32_t last_read_delta = sequence_number - thread_sequence_number ;
1351+ if (0 < last_read_delta && last_read_delta < 0x80000000 ) {
1352+ dn_umap_ptr_uint32_insert_or_assign (thread_sequence_numbers , thread , sequence_number );
1353+ if (dn_umap_it_end (found ))
1354+ ep_thread_addref (thread );
1355+ }
1356+ } DN_LIST_FOREACH_END ;
1357+ }
1358+
1359+ dn_list_remove (buffer_manager -> thread_session_state_list , thread_session_state );
1360+ ep_thread_set_session_state (ep_thread_session_state_get_thread (thread_session_state ), ep_thread_session_state_get_session (thread_session_state ), NULL );
1361+ ep_thread_session_state_free (thread_session_state );
1362+ }
1363+
13211364#endif /* !defined(EP_INCLUDE_SOURCE_FILES) || defined(EP_FORCE_INCLUDE_SOURCE_FILES) */
13221365#endif /* ENABLE_PERFTRACING */
13231366
0 commit comments