Use the new Butler query system in _filter_datasets for region query
The new Butler query system supports spatial-constraint queries
via lsst.sphgeom.Region directly. With this change, we use it
in the template and refcat searches. This needs stack w_2024_38 or newer.

make_export.py uses _filter_datasets, so it needs to be adjusted to the
new underlying API too.
hsinfang committed Oct 16, 2024
1 parent f38bcf6 · commit a31efe6
Showing 2 changed files with 38 additions and 43 deletions.
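For orientation before the diff, here is a minimal sketch of the query pattern the commit adopts. It assumes stack w_2024_38 or newer; the repository path, collection, and refcat dataset type names are invented for illustration.

```python
# Minimal sketch of a region-bound spatial query in the new Butler query
# system. All names below (repo path, collection, refcat type) are invented.
import lsst.sphgeom
from lsst.daf.butler import Butler

butler = Butler("/path/to/repo", collections=["refcats"])

# A small convex polygon standing in for a detector or visit footprint.
region = lsst.sphgeom.ConvexPolygon([
    lsst.sphgeom.UnitVector3d(lsst.sphgeom.LonLat.fromDegrees(ra, dec))
    for ra, dec in [(10.0, -5.0), (10.2, -5.0), (10.2, -4.8), (10.0, -4.8)]
])

# The Region object is bound directly into the where clause; no HTM shard
# IDs need to be computed by hand.
refs = butler.query_datasets(
    "gaia_dr3_20230707",                         # hypothetical refcat name
    where="htm7.region OVERLAPS search_region",
    bind={"search_region": region},
    find_first=True,
    explain=False,                               # don't raise on empty results
)
```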
python/activator/middleware_interface.py (51 changes: 23 additions & 28 deletions)
@@ -42,10 +42,9 @@
 import lsst.ctrl.mpexec
 from lsst.ctrl.mpexec import SeparablePipelineExecutor, SingleQuantumExecutor, MPGraphExecutor
 from lsst.daf.butler import Butler, CollectionType, DatasetType, Timespan
-from lsst.daf.butler.registry import MissingDatasetTypeError
+from lsst.daf.butler import DataIdValueError, MissingDatasetTypeError
 import lsst.dax.apdb
 import lsst.geom
-from lsst.meas.algorithms.htmIndexer import HtmIndexer
 import lsst.obs.base
 import lsst.pipe.base
 import lsst.analysis.tools
@@ -624,21 +623,16 @@ def _export_refcats(self, region):
         refcats : iterable [`DatasetRef`]
             The refcats to be exported, after any filtering.
         """
-        center = lsst.geom.SpherePoint(region.getCentroid())
-        radius = max([center.separation(lsst.geom.SpherePoint(vertex)) for vertex in region.getVertices()])
-        indexer = HtmIndexer(depth=7)
-        shard_ids, _ = indexer.getShardIds(center, radius)
-        htm_where = f"htm7 in ({','.join(str(x) for x in shard_ids)})"
         # Get shards from all refcats that overlap this region.
         possible_refcats = _get_refcat_types(self.central_butler)
-        _log.debug("Searching for refcats of types %s in %s...",
-                   {t.name for t in possible_refcats}, shard_ids)
+        _log.debug("Searching for refcats of types %s.", {t.name for t in possible_refcats})
         refcats = set(_filter_datasets(
             self.central_butler, self.butler,
             possible_refcats,
             collections=self.instrument.makeRefCatCollectionName(),
-            where=htm_where,
-            findFirst=True,
+            where="htm7.region OVERLAPS search_region",
+            bind={"search_region": region},
+            find_first=True,
             all_callback=self._mark_dataset_usage,
         ))
         if refcats:
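For contrast, the manual shard lookup that the deleted lines performed can be sketched as a standalone snippet (reconstructed from the removed code; `region` is the same sphgeom region):

```python
# The removed approach: approximate the region by a bounding cone, convert
# the cone to depth-7 HTM shard IDs, then splice the IDs into a query string.
import lsst.geom
from lsst.meas.algorithms.htmIndexer import HtmIndexer

center = lsst.geom.SpherePoint(region.getCentroid())
radius = max(center.separation(lsst.geom.SpherePoint(v)) for v in region.getVertices())
shard_ids, _ = HtmIndexer(depth=7).getShardIds(center, radius)
htm_where = f"htm7 in ({','.join(str(x) for x in shard_ids)})"
```

Binding the region object instead removes both the cone approximation and the hand-built query string.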
@@ -668,16 +662,11 @@ def _export_skymap_and_templates(self, region, filter):
             ["skyMap"],
             skymap=self.skymap_name,
             collections=self._collection_skymap,
-            findFirst=True,
+            find_first=True,
             all_callback=self._mark_dataset_usage,
         ))
         _log.debug("Found %d new skymap datasets.", len(skymaps))

-        # htm7 is too coarse and many more patches than necessary would be selected.
-        # But searching Butler with htm higher level does not work.
-        # TODO: This will be replaced by the new spatial query feature in Butler.
-        template_where = " OR ".join([f"htm7 in ({range[0]}..{range[1]})"
-                                      for range in lsst.sphgeom.HtmPixelization(7).interior(region).ranges()])
         try:
             _log.debug("Searching for templates.")
             templates = set(_filter_datasets(
@@ -687,8 +676,9 @@
                 instrument=self.instrument.getName(),
                 skymap=self.skymap_name,
                 physical_filter=filter,
-                where=template_where,
-                findFirst=True,
+                where="patch.region OVERLAPS search_region",
+                bind={"search_region": region},
+                find_first=True,
                 all_callback=self._mark_dataset_usage,
             ))
         except _MissingDatasetError as err:
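The deleted comment explained that depth-7 HTM trixels are much coarser than patches, so the range-based constraint over-selected templates; constraining on patch.region with the bound region matches only patches that genuinely overlap. A hypothetical standalone form of the new template query (dataset type, collection, and skymap names invented):

```python
# Hypothetical standalone version of the new template query.
templates = central_butler.query_datasets(
    "goodSeeingCoadd",                           # hypothetical template type
    collections=["templates/deep"],              # hypothetical collection
    where="patch.region OVERLAPS search_region",
    bind={"search_region": region},
    skymap="my_skymap",                          # hypothetical skymap name
    find_first=True,
    explain=False,
)
```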
@@ -719,8 +709,6 @@ def _export_calibs(self, detector_id, filter):
         # Some calibs have an exposure ID (of the source dataset?), but these can't be used in AP.
         type_names = {t.name for t in self.central_butler.registry.queryDatasetTypes()
                       if t.isCalibration() and "exposure" not in t.dimensions}
-        # TODO: we can't use findFirst=True yet because findFirst query
-        # in CALIBRATION-type collection is not supported currently.
         # For now, filter down to the dataset types that exist in the specific calib collection.
         # TODO: A new query API after DM-45873 may replace or improve this usage.
         # TODO: DM-40245 to identify the datasets.
@@ -734,6 +722,7 @@
             instrument=self.instrument.getName(),
             detector=detector_id,
             physical_filter=filter,
+            find_first=True,
             calib_date=calib_date,
             all_callback=self._mark_dataset_usage,
         ))
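The TODO removed in the previous hunk said find-first queries were unsupported in CALIBRATION-type collections; with the new query system the commit now passes find_first=True for calibs as well. A hypothetical direct call (collection name and data ID values invented):

```python
# Hypothetical find-first calib lookup through the new query system.
biases = central_butler.query_datasets(
    "bias",
    collections=["LSSTComCamSim/calib"],  # hypothetical CALIBRATION collection
    instrument="LSSTComCamSim",
    detector=4,
    find_first=True,
    explain=False,
)
```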
@@ -761,7 +750,7 @@ def _export_ml_models(self):
                 self.central_butler, self.butler,
                 ["pretrainedModelPackage"],
                 collections=self._collection_ml_model,
-                findFirst=True,
+                find_first=True,
                 all_callback=self._mark_dataset_usage,
             ))
         except _MissingDatasetError as err:
@@ -1750,8 +1739,7 @@ def _filter_datasets(src_repo: Butler,
     """Identify datasets in a source repository, filtering out those already
     present in a destination.

-    Unlike Butler or database queries, this method raises if nothing in the
-    source repository matches the query criteria.
+    This method raises if nothing in the source repository matches the query criteria.

     Parameters
     ----------
@@ -1770,7 +1758,7 @@
             This callable is not called if the query returns no results.
         *args, **kwargs
             Parameters for describing the dataset query. They have the same
-            meanings as the parameters of `lsst.daf.butler.Registry.queryDatasets`.
+            meanings as the parameters of `lsst.daf.butler.Butler.query_datasets`.
             The query must be valid for both ``src_repo`` and ``dest_repo``.

     Returns
@@ -1798,8 +1786,12 @@
     known_datasets = set()
     for dataset_type in dataset_types:
         try:
-            subset = set(dest_repo.registry.queryDatasets(dataset_type, *args, **kwargs))
-        except lsst.daf.butler.registry.DataIdValueError as e:
+            # Okay to have empty results.
+            subset = set(dest_repo.query_datasets(dataset_type, explain=False, *args, **kwargs))
+        except MissingDatasetTypeError as e:
+            _log.debug("Pre-export query with args '%s' failed with %s", formatted_args, e)
+            # If dataset type never registered locally, then *any* such datasets are missing.
+        except DataIdValueError as e:
             _log.debug("Pre-export query with args '%s' failed with %s", formatted_args, e)
             # If dimensions are invalid, then *any* such datasets are missing.
         else:
@@ -1814,7 +1806,8 @@
                                 level=logging.DEBUG):
         src_datasets = set()
         for dataset_type in dataset_types:
-            src_datasets |= set(src_repo.registry.queryDatasets(dataset_type, *args, **kwargs).expanded())
+            # explain=False because empty query result is ok here and we don't need it to raise an error.
+            src_datasets |= set(src_repo.query_datasets(dataset_type, explain=False, *args, **kwargs))
         # In many contexts, src_datasets is too large to print.
         _log_trace3.debug("Source datasets: %s", src_datasets)
         if calib_date:
@@ -1826,6 +1819,8 @@
             ))
             _log_trace.debug("Sources filtered to %s: %s", calib_date.iso, src_datasets)
         if not src_datasets:
+            # The downstream method decides what to do with empty results.
+            # DM-40245 and DM-46178 may change this.
             raise _MissingDatasetError(
                 "Source repo query with args '{}' found no matches.".format(formatted_args))
         if all_callback:
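A note on the recurring explain=False: by default the new Butler.query_datasets raises on an empty result and attaches human-readable reasons, while _filter_datasets applies its own empty-result policy (raising _MissingDatasetError only when the source repo comes up empty). A sketch of the default behavior, assuming the exception type is EmptyQueryResultError and with invented repo and query values:

```python
# Sketch of the default (explain=True) behavior that the changed code opts
# out of; the repo path and query values are invented.
from lsst.daf.butler import Butler, EmptyQueryResultError

butler = Butler("/path/to/repo")
try:
    refs = butler.query_datasets("bias", collections=["calib"], instrument="MyCam")
except EmptyQueryResultError as e:
    # An empty result raises, carrying diagnostics about why nothing matched.
    for reason in e.reasons:
        print(reason)
```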
tests/test_middleware_interface.py (30 changes: 15 additions & 15 deletions)
@@ -910,17 +910,17 @@ def test_filter_datasets(self):
         # Case where src is empty now covered in test_filter_datasets_nosrc.
         for src, existing in itertools.product(combinations, [set()] + combinations):
             diff = src - existing
-            src_butler = unittest.mock.Mock(
-                **{"registry.queryDatasets.return_value.expanded.return_value": src})
-            existing_butler = unittest.mock.Mock(**{"registry.queryDatasets.return_value": existing})
+            src_butler = unittest.mock.Mock(**{"query_datasets.return_value": src})
+            existing_butler = unittest.mock.Mock(**{"query_datasets.return_value": existing})

             with self.subTest(src=sorted(ref.dataId["detector"] for ref in src),
                               existing=sorted(ref.dataId["detector"] for ref in existing)):
                 result = set(_filter_datasets(src_butler, existing_butler,
                                               ["bias"], instrument="LSSTComCamSim"))
-                src_butler.registry.queryDatasets.assert_called_once_with("bias", instrument="LSSTComCamSim")
-                existing_butler.registry.queryDatasets.assert_called_once_with("bias",
-                                                                               instrument="LSSTComCamSim")
+                src_butler.query_datasets.assert_called_once_with(
+                    "bias", instrument="LSSTComCamSim", explain=False)
+                existing_butler.query_datasets.assert_called_once_with(
+                    "bias", instrument="LSSTComCamSim", explain=False)
                 self.assertEqual(result, diff)

@@ -933,15 +933,15 @@ def test_filter_datasets_nodim(self):
         data1 = self._make_expanded_ref(registry, "skyMap", {"skymap": skymap_name}, "dummy")

         src_butler = unittest.mock.Mock(
-            **{"registry.queryDatasets.return_value.expanded.return_value": {data1}})
+            **{"query_datasets.return_value": {data1}})
         existing_butler = unittest.mock.Mock(
-            **{"registry.queryDatasets.side_effect":
+            **{"query_datasets.side_effect":
                lsst.daf.butler.registry.DataIdValueError(
                    f"Unknown values specified for governor dimension skymap: {{{skymap_name}}}")
                })

         result = set(_filter_datasets(src_butler, existing_butler, ["skyMap"], ..., skymap="mymap"))
-        src_butler.registry.queryDatasets.assert_called_once_with("skyMap", ..., skymap="mymap")
+        src_butler.query_datasets.assert_called_once_with("skyMap", ..., skymap="mymap", explain=False)
         self.assertEqual(result, {data1})

@@ -955,9 +955,9 @@ def test_filter_datasets_nosrc(self):
"dummy")

         src_butler = unittest.mock.Mock(
-            **{"registry.queryDatasets.return_value.expanded.return_value": set()})
+            **{"query_datasets.return_value": set()})
         for existing in [set(), {data1}]:
-            existing_butler = unittest.mock.Mock(**{"registry.queryDatasets.return_value": existing})
+            existing_butler = unittest.mock.Mock(**{"query_datasets.return_value": existing})

             with self.subTest(existing=sorted(ref.dataId["detector"] for ref in existing)):
                 with self.assertRaises(_MissingDatasetError):
@@ -982,8 +982,8 @@ def test_function(expected, incoming):
         # Case where src is empty covered below.
         for src, existing in itertools.product(combinations, [set()] + combinations):
             src_butler = unittest.mock.Mock(
-                **{"registry.queryDatasets.return_value.expanded.return_value": src})
-            existing_butler = unittest.mock.Mock(**{"registry.queryDatasets.return_value": existing})
+                **{"query_datasets.return_value": src})
+            existing_butler = unittest.mock.Mock(**{"query_datasets.return_value": existing})

             with self.subTest(src=sorted(ref.dataId["detector"] for ref in src),
                               existing=sorted(ref.dataId["detector"] for ref in existing)):
@@ -997,8 +997,8 @@ def non_callable(_):

         for existing in [set()] + combinations:
             src_butler = unittest.mock.Mock(
-                **{"registry.queryDatasets.return_value.expanded.return_value": set()})
-            existing_butler = unittest.mock.Mock(**{"registry.queryDatasets.return_value": existing})
+                **{"query_datasets.return_value": set()})
+            existing_butler = unittest.mock.Mock(**{"query_datasets.return_value": existing})

             with self.subTest(existing=sorted(ref.dataId["detector"] for ref in existing)):
                 with self.assertRaises(_MissingDatasetError):
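All of the test updates follow one pattern: Butler.query_datasets returns its results directly, so the old chained mock of registry.queryDatasets(...).expanded() collapses to a single attribute. A minimal sketch with illustrative values:

```python
# Old vs. new mock shape for the query API (illustrative values).
import unittest.mock

# Old: the lazy registry query object forced a chained mock spec.
old_butler = unittest.mock.Mock(
    **{"registry.queryDatasets.return_value.expanded.return_value": {"ref"}})

# New: one attribute, one return value.
new_butler = unittest.mock.Mock(**{"query_datasets.return_value": {"ref"}})
assert new_butler.query_datasets("bias", explain=False) == {"ref"}
```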
