Skip to content

Commit 7cd6e38

Browse files
committed
Enable provenance recording.
1 parent 8229e4f commit 7cd6e38

File tree

3 files changed

+72
-5
lines changed

3 files changed

+72
-5
lines changed

python/activator/exception.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
__all__ = ["NonRetriableError", "RetriableError", "GracefulShutdownInterrupt", "TimeoutInterrupt",
2424
"InvalidVisitError", "IgnorableVisit",
2525
"InvalidPipelineError", "NoGoodPipelinesError",
26-
"PipelinePreExecutionError", "PipelineExecutionError",
26+
"PipelinePreExecutionError", "PipelineExecutionError", "ProvenanceDimensionsError",
2727
]
2828

2929

@@ -144,3 +144,9 @@ class PipelineExecutionError(RuntimeError):
144144
145145
Usually chained to an internal exception.
146146
"""
147+
148+
149+
class ProvenanceDimensionsError(RuntimeError):
150+
"""Exception raised if a viable data ID for provenance could not be
151+
defined.
152+
"""

python/activator/middleware_interface.py

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@
6262
from shared.visit import FannedOutVisit
6363
from .caching import DatasetCache
6464
from .exception import GracefulShutdownInterrupt, TimeoutInterrupt, NonRetriableError, RetriableError, \
65-
InvalidPipelineError, NoGoodPipelinesError, PipelinePreExecutionError, PipelineExecutionError
65+
InvalidPipelineError, NoGoodPipelinesError, PipelinePreExecutionError, PipelineExecutionError, \
66+
ProvenanceDimensionsError
6667
from .timer import enforce_schema, time_this_to_bundle
6768

6869
_log = logging.getLogger("lsst." + __name__)
@@ -363,6 +364,7 @@ def __init__(self, read_butler: Butler, butler_writer: ButlerWriter, image_bucke
363364
self._define_dimensions()
364365
self._init_ingester()
365366
self._init_visit_definer()
367+
self._init_provenance_dataset_type()
366368

367369
# How much to pad the spatial region we will copy over.
368370
self.padding = padding*lsst.geom.arcseconds
@@ -439,6 +441,18 @@ def _init_governor_datasets(self, timestamp, skymap):
439441
self.skymap = self.read_central_butler.get("skyMap", skymap=self.skymap_name,
440442
collections=self._collection_skymap)
441443

444+
def _init_provenance_dataset_type(self):
445+
"""Register the dataset types used to store provenance information.
446+
447+
``self._init_local_butler`` must have already been run.
448+
"""
449+
self._provenance_dataset_type = DatasetType(
450+
"prompt_provenance",
451+
self.butler.dimensions.conform(["group", "detector"]),
452+
"ProvenanceQuantumGraph",
453+
)
454+
self.butler.registry.registerDatasetType(self._provenance_dataset_type)
455+
442456
def _define_dimensions(self):
443457
"""Define any dimensions that must be computed from this object's visit.
444458
@@ -1292,7 +1306,7 @@ def _get_graph_executor(self, butler, factory):
12921306
)
12931307
graph_executor = MPGraphExecutor(
12941308
# TODO: re-enable parallel execution once we can log as desired with CliLog or a successor
1295-
# (see issues linked from DM-42063)
1309+
# (see issues linked from DM-42063) AND once provenance is supported with multiprocessing.
12961310
num_proc=1, # Avoid spawning processes, because they bypass our logger
12971311
timeout=2_592_000.0, # In practice, timeout is never helpful; set to 30 days.
12981312
quantum_executor=quantum_executor,
@@ -1370,6 +1384,11 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label):
13701384
# Diagnostic logs are the responsibility of GraphBuilder.
13711385
_log.error(f"Empty quantum graph for {pipeline_file}; see previous logs for details.")
13721386
continue
1387+
try:
1388+
provenance_ref = self._make_provenance_ref(data_ids, output_run)
1389+
except ProvenanceDimensionsError:
1390+
_log.exception(f"Failed to determine data ID for provenance for {pipeline_file}.")
1391+
continue
13731392
# Past this point, partial execution creates datasets.
13741393
# Don't retry -- either fail (raise) or break.
13751394

@@ -1384,7 +1403,8 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label):
13841403
_log, msg=f"executor.run_pipeline ({label})", level=logging.DEBUG):
13851404
executor.run_pipeline(
13861405
qgraph,
1387-
graph_executor=self._get_graph_executor(exec_butler, factory)
1406+
graph_executor=self._get_graph_executor(exec_butler, factory),
1407+
provenance_dataset_ref=provenance_ref,
13881408
)
13891409
_log.info(f"{label.capitalize()} pipeline successfully run.")
13901410
return output_run
@@ -1397,6 +1417,33 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label):
13971417
else:
13981418
raise NoGoodPipelinesError(f"No {label} pipeline graph could be built.")
13991419

1420+
def _make_provenance_ref(self, where, output_run):
1421+
"""Make the provenance DatasetRef for a quantum graph.
1422+
1423+
Parameters
1424+
----------
1425+
where : `str`
1426+
Butler query expression that can be related to a single
1427+
``{group, detector}`` data ID.
1428+
output_run : `str`
1429+
Output RUN collection.
1430+
1431+
Returns
1432+
-------
1433+
ref : `lsst.daf.butler.DatasetRef`
1434+
A reference to a to-be-written provenance dataset in ``output_run``.
1435+
"""
1436+
query_results = self.butler.query_data_ids(
1437+
self._provenance_dataset_type.dimensions, where=where, explain=False
1438+
)
1439+
try:
1440+
(data_id,) = query_results
1441+
except ValueError:
1442+
raise ProvenanceDimensionsError(
1443+
f"Expected exactly one data ID for {self._provenance_dataset_type}; got {query_results}."
1444+
) from None
1445+
return DatasetRef(self._provenance_dataset_type, data_id, run=output_run)
1446+
14001447
def _run_preprocessing(self) -> None:
14011448
"""Preprocess a visit ahead of incoming image(s).
14021449

tests/test_middleware_interface.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
import lsst.afw.image
4242
import lsst.afw.table
4343
from lsst.dax.apdb import ApdbSql
44-
from lsst.daf.butler import Butler, CollectionType, DataCoordinate, DimensionUniverse, EmptyQueryResultError
44+
from lsst.daf.butler import (
45+
Butler, CollectionType, DataCoordinate, DatasetType, DimensionUniverse, EmptyQueryResultError
46+
)
4547
import lsst.daf.butler.tests as butler_tests
4648
from lsst.obs.base.formatters.fitsExposure import FitsImageFormatter
4749
from lsst.obs.base.ingest import RawFileDatasetInfo, RawFileData
@@ -717,6 +719,13 @@ def _check_run_pipeline_fallback(self, callable, pipe_files, graphs, final_label
717719
The description of the pipeline that should be run, given
718720
``pipe_files`` and ``graphs``.
719721
"""
722+
test_provenance_dataset_type = DatasetType(
723+
"test_provenance",
724+
# Mocked QGs do not have realistic dimensions, and provenance
725+
# dataset types need to have the same dimensions.
726+
self.interface.butler.dimensions.conform(["detector"]),
727+
"ProvenanceQuantumGraph"
728+
)
720729
with (
721730
unittest.mock.patch(
722731
"activator.middleware_interface.MiddlewareInterface.get_pre_pipeline_files",
@@ -736,6 +745,11 @@ def _check_run_pipeline_fallback(self, callable, pipe_files, graphs, final_label
736745
unittest.mock.patch(
737746
"activator.middleware_interface.SeparablePipelineExecutor.run_pipeline"
738747
) as mock_run,
748+
unittest.mock.patch.object(
749+
self.interface,
750+
"_provenance_dataset_type",
751+
test_provenance_dataset_type
752+
),
739753
self.assertLogs(self.logger_name, level="INFO") as logs,
740754
):
741755
callable()

0 commit comments

Comments (0)