@@ -144,7 +144,7 @@ def analyze_trace(
144144 An AnalyzerResultType object containing the analysis results.
145145 """
146146 # Check if high-level metrics are checkpointed
147- proc_view_types = list ( sorted ( set ( view_types ). union ({ COL_PROC_NAME })) )
147+ proc_view_types = self . ensure_proc_view_type ( view_types = view_types )
148148 hlm_checkpoint_name = self .get_hlm_checkpoint_name (view_types = proc_view_types )
149149 traces = None
150150 raw_stats = None
@@ -194,128 +194,22 @@ def analyze_trace(
194194 # Validate time granularity
195195 # self.validate_time_granularity(hlm=hlm, view_types=hlm_view_types)
196196
197- # Compute layers & views
198- with console_block ("Compute views" ):
199- with log_block ("create_layers_and_views_tasks" ):
200- hlms = {}
201- main_views = {}
202- main_indexes = {}
203- views = {}
204- view_keys = set ()
205- for layer , layer_condition in self .preset .layer_defs .items ():
206- layer_hlm = hlm .copy ()
207- if layer_condition :
208- layer_hlm = hlm .query (layer_condition )
209- layer_main_view = self .compute_main_view (
210- layer = layer ,
211- hlm = layer_hlm ,
212- view_types = proc_view_types ,
213- )
214- layer_main_index = layer_main_view .index .to_frame ().reset_index (drop = True )
215- layer_views = self .compute_views (
216- layer = layer ,
217- main_view = layer_main_view ,
218- view_types = proc_view_types ,
219- )
220- if logical_view_types :
221- layer_logical_views = self .compute_logical_views (
222- layer = layer ,
223- main_view = layer_main_view ,
224- views = layer_views ,
225- view_types = proc_view_types ,
226- )
227- layer_views .update (layer_logical_views )
228- hlms [layer ] = layer_hlm
229- main_views [layer ] = layer_main_view
230- main_indexes [layer ] = layer_main_index
231- views [layer ] = layer_views
232- view_keys .update (layer_views .keys ())
233-
234- with log_block ("compute_views_and_raw_stats" ):
235- (views , raw_stats ) = compute (views , raw_stats )
236-
237- # Restore checkpointed flat views if available
238- checkpointed_flat_views = {}
239- if self .checkpoint :
240- with log_block ("restore_flat_view_checkpoints" ):
241- checkpointed_flat_views .update (self .restore_flat_views (view_keys = list (view_keys )))
242-
243- # Process views to create flat views
244- with console_block ("Process views" ):
245- flat_views = {}
246- for layer in views :
247- for view_key in views [layer ]:
248- if view_key in checkpointed_flat_views :
249- flat_views [view_key ] = checkpointed_flat_views [view_key ]
250- continue
251- with log_block ("merge_flat_view" , view_key = view_key ):
252- view = views [layer ][view_key ].copy ()
253- view .columns = view .columns .map (lambda col : layer .lower () + "_" + col )
254- if view_key in flat_views :
255- flat_views [view_key ] = flat_views [view_key ].merge (
256- view ,
257- how = "outer" ,
258- left_index = True ,
259- right_index = True ,
260- )
261- else :
262- flat_views [view_key ] = view
263- try :
264- df = flat_views [view_key ]
265- mem_bytes = int (df .memory_usage (deep = True ).sum ()) if hasattr (df , 'memory_usage' ) else - 1
266- logger .debug (
267- "Flat view created" ,
268- view_key = view_key ,
269- shape = getattr (df , 'shape' , None ),
270- mem_bytes = mem_bytes ,
271- )
272- except Exception :
273- pass
274-
275- # Compute metric boundaries for flat views
276- with log_block ("process_flat_views+metric_boundaries" ):
277- for view_key in flat_views :
278- if view_key in checkpointed_flat_views :
279- continue
280- view_type = view_key [- 1 ]
281- top_layer = list (self .preset .layer_defs )[0 ]
282- time_suffix = "time_sum" if self .is_view_process_based (view_key ) else "time_max"
283- with log_block ("calculate_metric_boundary" , view_key = view_key ):
284- time_boundary = flat_views [view_key ][f"{ top_layer } _{ time_suffix } " ].sum ()
285- metric_boundaries [view_type ] = metric_boundaries .get (view_type , {})
286- for layer in self .preset .layer_defs :
287- metric_boundaries [view_type ][f"{ layer } _{ time_suffix } " ] = time_boundary
288- with log_block ("process_flat_view" , view_key = view_key ):
289- # Process flat views to compute metrics and scores
290- flat_views [view_key ] = self ._process_flat_view (
291- flat_view = flat_views [view_key ],
292- view_key = view_key ,
293- metric_boundaries = metric_boundaries ,
294- )
295-
296- # Checkpoint flat views if enabled
297- if self .checkpoint :
298- with log_block ("write_flat_view_checkpoints" ):
299- self .checkpoint_tasks .extend (self .store_flat_views (flat_views = flat_views ))
300-
301- # Wait for all checkpoint tasks
302- if self .checkpoint :
303- with log_block ("wait_for_checkpoints" ):
304- wait (self .checkpoint_tasks )
305-
306- return AnalyzerResultType (
307- _hlms = hlms ,
308- _main_views = main_views ,
309- _metric_boundaries = metric_boundaries ,
310- _traces = traces ,
311- checkpoint_dir = self .checkpoint_dir ,
312- flat_views = flat_views ,
313- layers = self .layers ,
197+ # Analyze HLM
198+ result = self ._analyze_hlm (
199+ hlm = hlm ,
200+ logical_view_types = logical_view_types ,
201+ metric_boundaries = metric_boundaries ,
202+ proc_view_types = proc_view_types ,
314203 raw_stats = raw_stats ,
315- view_types = view_types ,
316- views = views ,
317204 )
318205
206+ # Attach correct traces & view types
207+ result ._traces = traces
208+ result .view_types = view_types
209+
210+ # Return result
211+ return result
212+
319213 def read_stats (self , traces : dd .DataFrame ) -> RawStats :
320214 """Computes and restores raw statistics from the trace data.
321215
@@ -626,6 +520,17 @@ def get_job_time(self, traces: dd.DataFrame) -> float:
626520 """
627521 return traces [COL_TIME_END ].max () - traces [COL_TIME_START ].min ()
628522
def ensure_proc_view_type(self, view_types: List[ViewType]) -> List[ViewType]:
    """Return the given view types with COL_PROC_NAME guaranteed to be present.

    Duplicate entries are collapsed and the result is sorted, so the same
    set of view types always yields the same list regardless of input order.

    Args:
        view_types: View types requested for the analysis.

    Returns:
        A new sorted, de-duplicated list holding every entry of
        `view_types` plus COL_PROC_NAME. The input list is not modified.
    """
    # sorted() already builds a fresh list, so no extra list() wrapper is needed.
    return sorted({*view_types, COL_PROC_NAME})
533+
def get_stats_checkpoint_name(self):
    """Return the checkpoint name used for the raw statistics.

    The name is whatever `get_checkpoint_name` yields for the
    `CHECKPOINT_RAW_STATS` key.
    """
    stats_checkpoint_name = self.get_checkpoint_name(CHECKPOINT_RAW_STATS)
    return stats_checkpoint_name
631536
@@ -887,6 +792,166 @@ def _iter_permutations(r: int):
887792
888793 return it .chain .from_iterable (map (_iter_permutations , range (len (view_types ))))
889794
def _analyze_hlm(
    self,
    hlm: Optional[dd.DataFrame],
    proc_view_types: List[ViewType],
    metric_boundaries: ViewMetricBoundaries,
    raw_stats: RawStats,
    logical_view_types: bool,
    layer_main_views: Optional[Dict[Layer, dd.DataFrame]] = None,
) -> AnalyzerResultType:
    """Build per-layer views and flat views from high-level metrics (HLM).

    For every layer in `self.preset.layer_defs` a main view is obtained,
    either taken from `layer_main_views` or computed from `hlm`, and the
    derived views are created from it. All views are then computed
    together with `raw_stats`, merged across layers into one flat view per
    view key, and post-processed with `_process_flat_view`. When
    `self.checkpoint` is truthy, flat views are restored from and written
    back to checkpoints.

    Args:
        hlm: High-level metrics dataframe. May be None only if
            `layer_main_views` has an entry for every layer.
        proc_view_types: View types passed to the main-view, view and
            logical-view computations.
        metric_boundaries: Mapping of view type to metric boundaries. It is
            updated in place for every flat view that is not restored from
            a checkpoint.
        raw_stats: Raw statistics, computed in the same `compute` call as
            the views.
        logical_view_types: Whether to also compute logical views for each
            layer.
        layer_main_views: Optional precomputed main views keyed by layer.
            Layers missing from this mapping fall back to `hlm`.

    Returns:
        An AnalyzerResultType holding the per-layer HLMs, main views and
        views, the flat views, the metric boundaries and the computed raw
        stats. `_hlms[layer]` is None for layers whose main view came from
        `layer_main_views`. `view_types` is set to `proc_view_types`, and
        `_traces` is not passed to the result.

    Raises:
        ValueError: If `hlm` is None and a layer has no entry in
            `layer_main_views`.
    """
    layer_defs = self.preset.layer_defs

    # Compute layers & views
    with console_block("Compute views"):
        with log_block("create_layers_and_views_tasks"):
            hlms = {}
            main_views = {}
            views = {}
            view_keys = set()
            for layer, layer_condition in layer_defs.items():
                layer_hlm = None
                if layer_main_views is not None and layer in layer_main_views:
                    # A precomputed main view takes precedence over the HLM.
                    layer_main_view = layer_main_views[layer]
                else:
                    if hlm is None:
                        raise ValueError("hlm must be provided when layer_main_views is not supplied")
                    # Only copy when there is no filter; query() already
                    # returns a new dataframe.
                    layer_hlm = hlm.query(layer_condition) if layer_condition else hlm.copy()
                    layer_main_view = self.compute_main_view(
                        layer=layer,
                        hlm=layer_hlm,
                        view_types=proc_view_types,
                    )
                layer_views = self.compute_views(
                    layer=layer,
                    main_view=layer_main_view,
                    view_types=proc_view_types,
                )
                if logical_view_types:
                    layer_logical_views = self.compute_logical_views(
                        layer=layer,
                        main_view=layer_main_view,
                        views=layer_views,
                        view_types=proc_view_types,
                    )
                    layer_views.update(layer_logical_views)
                hlms[layer] = layer_hlm
                main_views[layer] = layer_main_view
                views[layer] = layer_views
                view_keys.update(layer_views.keys())

        with log_block("compute_views_and_raw_stats"):
            (views, raw_stats) = compute(views, raw_stats)

    # Restore checkpointed flat views if available
    checkpointed_flat_views = {}
    if self.checkpoint:
        with log_block("restore_flat_view_checkpoints"):
            checkpointed_flat_views.update(self.restore_flat_views(view_keys=list(view_keys)))

    # Process views to create flat views
    with console_block("Process views"):
        flat_views = {}
        for layer, layer_views in views.items():
            # Columns of each layer are namespaced with the lower-cased layer name.
            column_prefix = layer.lower() + "_"
            for view_key in layer_views:
                if view_key in checkpointed_flat_views:
                    flat_views[view_key] = checkpointed_flat_views[view_key]
                    continue
                with log_block("merge_flat_view", view_key=view_key):
                    view = layer_views[view_key].copy()
                    view.columns = view.columns.map(lambda col: column_prefix + col)
                    if view_key in flat_views:
                        # Outer join on the index keeps rows that appear in
                        # only some of the layers.
                        flat_views[view_key] = flat_views[view_key].merge(
                            view,
                            how="outer",
                            left_index=True,
                            right_index=True,
                        )
                    else:
                        flat_views[view_key] = view
                    # Diagnostics only: a failure here must never abort the analysis.
                    try:
                        df = flat_views[view_key]
                        mem_bytes = int(df.memory_usage(deep=True).sum()) if hasattr(df, 'memory_usage') else -1
                        logger.debug(
                            "Flat view created",
                            view_key=view_key,
                            shape=getattr(df, 'shape', None),
                            mem_bytes=mem_bytes,
                        )
                    except Exception as e:
                        logger.exception("Failed to log flat view details", exc_info=e)

        # Compute metric boundaries for flat views
        with log_block("process_flat_views+metric_boundaries"):
            # The first configured layer supplies the time boundary shared by
            # all layers; it is loop-invariant, so resolve it once.
            top_layer = next(iter(layer_defs), None)
            for view_key in flat_views:
                if view_key in checkpointed_flat_views:
                    continue
                view_type = view_key[-1]
                time_suffix = "time_sum" if self.is_view_process_based(view_key) else "time_max"
                with log_block("calculate_metric_boundary", view_key=view_key):
                    time_boundary = flat_views[view_key][f"{top_layer}_{time_suffix}"].sum()
                    view_type_boundaries = metric_boundaries.setdefault(view_type, {})
                    for layer in layer_defs:
                        view_type_boundaries[f"{layer}_{time_suffix}"] = time_boundary
                with log_block("process_flat_view", view_key=view_key):
                    # Process flat views to compute metrics and scores
                    flat_views[view_key] = self._process_flat_view(
                        flat_view=flat_views[view_key],
                        view_key=view_key,
                        metric_boundaries=metric_boundaries,
                    )

    # Checkpoint flat views if enabled, then wait for all checkpoint tasks
    if self.checkpoint:
        with log_block("write_flat_view_checkpoints"):
            self.checkpoint_tasks.extend(self.store_flat_views(flat_views=flat_views))
        with log_block("wait_for_checkpoints"):
            wait(self.checkpoint_tasks)

    return AnalyzerResultType(
        _hlms=hlms,
        _main_views=main_views,
        _metric_boundaries=metric_boundaries,
        checkpoint_dir=self.checkpoint_dir,
        flat_views=flat_views,
        layers=self.layers,
        raw_stats=raw_stats,
        view_types=proc_view_types,
        views=views,
    )
954+
890955 def _compute_high_level_metrics (
891956 self ,
892957 traces : dd .DataFrame ,
0 commit comments