diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 0cfc4170..8ca73a5a 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -42,10 +42,9 @@
 import lsst.ctrl.mpexec
 from lsst.ctrl.mpexec import SeparablePipelineExecutor, SingleQuantumExecutor, MPGraphExecutor
 from lsst.daf.butler import Butler, CollectionType, DatasetType, Timespan
-from lsst.daf.butler.registry import MissingDatasetTypeError
+from lsst.daf.butler import DataIdValueError, MissingDatasetTypeError
 import lsst.dax.apdb
 import lsst.geom
-from lsst.meas.algorithms.htmIndexer import HtmIndexer
 import lsst.obs.base
 import lsst.pipe.base
 import lsst.analysis.tools
@@ -624,21 +623,16 @@ def _export_refcats(self, region):
         refcats : iterable [`DatasetRef`]
             The refcats to be exported, after any filtering.
         """
-        center = lsst.geom.SpherePoint(region.getCentroid())
-        radius = max([center.separation(lsst.geom.SpherePoint(vertex)) for vertex in region.getVertices()])
-        indexer = HtmIndexer(depth=7)
-        shard_ids, _ = indexer.getShardIds(center, radius)
-        htm_where = f"htm7 in ({','.join(str(x) for x in shard_ids)})"
         # Get shards from all refcats that overlap this region.
         possible_refcats = _get_refcat_types(self.central_butler)
-        _log.debug("Searching for refcats of types %s in %s...",
-                   {t.name for t in possible_refcats}, shard_ids)
+        _log.debug("Searching for refcats of types %s.", {t.name for t in possible_refcats})
         refcats = set(_filter_datasets(
             self.central_butler, self.butler,
             possible_refcats,
             collections=self.instrument.makeRefCatCollectionName(),
-            where=htm_where,
-            findFirst=True,
+            where="htm7.region OVERLAPS search_region",
+            bind={"search_region": region},
+            find_first=True,
             all_callback=self._mark_dataset_usage,
         ))
         if refcats:
@@ -668,16 +662,11 @@ def _export_skymap_and_templates(self, region, filter):
             ["skyMap"],
             skymap=self.skymap_name,
             collections=self._collection_skymap,
-            findFirst=True,
+            find_first=True,
             all_callback=self._mark_dataset_usage,
         ))
         _log.debug("Found %d new skymap datasets.", len(skymaps))
-        # htm7 is too coarse and many more patches than necessary would be selected.
-        # But searching Butler with htm higher level does not work.
-        # TODO: This will be replaced by the new spatial query feature in Butler.
-        template_where = " OR ".join([f"htm7 in ({range[0]}..{range[1]})"
-                                      for range in lsst.sphgeom.HtmPixelization(7).interior(region).ranges()])
         try:
             _log.debug("Searching for templates.")
             templates = set(_filter_datasets(
@@ -687,8 +676,9 @@
                 instrument=self.instrument.getName(),
                 skymap=self.skymap_name,
                 physical_filter=filter,
-                where=template_where,
-                findFirst=True,
+                where="patch.region OVERLAPS search_region",
+                bind={"search_region": region},
+                find_first=True,
                 all_callback=self._mark_dataset_usage,
             ))
         except _MissingDatasetError as err:
@@ -719,8 +709,6 @@ def _export_calibs(self, detector_id, filter):
         # Some calibs have an exposure ID (of the source dataset?), but these can't be used in AP.
         type_names = {t.name for t in self.central_butler.registry.queryDatasetTypes()
                       if t.isCalibration() and "exposure" not in t.dimensions}
-        # TODO: we can't use findFirst=True yet because findFirst query
-        # in CALIBRATION-type collection is not supported currently.
         # For now, filter down to the dataset types that exist in the specific calib collection.
         # TODO: A new query API after DM-45873 may replace or improve this usage.
         # TODO: DM-40245 to identify the datasets.
@@ -734,6 +722,7 @@
             instrument=self.instrument.getName(),
             detector=detector_id,
             physical_filter=filter,
+            find_first=True,
             calib_date=calib_date,
             all_callback=self._mark_dataset_usage,
         ))
@@ -761,7 +750,7 @@ def _export_ml_models(self):
                 self.central_butler, self.butler,
                 ["pretrainedModelPackage"],
                 collections=self._collection_ml_model,
-                findFirst=True,
+                find_first=True,
                 all_callback=self._mark_dataset_usage,
             ))
         except _MissingDatasetError as err:
@@ -1750,8 +1739,7 @@ def _filter_datasets(src_repo: Butler,
     """Identify datasets in a source repository, filtering out those already
     present in a destination.
 
-    Unlike Butler or database queries, this method raises if nothing in the
-    source repository matches the query criteria.
+    This method raises if nothing in the source repository matches the query criteria.
 
     Parameters
     ----------
@@ -1770,7 +1758,7 @@
         This callable is not called if the query returns no results.
     *args, **kwargs
         Parameters for describing the dataset query. They have the same
-        meanings as the parameters of `lsst.daf.butler.Registry.queryDatasets`.
+        meanings as the parameters of `lsst.daf.butler.Butler.query_datasets`.
         The query must be valid for both ``src_repo`` and ``dest_repo``.
 
     Returns
@@ -1798,8 +1786,12 @@
     known_datasets = set()
     for dataset_type in dataset_types:
         try:
-            subset = set(dest_repo.registry.queryDatasets(dataset_type, *args, **kwargs))
-        except lsst.daf.butler.registry.DataIdValueError as e:
+            # Okay to have empty results.
+            subset = set(dest_repo.query_datasets(dataset_type, *args, explain=False, **kwargs))
+        except MissingDatasetTypeError as e:
+            _log.debug("Pre-export query with args '%s' failed with %s", formatted_args, e)
+            # If the dataset type was never registered locally, then *any* such datasets are missing.
+        except DataIdValueError as e:
             _log.debug("Pre-export query with args '%s' failed with %s", formatted_args, e)
             # If dimensions are invalid, then *any* such datasets are missing.
         else:
@@ -1814,7 +1806,8 @@
                          level=logging.DEBUG):
         src_datasets = set()
         for dataset_type in dataset_types:
-            src_datasets |= set(src_repo.registry.queryDatasets(dataset_type, *args, **kwargs).expanded())
+            # explain=False because an empty query result is okay here; we don't need it to raise.
+            src_datasets |= set(src_repo.query_datasets(dataset_type, *args, explain=False, **kwargs))
         # In many contexts, src_datasets is too large to print.
         _log_trace3.debug("Source datasets: %s", src_datasets)
         if calib_date:
@@ -1826,6 +1819,8 @@
             ))
             _log_trace.debug("Sources filtered to %s: %s", calib_date.iso, src_datasets)
         if not src_datasets:
+            # The downstream method decides what to do with empty results.
+            # DM-40245 and DM-46178 may change this.
             raise _MissingDatasetError(
                 "Source repo query with args '{}' found no matches.".format(formatted_args))
         if all_callback:
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 071605f8..bd43e4a3 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -910,17 +910,17 @@ def test_filter_datasets(self):
         # Case where src is empty now covered in test_filter_datasets_nosrc.
         for src, existing in itertools.product(combinations, [set()] + combinations):
             diff = src - existing
-            src_butler = unittest.mock.Mock(
-                **{"registry.queryDatasets.return_value.expanded.return_value": src})
-            existing_butler = unittest.mock.Mock(**{"registry.queryDatasets.return_value": existing})
+            src_butler = unittest.mock.Mock(**{"query_datasets.return_value": src})
+            existing_butler = unittest.mock.Mock(**{"query_datasets.return_value": existing})
             with self.subTest(src=sorted(ref.dataId["detector"] for ref in src),
                               existing=sorted(ref.dataId["detector"] for ref in existing)):
                 result = set(_filter_datasets(src_butler, existing_butler, ["bias"],
                                               instrument="LSSTComCamSim"))
-                src_butler.registry.queryDatasets.assert_called_once_with("bias", instrument="LSSTComCamSim")
-                existing_butler.registry.queryDatasets.assert_called_once_with("bias",
-                                                                               instrument="LSSTComCamSim")
+                src_butler.query_datasets.assert_called_once_with(
+                    "bias", instrument="LSSTComCamSim", explain=False)
+                existing_butler.query_datasets.assert_called_once_with(
+                    "bias", instrument="LSSTComCamSim", explain=False)
                 self.assertEqual(result, diff)
 
     def test_filter_datasets_nodim(self):
@@ -933,15 +933,15 @@
         data1 = self._make_expanded_ref(registry, "skyMap", {"skymap": skymap_name}, "dummy")
 
         src_butler = unittest.mock.Mock(
-            **{"registry.queryDatasets.return_value.expanded.return_value": {data1}})
+            **{"query_datasets.return_value": {data1}})
         existing_butler = unittest.mock.Mock(
-            **{"registry.queryDatasets.side_effect":
+            **{"query_datasets.side_effect":
                lsst.daf.butler.registry.DataIdValueError(
                    f"Unknown values specified for governor dimension skymap: {{{skymap_name}}}")
                })
 
         result = set(_filter_datasets(src_butler, existing_butler, ["skyMap"], ..., skymap="mymap"))
-        src_butler.registry.queryDatasets.assert_called_once_with("skyMap", ..., skymap="mymap")
+        src_butler.query_datasets.assert_called_once_with("skyMap", ..., skymap="mymap", explain=False)
         self.assertEqual(result, {data1})
 
     def test_filter_datasets_nosrc(self):
@@ -955,9 +955,9 @@
                                         "dummy")
 
         src_butler = unittest.mock.Mock(
-            **{"registry.queryDatasets.return_value.expanded.return_value": set()})
+            **{"query_datasets.return_value": set()})
         for existing in [set(), {data1}]:
-            existing_butler = unittest.mock.Mock(**{"registry.queryDatasets.return_value": existing})
+            existing_butler = unittest.mock.Mock(**{"query_datasets.return_value": existing})
 
             with self.subTest(existing=sorted(ref.dataId["detector"] for ref in existing)):
                 with self.assertRaises(_MissingDatasetError):
@@ -982,8 +982,8 @@ def test_function(expected, incoming):
         # Case where src is empty covered below.
         for src, existing in itertools.product(combinations, [set()] + combinations):
             src_butler = unittest.mock.Mock(
-                **{"registry.queryDatasets.return_value.expanded.return_value": src})
-            existing_butler = unittest.mock.Mock(**{"registry.queryDatasets.return_value": existing})
+                **{"query_datasets.return_value": src})
+            existing_butler = unittest.mock.Mock(**{"query_datasets.return_value": existing})
 
             with self.subTest(src=sorted(ref.dataId["detector"] for ref in src),
                               existing=sorted(ref.dataId["detector"] for ref in existing)):
@@ -997,8 +997,8 @@ def non_callable(_):
 
         for existing in [set()] + combinations:
             src_butler = unittest.mock.Mock(
-                **{"registry.queryDatasets.return_value.expanded.return_value": set()})
-            existing_butler = unittest.mock.Mock(**{"registry.queryDatasets.return_value": existing})
+                **{"query_datasets.return_value": set()})
+            existing_butler = unittest.mock.Mock(**{"query_datasets.return_value": existing})
 
             with self.subTest(existing=sorted(ref.dataId["detector"] for ref in existing)):
                 with self.assertRaises(_MissingDatasetError):
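For reviewers unfamiliar with the new API, the pattern this patch converges on looks roughly like the sketch below. It is illustrative only, not part of the patch: the repo path, dataset type name, and collection are placeholders, and the region stands in for the visit region passed to `_export_refcats`. The `where`, `bind`, `find_first`, and `explain` arguments are the real `lsst.daf.butler.Butler.query_datasets` parameters used above.

```python
# Illustrative sketch of the Butler.query_datasets spatial-query pattern
# adopted in this patch. All names below (repo path, dataset type,
# collection) are placeholders.
import lsst.sphgeom
from lsst.daf.butler import Butler

butler = Butler("/repo/example")  # placeholder repo

# Stand-in for the visit region; any lsst.sphgeom.Region can be bound
# into an OVERLAPS expression.
region = lsst.sphgeom.Circle(
    lsst.sphgeom.UnitVector3d(1.0, 0.0, 0.0),
    lsst.sphgeom.Angle.fromDegrees(1.0),
)

refs = butler.query_datasets(
    "gaia_dr2_20200414",                        # placeholder refcat dataset type
    collections="refcats",                      # placeholder collection
    where="htm7.region OVERLAPS search_region",
    bind={"search_region": region},
    find_first=True,  # snake_case spelling replaces the old findFirst
    explain=False,    # return [] instead of raising EmptyQueryResultError
)
```

With `explain=False`, an empty result comes back as an empty list rather than raising `lsst.daf.butler.EmptyQueryResultError`, which is why `_filter_datasets` can keep its own empty-result policy (raising `_MissingDatasetError` only when the source repo query finds nothing). The `bind` mapping also removes the need to hand-build HTM shard lists in the `where` string, which is what the deleted `HtmIndexer` and `HtmPixelization` code used to do.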