@@ -363,6 +363,7 @@ def __init__(self, read_butler: Butler, butler_writer: ButlerWriter, image_bucke
         self._define_dimensions()
         self._init_ingester()
         self._init_visit_definer()
+        self._init_provenance_dataset_types()
 
         # How much to pad the spatial region we will copy over.
         self.padding = padding * lsst.geom.arcseconds
@@ -439,6 +440,26 @@ def _init_governor_datasets(self, timestamp, skymap):
         self.skymap = self.read_central_butler.get("skyMap", skymap=self.skymap_name,
                                                    collections=self._collection_skymap)
 
+    def _init_provenance_dataset_types(self):
+        self._group_provenance_dataset_type = DatasetType(
+            "prompt_group_provenance",
+            self.butler.dimensions.conform(["group", "detector"]),
+            "ProvenanceQuantumGraph",
+        )
+        self.butler.registry.registerDatasetType(self._group_provenance_dataset_type)
+        self._visit_provenance_dataset_type = DatasetType(
+            "prompt_visit_provenance",
+            self.butler.dimensions.conform(["visit", "detector"]),
+            "ProvenanceQuantumGraph",
+        )
+        self.butler.registry.registerDatasetType(self._visit_provenance_dataset_type)
+        self._exposure_provenance_dataset_type = DatasetType(
+            "prompt_exposure_provenance",
+            self.butler.dimensions.conform(["exposure", "detector"]),
+            "ProvenanceQuantumGraph",
+        )
+        self.butler.registry.registerDatasetType(self._exposure_provenance_dataset_type)
+
     def _define_dimensions(self):
         """Define any dimensions that must be computed from this object's visit.
 
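
For orientation, here is a sketch of how one of the provenance datasets registered above might later be read back, assuming standard butler.get() semantics; the instrument name and data ID values are invented for illustration:

    provenance = butler.get(
        "prompt_visit_provenance",
        instrument="LSSTCam",     # assumed instrument name, not from this patch
        visit=123456,             # made-up visit ID
        detector=42,              # made-up detector ID
        collections=output_run,   # the output run returned by _try_pipelines
    )
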
@@ -1292,14 +1313,14 @@ def _get_graph_executor(self, butler, factory):
         )
         graph_executor = MPGraphExecutor(
             # TODO: re-enable parallel execution once we can log as desired with CliLog or a successor
-            # (see issues linked from DM-42063)
+            # (see issues linked from DM-42063) AND once provenance is supported with multiprocessing.
             num_proc=1,  # Avoid spawning processes, because they bypass our logger
             timeout=2_592_000.0,  # In practice, timeout is never helpful; set to 30 days.
             quantum_executor=quantum_executor,
         )
         return graph_executor
 
-    def _try_pipelines(self, pipelines, in_collections, data_ids, *, label):
+    def _try_pipelines(self, pipelines, in_collections, data_ids, *, label, provenance_dataset_type):
         """Attempt to run pipelines from a prioritized list.
 
         On success, exactly one of the pipelines is run, with outputs going to
@@ -1320,6 +1341,10 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label):
         label : `str`
             A unique name to disambiguate this pipeline run for logging
             purposes.
+        provenance_dataset_type : `lsst.daf.butler.DatasetType`
+            The butler dataset type used to store provenance information. Must
+            have dimensions that match the tasks of the pipeline and use the
+            "ProvenanceQuantumGraph" storage class.
 
         Returns
         -------
@@ -1370,6 +1395,10 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label):
                 # Diagnostic logs are the responsibility of GraphBuilder.
                 _log.error(f"Empty quantum graph for {pipeline_file}; see previous logs for details.")
                 continue
+            provenance_ref = self._make_provenance_ref(provenance_dataset_type, qg, pipeline_file)
+            if provenance_ref is None:
+                # An error log is always emitted if None is returned.
+                continue
             # Past this point, partial execution creates datasets.
             # Don't retry -- either fail (raise) or break.
 
@@ -1381,7 +1410,8 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label):
                     _log, msg=f"executor.run_pipeline ({label})", level=logging.DEBUG):
                 executor.run_pipeline(
                     qg,
-                    graph_executor=self._get_graph_executor(exec_butler, factory)
+                    graph_executor=self._get_graph_executor(exec_butler, factory),
+                    provenance_dataset_ref=provenance_ref,
                 )
             _log.info(f"{label.capitalize()} pipeline successfully run.")
             return output_run
@@ -1394,6 +1424,42 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label):
         else:
             raise NoGoodPipelinesError(f"No {label} pipeline graph could be built.")
 
+    def _make_provenance_ref(self, dataset_type, qg, pipeline_file):
+        """Make the provenance DatasetRef for a quantum graph.
+
+        Parameters
+        ----------
+        dataset_type : `lsst.daf.butler.DatasetType`
+            Provenance dataset type for this pipeline.
+        qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
+            Quantum graph that predicts execution.
+        pipeline_file : `str`
+            Name of the pipeline (for log messages).
+
+        Returns
+        -------
+        ref : `lsst.daf.butler.DatasetRef` or `None`
+            A reference to a to-be-written provenance dataset, or `None` if the
+            quantum graph and the provenance dataset type are incompatible.
+            Error logs are always emitted when `None` is returned.
+        """
+        for task_node in qg.pipeline_graph.tasks.values():
+            if task_node.dimensions == dataset_type.dimensions:
+                data_ids = qg.quanta_by_task[task_node.label].keys()
+                if len(data_ids) == 1:
+                    return DatasetRef(dataset_type, next(iter(data_ids)), run=qg.header.output_run)
+                else:
+                    _log.error(
+                        f"Task {task_node.label} in pipeline {pipeline_file} has multiple quanta for the "
+                        f"dimensions {dataset_type.dimensions} of the provenance dataset."
+                    )
+                    return None
+        _log.error(
+            f"Pipeline {pipeline_file} has no tasks with the "
+            f"dimensions {dataset_type.dimensions} of the provenance dataset."
+        )
+        return None
+
     def _run_preprocessing(self) -> None:
         """Preprocess a visit ahead of incoming image(s).
 
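
The matching rule in _make_provenance_ref can be illustrated with a self-contained toy that uses plain dicts in place of the real pipeline-graph and quantum-graph objects; all task labels and data IDs below are invented:

    def pick_provenance_data_id(task_dims, quanta_by_task, provenance_dims):
        # First task whose dimensions equal the provenance dimensions wins,
        # and only if it has exactly one quantum (a unique data ID).
        for label, dims in task_dims.items():
            if dims == provenance_dims:
                data_ids = list(quanta_by_task[label])
                if len(data_ids) == 1:
                    return data_ids[0]  # unique data ID -> usable for a DatasetRef
                return None             # multiple quanta: ambiguous
        return None                     # no task with matching dimensions

    task_dims = {"isr": {"exposure", "detector"}, "calibrate": {"visit", "detector"}}
    quanta_by_task = {"calibrate": {("visit", 1): object()}}
    assert pick_provenance_data_id(task_dims, quanta_by_task, {"visit", "detector"}) == ("visit", 1)
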
@@ -1427,6 +1493,7 @@ def _run_preprocessing(self) -> None:
             in_collections=[preload_run],
             data_ids=where,
             label="preprocessing",
+            provenance_dataset_type=self._group_provenance_dataset_type,
         )
 
     def _check_permanent_changes(self, where: str) -> bool:
@@ -1511,12 +1578,17 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
         # faked raw file and appropriate SSO data during prep (and then
         # cleanup when ingesting the real data).
         try:
-            self.define_visits.run({"instrument": self.instrument.getName(),
-                                    "exposure": exp} for exp in exposure_ids)
+            visits_defined = self.define_visits.run({"instrument": self.instrument.getName(),
+                                                     "exposure": exp} for exp in exposure_ids)
         except lsst.daf.butler.registry.DataIdError as e:
             # TODO: a good place for a custom exception?
             raise RuntimeError("No data to process.") from e
+        if visits_defined.n_visits:
+            provenance_dataset_type = self._visit_provenance_dataset_type
+        else:
+            provenance_dataset_type = self._exposure_provenance_dataset_type
+
         # Inefficient, but most graph builders can't take equality constraints
         where = (
             f"instrument='{self.visit.instrument}' and detector={self.visit.detector} "
@@ -1531,6 +1603,7 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
             in_collections=pre_runs + [preload_run],
             data_ids=where,
             label="main",
+            provenance_dataset_type=provenance_dataset_type,
         )
         # Catch Exception just in case there's a surprise -- raising
         # NonRetriableError on *all* irrevocable changes is important.