
Commit 615ffc8

Switch to {group, detector} for all provenance datasets.
Unfortunately we still need two dataset types, not just one, so the preprocessing provenance and main-pipeline provenance can coexist within the same RUN collection. But this is still a significant simplification.
1 parent: a6a4467
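
As a sketch of what the new scheme looks like in butler terms: both dataset types carry the same {group, detector} dimensions and differ only by name, and a RUN collection holds at most one dataset per {dataset type, data ID} pair, so the distinct names are what let the preprocessing and main-pipeline provenance graphs coexist in one run. The repository path below is hypothetical; the type names, dimensions, and storage class are taken from this commit.

from lsst.daf.butler import Butler, DatasetType

butler = Butler("/repo/example", writeable=True)  # hypothetical repository
dimensions = butler.dimensions.conform(["group", "detector"])

# Same dimensions and storage class; only the names differ, so a single
# {group, detector} data ID can carry both kinds of provenance in one run.
for name in ("prompt_preprocessing_provenance", "prompt_main_provenance"):
    butler.registry.registerDatasetType(
        DatasetType(name, dimensions, "ProvenanceQuantumGraph")
    )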

3 files changed: +42, -59 lines


python/activator/exception.py

Lines changed: 7 additions & 1 deletion
@@ -23,7 +23,7 @@
 __all__ = ["NonRetriableError", "RetriableError", "GracefulShutdownInterrupt", "TimeoutInterrupt",
            "InvalidVisitError", "IgnorableVisit",
            "InvalidPipelineError", "NoGoodPipelinesError",
-           "PipelinePreExecutionError", "PipelineExecutionError",
+           "PipelinePreExecutionError", "PipelineExecutionError", "ProvenanceDimensionsError",
            ]
 
 
@@ -144,3 +144,9 @@ class PipelineExecutionError(RuntimeError):
 
     Usually chained to an internal exception.
     """
+
+
+class ProvenanceDimensionsError(RuntimeError):
+    """Exception raised if the 'where' expression used to constrain a pipeline
+    does not yield a viable data ID for provenance.
+    """

python/activator/middleware_interface.py

Lines changed: 33 additions & 51 deletions
@@ -62,7 +62,8 @@
 from shared.visit import FannedOutVisit
 from .caching import DatasetCache
 from .exception import GracefulShutdownInterrupt, TimeoutInterrupt, NonRetriableError, RetriableError, \
-    InvalidPipelineError, NoGoodPipelinesError, PipelinePreExecutionError, PipelineExecutionError
+    InvalidPipelineError, NoGoodPipelinesError, PipelinePreExecutionError, PipelineExecutionError, \
+    ProvenanceDimensionsError
 from .timer import enforce_schema, time_this_to_bundle
 
 _log = logging.getLogger("lsst." + __name__)
@@ -441,24 +442,18 @@ def _init_governor_datasets(self, timestamp, skymap):
                                        collections=self._collection_skymap)
 
     def _init_provenance_dataset_types(self):
-        self._group_provenance_dataset_type = DatasetType(
-            "prompt_group_provenance",
+        self._preprocessing_provenance_dataset_type = DatasetType(
+            "prompt_preprocessing_provenance",
             self.butler.dimensions.conform(["group", "detector"]),
             "ProvenanceQuantumGraph",
         )
-        self.butler.registry.registerDatasetType(self._group_provenance_dataset_type)
-        self._visit_provenance_dataset_type = DatasetType(
-            "prompt_visit_provenance",
-            self.butler.dimensions.conform(["visit", "detector"]),
-            "ProvenanceQuantumGraph",
-        )
-        self.butler.registry.registerDatasetType(self._visit_provenance_dataset_type)
-        self._exposure_provenance_dataset_type = DatasetType(
-            "prompt_exposure_provenance",
-            self.butler.dimensions.conform(["exposure", "detector"]),
+        self.butler.registry.registerDatasetType(self._preprocessing_provenance_dataset_type)
+        self._main_provenance_dataset_type = DatasetType(
+            "prompt_main_provenance",
+            self.butler.dimensions.conform(["group", "detector"]),
             "ProvenanceQuantumGraph",
         )
-        self.butler.registry.registerDatasetType(self._exposure_provenance_dataset_type)
+        self.butler.registry.registerDatasetType(self._main_provenance_dataset_type)
 
     def _define_dimensions(self):
         """Define any dimensions that must be computed from this object's visit.
@@ -1395,9 +1390,10 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label, provenan
                 # Diagnostic logs are the responsibility of GraphBuilder.
                 _log.error(f"Empty quantum graph for {pipeline_file}; see previous logs for details.")
                 continue
-            provenance_ref = self._make_provenance_ref(provenance_dataset_type, qgraph, pipeline_file)
-            if provenance_ref is None:
-                # An error log is always emitted if None is returned.
+            try:
+                provenance_ref = self._make_provenance_ref(provenance_dataset_type, data_ids, output_run)
+            except ProvenanceDimensionsError:
+                _log.exception(f"Failed to determine data ID for provenance for {pipeline_file}.")
                 continue
             # Past this point, partial execution creates datasets.
             # Don't retry -- either fail (raise) or break.
@@ -1424,41 +1420,32 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label, provenan
         else:
             raise NoGoodPipelinesError(f"No {label} pipeline graph could be built.")
 
-    def _make_provenance_ref(self, dataset_type, qg, pipeline_file):
+    def _make_provenance_ref(self, dataset_type, where, output_run):
         """Make the provenance DatasetRef for a quantum graph.
 
         Parameters
         ----------
         dataset_type : `lsst.daf.butler.DatasetType`
             Provenance dataset type for this pipeline.
-        qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
-            Quantum graph that predicts execution.
-        pipeline_file : `str`
-            Name of the pipeline (for log messages).
+        where : `str`
+            Butler query expression that can be related to a single
+            ``{group, detector}`` data ID.
+        output_run : `str`
+            Output RUN collection.
 
         Returns
         -------
-        ref : `lsst.daf.butler.DatasetRef` or `None`
-            A reference to a to-be-written provenance dataset, or `None` if the
-            quantum graph and the provenance dataset type are incompatible.
-            Error logs are always emitted when `None` is returned.
+        ref : `lsst.daf.butler.DatasetRef`
+            A reference to a to-be-written provenance dataset.
         """
-        for task_node in qg.pipeline_graph.tasks.values():
-            if task_node.dimensions == dataset_type.dimensions:
-                data_ids = qg.quanta_by_task[task_node.label].keys()
-                if len(data_ids) == 1:
-                    return DatasetRef(dataset_type, next(iter(data_ids)), run=qg.header.output_run)
-                else:
-                    _log.error(
-                        f"Task {task_node.label} in pipeline {pipeline_file} has multiple quanta for the "
-                        f"dimensions {dataset_type.dimensions} of the provenance dataset."
-                    )
-                    return None
-        _log.error(
-            f"Pipeline {pipeline_file} has has no tasks with the "
-            f"dimensions {dataset_type.dimensions} of the provenance dataset."
-        )
-        return None
+        query_results = self.butler.query_data_ids(dataset_type.dimensions, where=where, explain=False)
+        try:
+            (data_id,) = query_results
+        except ValueError:
+            raise ProvenanceDimensionsError(
+                f"Expected exactly one data ID for {dataset_type}; got {query_results}."
+            ) from None
+        return DatasetRef(dataset_type, data_id, run=output_run)
 
     def _run_preprocessing(self) -> None:
         """Preprocess a visit ahead of incoming image(s).
@@ -1493,7 +1480,7 @@ def _run_preprocessing(self) -> None:
             in_collections=[preload_run],
             data_ids=where,
             label="preprocessing",
-            provenance_dataset_type=self._group_provenance_dataset_type,
+            provenance_dataset_type=self._preprocessing_provenance_dataset_type,
         )
 
     def _check_permanent_changes(self, where: str) -> bool:
@@ -1578,17 +1565,12 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
         # faked raw file and appropriate SSO data during prep (and then
         # cleanup when ingesting the real data).
         try:
-            visits_defined = self.define_visits.run({"instrument": self.instrument.getName(),
-                                                     "exposure": exp} for exp in exposure_ids)
+            self.define_visits.run({"instrument": self.instrument.getName(),
+                                    "exposure": exp} for exp in exposure_ids)
         except lsst.daf.butler.registry.DataIdError as e:
             # TODO: a good place for a custom exception?
             raise RuntimeError("No data to process.") from e
 
-        if visits_defined.n_visits:
-            provenance_dataset_type = self._visit_provenance_dataset_type
-        else:
-            provenance_dataset_type = self._exposure_provenance_dataset_type
-
         # Inefficient, but most graph builders can't take equality constraints
         where = (
             f"instrument='{self.visit.instrument}' and detector={self.visit.detector}"
@@ -1603,7 +1585,7 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
             in_collections=pre_runs + [preload_run],
             data_ids=where,
             label="main",
-            provenance_dataset_type=provenance_dataset_type,
+            provenance_dataset_type=self._main_provenance_dataset_type,
         )
         # Catch Exception just in case there's a surprise -- raising
         # NonRetriableError on *all* irrevocable changes is important.

tests/test_middleware_interface.py

Lines changed: 2 additions & 7 deletions
@@ -748,17 +748,12 @@ def _check_run_pipeline_fallback(self, callable, pipe_files, graphs, final_label
             # dataset types need to have the same dimensions.
             unittest.mock.patch.object(
                 self.interface,
-                "_group_provenance_dataset_type",
+                "_preprocessing_provenance_dataset_type",
                 test_provenance_dataset_type
             ),
             unittest.mock.patch.object(
                 self.interface,
-                "_visit_provenance_dataset_type",
-                test_provenance_dataset_type
-            ),
-            unittest.mock.patch.object(
-                self.interface,
-                "_exposure_provenance_dataset_type",
+                "_main_provenance_dataset_type",
                 test_provenance_dataset_type
             ),
             self.assertLogs(self.logger_name, level="INFO") as logs,
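
For reference, a toy sketch of the stacked patch.object style this test relies on, with a SimpleNamespace standing in for the real interface object (all names here are illustrative):

import unittest.mock
from types import SimpleNamespace

interface = SimpleNamespace(
    _preprocessing_provenance_dataset_type="real_preprocessing_type",
    _main_provenance_dataset_type="real_main_type",
)
test_provenance_dataset_type = "shared_test_type"

# Both attributes temporarily point at one test dataset type, so the
# pipeline-fallback path sees provenance types with matching dimensions.
with (
    unittest.mock.patch.object(
        interface, "_preprocessing_provenance_dataset_type",
        test_provenance_dataset_type,
    ),
    unittest.mock.patch.object(
        interface, "_main_provenance_dataset_type",
        test_provenance_dataset_type,
    ),
):
    assert interface._preprocessing_provenance_dataset_type == "shared_test_type"

# On exit, the original attributes are restored.
assert interface._main_provenance_dataset_type == "real_main_type"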
