Add regular expression matching to check file extensions on ingest
Fix collection name in example configuration file
Change info logger messages to debug
srp3rd committed Nov 19, 2024
1 parent 3af4c3f commit 254cfbe
Showing 3 changed files with 33 additions and 15 deletions.
4 changes: 2 additions & 2 deletions etc/msg_oods.yaml
@@ -20,9 +20,9 @@ ingester:
     name : MessageAttendant
     repoDirectory : /tmp/repo/LATISS
     collections:
-      - LATISS/raw/all
+      - LSSTCam/raw/all
     cleanCollections:
-      - collection: LATISS/raw/all
+      - collection: LSSTCam/raw/all
         filesOlderThan:
           <<: *interval
           seconds: 30
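The `<<: *interval` entry above is a YAML merge key: it copies the mapping bound to the `interval` anchor into `filesOlderThan`, and `seconds: 30` then overrides that single field. A minimal sketch of the corrected block, assuming a hypothetical anchor definition (the real msg_oods.yaml defines its own defaults):

# Hypothetical anchor; the actual defaults live elsewhere in msg_oods.yaml.
interval: &interval
  days: 0
  hours: 0
  minutes: 0
  seconds: 0

collections:
  - LSSTCam/raw/all
cleanCollections:
  - collection: LSSTCam/raw/all
    filesOlderThan:
      <<: *interval   # pull in the shared defaults...
      seconds: 30     # ...then override only this field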
26 changes: 14 additions & 12 deletions python/lsst/ctrl/oods/butlerAttendant.py
@@ -245,14 +245,15 @@ def cleanCollection(self, collection, olderThan):
         )
         t = t - td

-        LOGGER.info("about to createButler()")
+        LOGGER.info("cleaning collections")
+        LOGGER.debug("about to createButler()")
         butler = self.createButler()

-        LOGGER.info("about to refresh()")
+        LOGGER.debug("about to refresh()")
         butler.registry.refresh()

         # get all datasets in these collections
-        LOGGER.info("about to call queryDatasets")
+        LOGGER.debug("about to call queryDatasets")
         all_datasets = set(
             butler.registry.queryDatasets(
                 datasetType=...,
@@ -261,25 +262,25 @@ def cleanCollection(self, collection, olderThan):
                 bind={"ref_date": t},
             )
         )
-        LOGGER.info("done calling queryDatasets")
+        LOGGER.debug("done calling queryDatasets")

         # get all TAGGED collections
-        LOGGER.info("about to call queryCollections")
+        LOGGER.debug("about to call queryCollections")
         tagged_cols = list(butler.registry.queryCollections(collectionTypes=CollectionType.TAGGED))
-        LOGGER.info("done calling queryCollections")
+        LOGGER.debug("done calling queryCollections")

         # Note: The code below is to get around an issue where passing
         # an empty list as the collections argument to queryDatasets
         # returns all datasets.
         if tagged_cols:
             # get all TAGGED datasets
-            LOGGER.info("about to run queryDatasets for TAGGED collections")
+            LOGGER.debug("about to run queryDatasets for TAGGED collections")
             tagged_datasets = set(butler.registry.queryDatasets(datasetType=..., collections=tagged_cols))
-            LOGGER.info("done running queryDatasets for TAGGED collections; differencing datasets")
+            LOGGER.debug("done running queryDatasets for TAGGED collections; differencing datasets")

             # get a set of datasets in all_datasets, but not in tagged_datasets
             ref = all_datasets.difference(tagged_datasets)
-            LOGGER.info("done differencing datasets")
+            LOGGER.debug("done differencing datasets")
         else:
             # no TAGGED collections, so use all_datasets
             ref = all_datasets
@@ -305,9 +306,10 @@ def cleanCollection(self, collection, olderThan):
             except Exception as e:
                 LOGGER.warning("couldn't remove %s: %s", uri, e)

-        LOGGER.info("about to run pruneDatasets")
+        LOGGER.debug("about to run pruneDatasets")
         butler.pruneDatasets(ref, purge=True, unstore=True)
-        LOGGER.info("done running pruneDatasets")
+        LOGGER.debug("done running pruneDatasets")
+        LOGGER.info("done cleaning collections")

     def rawexposure_info(self, data):
         """Return a sparsely initialized dictionary
@@ -359,6 +361,6 @@ def definer_run(self, file_datasets):
             refs = fds.refs
             ids = [ref.dataId for ref in refs]
             self.visit_definer.run(ids)
-            LOGGER.info("Defined visits for %s", ids)
+            LOGGER.debug("Defined visits for %s", ids)
         except Exception as e:
             LOGGER.exception(e)
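The comment in the hunk above marks the one subtle step in cleanCollection: queryDatasets treats an empty `collections` argument as "all collections", so the TAGGED set difference is taken only when TAGGED collections actually exist. Below is a minimal sketch of that pattern against the lsst.daf.butler registry API, assuming an existing repo; the `where` expression is an assumption consistent with the `bind={"ref_date": t}` visible in the diff, since the actual clause is truncated:

from datetime import datetime, timedelta

import astropy.time
from lsst.daf.butler import Butler, CollectionType


def datasets_safe_to_prune(butler: Butler, collections, older_than_seconds: int):
    """Old datasets in `collections`, minus anything a TAGGED collection still references."""
    # Cutoff time; the real code derives this from the configured interval.
    cutoff = astropy.time.Time(datetime.utcnow() - timedelta(seconds=older_than_seconds))

    # All sufficiently old datasets in the target collections.  The where
    # clause here is a guess: the diff truncates it, showing only the bind.
    old = set(
        butler.registry.queryDatasets(
            datasetType=...,
            collections=collections,
            where="ingest_date < ref_date",
            bind={"ref_date": cutoff},
        )
    )

    tagged = list(butler.registry.queryCollections(collectionTypes=CollectionType.TAGGED))
    if not tagged:
        # An empty list passed to queryDatasets would mean "all collections",
        # so skip the difference entirely when nothing is TAGGED.
        return old

    tagged_refs = set(butler.registry.queryDatasets(datasetType=..., collections=tagged))
    return old.difference(tagged_refs)

The pruning itself then proceeds exactly as in the hunk above: butler.pruneDatasets(ref, purge=True, unstore=True) removes both the registry entries and the stored artifacts for everything left after the difference.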
18 changes: 17 additions & 1 deletion python/lsst/ctrl/oods/msgIngester.py
@@ -21,6 +21,8 @@

 import asyncio
 import logging
+import os
+import re

 from confluent_kafka import KafkaError
 from lsst.ctrl.oods.bucketMessage import BucketMessage
@@ -85,6 +87,9 @@ def __init__(self, mainConfig, csc=None):
         self.tasks = []
         self.dequeue_task = None

+        self.regex = re.compile(os.environ.get("DATASET_REGEXP", r".*\.(fits|fits.fz)$"))
+        LOGGER.info(f"Ingesting files matching regular expression {self.regex.pattern}")
+
     def get_butler_clean_tasks(self):
         """Get a list of all butler run_task methods
@@ -140,6 +145,9 @@ def stop_tasks(self):
             task.cancel()
         self.tasks = []

+    def filter_by_regex(self, files):
+        return [s for s in files if self.regex.search(s)]
+
     async def dequeue_and_ingest_files(self):
         self.running = True
         while self.running:
@@ -152,6 +160,8 @@ async def dequeue_and_ingest_files(self):
                 else:
                     raise Exception(f"KafkaError = {m.error().code()}")
                 rps = self._gather_all_resource_paths(m)
+                if rps is None:
+                    continue
                 resources.extend(rps)
             await self.ingest(resources)

@@ -162,6 +172,12 @@ def _gather_all_resource_paths(self, m):
         # extract all urls within this message
         msg = BucketMessage(m.value())

-        rp_list = [ResourcePath(url) for url in msg.extract_urls()]
+        names = [url for url in msg.extract_urls()]
+
+        name_list = self.filter_by_regex(names)
+        if len(name_list) == 0:
+            return None
+
+        rp_list = [ResourcePath(url) for url in name_list]

         return rp_list
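With these changes, a message whose URLs all fail the extension check short-circuits: _gather_all_resource_paths returns None, and the `if rps is None: continue` guard in the dequeue loop skips to the next message instead of ingesting an empty batch. A standalone sketch of the filter behavior, using the default pattern from the diff; the sample URLs are invented for illustration:

import os
import re

# Same default as msgIngester.py; DATASET_REGEXP overrides it at runtime.
regex = re.compile(os.environ.get("DATASET_REGEXP", r".*\.(fits|fits.fz)$"))

candidates = [
    "s3://bucket/LSSTCam/raw/exp_000123.fits",     # matches
    "s3://bucket/LSSTCam/raw/exp_000124.fits.fz",  # matches (fpack-compressed)
    "s3://bucket/LSSTCam/raw/exp_000124.json",     # filtered out
]

matched = [name for name in candidates if regex.search(name)]
print(matched)  # only the two FITS files survive the filter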
