diff --git a/bin.src/atoods b/bin.src/atoods index efa2d8b..48e5c8d 100644 --- a/bin.src/atoods +++ b/bin.src/atoods @@ -31,6 +31,6 @@ lsstlog.usePythonLogging() LOGGER = logging.getLogger(__name__) F = "%(levelname) -10s %(asctime)s.%(msecs)03dZ %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s" -logging.basicConfig(level=logging.INFO, format=(F), datefmt="%Y-%m-%d %H:%M:%S") +logging.basicConfig(level=logging.INFO, format=F, datefmt="%Y-%m-%d %H:%M:%S") asyncio.run(AtOodsCsc.amain(index=None)) diff --git a/bin.src/ccoods b/bin.src/ccoods index 319d253..ad318c1 100644 --- a/bin.src/ccoods +++ b/bin.src/ccoods @@ -31,6 +31,6 @@ lsstlog.usePythonLogging() LOGGER = logging.getLogger(__name__) F = "%(levelname) -10s %(asctime)s.%(msecs)03dZ %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s" -logging.basicConfig(level=logging.INFO, format=(F), datefmt="%Y-%m-%d %H:%M:%S") +logging.basicConfig(level=logging.INFO, format=F, datefmt="%Y-%m-%d %H:%M:%S") asyncio.run(CcOodsCsc.amain(index=None)) diff --git a/bin.src/mtoods b/bin.src/mtoods index d708c94..07c3d3e 100644 --- a/bin.src/mtoods +++ b/bin.src/mtoods @@ -31,6 +31,6 @@ lsstlog.usePythonLogging() LOGGER = logging.getLogger(__name__) F = "%(levelname) -10s %(asctime)s.%(msecs)03dZ %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s" -logging.basicConfig(level=logging.INFO, format=(F), datefmt="%Y-%m-%d %H:%M:%S") +logging.basicConfig(level=logging.INFO, format=F, datefmt="%Y-%m-%d %H:%M:%S") asyncio.run(MtOodsCsc.amain(index=None)) diff --git a/bin.src/standalone b/bin.src/standalone new file mode 100644 index 0000000..0f4dc84 --- /dev/null +++ b/bin.src/standalone @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# This file is part of ctrl_oods +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. 
+# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import asyncio +import logging +import os + +import lsst.log as lsstlog +import yaml +from lsst.ctrl.oods.msgIngester import MsgIngester + +LOGGER = logging.getLogger(__name__) +F = "%(levelname) -10s %(asctime)s.%(msecs)03dZ %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s" +logging.basicConfig(level=logging.INFO, format=F, datefmt="%Y-%m-%d %H:%M:%S") + + +class Standalone(object): + """Standalone class to run tests without CSC requirements.""" + + def __init__(self): + self.config = None + # import YAML file here + if "CTRL_OODS_CONFIG_FILE" in os.environ: + filename = os.environ["CTRL_OODS_CONFIG_FILE"] + LOGGER.info("using configuration %s", filename) + with open(filename, "r") as f: + self.config = yaml.safe_load(f) + else: + raise FileNotFoundError("CTRL_OODS_CONFIG_FILE is not set") + + self.task_list = None + + self.ingester_config = self.config["ingester"] + + async def send_imageInOODS(self, info): + """Send SAL message that the images has been ingested into the OODS + + Parameters + ---------- + info : `dict` + information about the image + """ + camera = info["CAMERA"] + obsid = info["OBSID"] + raft = "undef" + if "RAFT" in info: + raft = info["RAFT"] + sensor = "undef" + if "SENSOR" in info: + sensor = info["SENSOR"] + status_code = info["STATUS_CODE"] + description = info["DESCRIPTION"] + + s = f"would send camera={camera} 
obsid={obsid} raft={raft} sensor={sensor} " + s = s + f"statusCode={status_code}, description={description}" + LOGGER.info(s) + + async def start_services(self): + """Start all cleanup and archiving services""" + + # self added here, and by the time it's utilized by MsgIngester + # the CSC will be up and running + self.ingester = MsgIngester(self.config, self) + + self.task_list = self.ingester.run_tasks() + + async def stop_services(self): + """Stop all cleanup and archiving services""" + print("calling stop_tasks") + self.ingester.stop_tasks() + + async def done(self): + print("waiting...") + while True: + await asyncio.sleep(3600) + print("done!") + + async def main(self): + await self.start_services() + group = asyncio.gather(self.done()) + print("gathering") + await group + print("finished") + await self.stop_services() + print("complete") + + +if __name__ == "__main__": + alone = Standalone() + asyncio.run(alone.main()) diff --git a/etc/msg_oods.yaml b/etc/msg_oods.yaml new file mode 100644 index 0000000..d46aabb --- /dev/null +++ b/etc/msg_oods.yaml @@ -0,0 +1,30 @@ +defaultInterval: &interval + days: 0 + hours: 0 + minutes: 0 + seconds: 0 + +ingester: + kafka: + brokers: + - kafka:9092 + topics: + - atoods + group_id: ATOODS + max_messages: 10 + butlers: + - butler: + instrument: lsst.obs.lsst.Latiss + class: + import : lsst.ctrl.oods.messageAttendant + name : MessageAttendant + repoDirectory : /tmp/repo/LATISS + collections: + - LATISS/raw/all + scanInterval: + <<: *interval + seconds: 10 + filesOlderThan: + <<: *interval + seconds: 30 + batchSize: 20 diff --git a/etc/oods_example.yaml b/etc/oods_example.yaml index c0455ba..bbd455b 100644 --- a/etc/oods_example.yaml +++ b/etc/oods_example.yaml @@ -5,25 +5,29 @@ defaultInterval: &interval seconds: 0 ingester: - FILE_INGEST_REQUEST: CC_FILE_INGEST_REQUEST - CONSUME_QUEUE: cc_publish_to_oods - PUBLISH_QUEUE: oods_publish_to_cc - forwarderStagingDirectory: forwarder_staging butlers: - butler: + collections: + 
- LATISS/raw/all class: - import : lsst.ctrl.oods.gen2ButlerIngester - name : Gen2ButlerIngester + import : lsst.ctrl.oods.fileAttendant + name : FileAttendant stagingDirectory : staging repoDirectory : repo badFileDirectory: bad + scanInterval: + <<: *interval + minutes: 10 + filesOlderThan: + <<: *interval + days: 90 - butler: collections: - LATISS/raw/all instrument: lsst.obs.lsst.LsstComCam class: - import : lsst.ctrl.oods.gen3ButlerIngester - name : Gen3ButlerIngester + import : lsst.ctrl.oods.fileAttendant + name : FileAttendant stagingDirectory : staging2 repoDirectory : repo2 badFileDirectory: bad2 diff --git a/pyproject.toml b/pyproject.toml index e8b4292..c7e2970 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,3 +5,37 @@ target-version = ["py38"] [tool.isort] profile = "black" line_length = 110 + +[tool.ruff] +exclude = [ + "__init__.py", + "bin", + "doc", + "version.py", + "tests/.tests", +] +ignore = [ + "E226", + "E228", + "N802", + "N803", + "N806", + "N999", +] +line-length = 110 +select = [ + "E", # pycodestyle + "F", # pyflakes + "N", # pep8-naming + "W", # pycodestyle +] +target-version = "py310" +extend-select = [ + "RUF100", # Warn about unused noqa +] + +[tool.ruff.pycodestyle] +max-doc-length = 79 + +[tool.ruff.pydocstyle] +convention = "numpy" diff --git a/python/lsst/ctrl/oods/bucketMessage.py b/python/lsst/ctrl/oods/bucketMessage.py new file mode 100644 index 0000000..de719c6 --- /dev/null +++ b/python/lsst/ctrl/oods/bucketMessage.py @@ -0,0 +1,60 @@ +# This file is part of ctrl_oods +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. 
+# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import json +import logging + +LOGGER = logging.getLogger(__name__) + + +class BucketMessage(object): + """Report on new messages + + Parameters + ---------- + message: `str` + json string + """ + + def __init__(self, message): + self.message = message + + def extract_urls(self): + """Extract object IDs from an S3 notification. + + If one record is invalid, an error is logged but the function tries to + process the remaining records. + + Yields + ------ + oid : `str` + The filename referred to by each message. + """ + msg = json.loads(self.message) + for record in msg["Records"]: + try: + bucket_name = record["s3"]["bucket"]["name"] + key = record["s3"]["object"]["key"] + url = f"s3://{bucket_name}/{key}" + yield url + except KeyError as e: + LOGGER.error(f"Invalid msg: Couldn't find key in {record=}") + raise e diff --git a/python/lsst/ctrl/oods/gen3ButlerIngester.py b/python/lsst/ctrl/oods/butlerAttendant.py similarity index 71% rename from python/lsst/ctrl/oods/gen3ButlerIngester.py rename to python/lsst/ctrl/oods/butlerAttendant.py index 3c84692..5100db3 100644 --- a/python/lsst/ctrl/oods/gen3ButlerIngester.py +++ b/python/lsst/ctrl/oods/butlerAttendant.py @@ -19,36 +19,32 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
+ import asyncio import collections import concurrent import logging import os -import shutil +import os.path import astropy.units as u from astropy.time import Time, TimeDelta -from lsst.ctrl.oods.butlerIngester import ButlerIngester -from lsst.ctrl.oods.imageData import ImageData from lsst.ctrl.oods.timeInterval import TimeInterval +from lsst.ctrl.oods.utils import Utils from lsst.daf.butler import Butler from lsst.daf.butler.registry import CollectionType +from lsst.obs.base import DefineVisitsTask from lsst.obs.base.ingest import RawIngestConfig, RawIngestTask from lsst.pipe.base import Instrument LOGGER = logging.getLogger(__name__) -class Gen3ButlerIngester(ButlerIngester): - """Processes files on behalf of a Gen3 Butler. +class ButlerAttendant: + """Interface class for processing files for a butler.""" - Parameters - ---------- - config: `dict` - configuration of this butler ingester - csc: `OodsCSC` - Observatory Operations Data Service Commandable SAL component - """ + SUCCESS = 0 + FAILURE = 1 def __init__(self, config, csc=None): self.csc = csc @@ -61,9 +57,6 @@ def __init__(self, config, csc=None): self.collections = self.config["collections"] self.cleanCollections = self.config.get("cleanCollections", None) - self.staging_dir = self.config["stagingDirectory"] - self.bad_file_dir = self.config["badFileDirectory"] - try: self.butlerConfig = Butler.makeRepo(repo) except FileExistsError: @@ -73,7 +66,7 @@ def __init__(self, config, csc=None): self.butler = self.createButler() except Exception as exc: cause = self.extract_cause(exc) - asyncio.create_task(self.csc.call_fault(code=2, report=f"failure: {cause}")) + asyncio.create_task(self.csc.call_fault(code=2, report=f"failed to create Butler: {cause}")) return cfg = RawIngestConfig() @@ -85,13 +78,15 @@ def __init__(self, config, csc=None): on_ingest_failure=self.on_ingest_failure, on_metadata_failure=self.on_metadata_failure, ) + define_visits_config = DefineVisitsTask.ConfigClass() + 
define_visits_config.groupExposures = "one-to-one-and-by-counter" + self.visit_definer = DefineVisitsTask(config=define_visits_config, butler=self.butler) def createButler(self): instr = Instrument.from_string(self.instrument) run = instr.makeDefaultRawIngestRunName() opts = dict(run=run, writeable=True, collections=self.collections) butler = Butler(self.butlerConfig, **opts) - # Register an instrument. instr.register(butler.registry) @@ -102,49 +97,80 @@ def extract_info_val(self, dataId, key, prefix): return f"{prefix}{dataId[key]:02d}" return f"{prefix}??" - def rawexposure_info(self, data): - """Return a sparsely initialized dictionary + async def ingest(self, file_list): + """Ingest a list of files into a butler Parameters ---------- - data: `RawFileData` - information about the raw file + file_list: `list` + files to ingest + """ + + # Ingest images. + try: + loop = asyncio.get_running_loop() + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + await loop.run_in_executor(pool, self.task.run, file_list) + except Exception as e: + LOGGER.info("Ingestion issue %s", e) + + def on_success(self, datasets): + pass + + def create_bad_dirname(self, bad_dir_root, staging_dir_root, original): + """Create a full path to a directory contained in the + 'bad directory' hierarchy; this retains the subdirectory structure + created where the file was staged, where the uningestable file will + be placed. 
+ + Parameters + ---------- + bad_dir_root : `str` + Root of the bad directory hierarchy + staging_dir_root : `str` + Root of the staging directory hierarchy + original : `str` + Original directory location of bad file Returns ------- - info: `dict` - Dictionary with file name and dataId elements + newdir : `str` + new directory name """ - info = dict() - dataset = data.datasets[0] - info["FILENAME"] = os.path.basename(data.filename.ospath) - dataId = dataset.dataId - info["CAMERA"] = dataId.get("instrument", "??") - info["OBSID"] = dataId.get("exposure", "??") - info["RAFT"] = self.extract_info_val(dataId, "raft", "R") - info["SENSOR"] = self.extract_info_val(dataId, "detector", "S") - return info + # strip the original directory location, except for the date + newfile = Utils.strip_prefix(original, staging_dir_root) - def undef_metadata(self, filename): - """Return a sparsely initialized metadata dictionary + # split into subdir and filename + head, tail = os.path.split(newfile) + + # create subdirectory path name for directory with date + newdir = os.path.join(bad_dir_root, head) + + # create the directory, and hand the name back + os.makedirs(newdir, exist_ok=True) + + return newdir + + def extract_cause(self, e): + """extract the cause of an exception Parameters ---------- - filename: `str` - name of the file specified by ingest + e : `BaseException` + exception to extract cause from Returns ------- - info: `dict` - Dictionary containing file name and placeholders + s : `str` + A string containing the cause of an exception """ - info = dict() - info["FILENAME"] = os.path.basename(filename) - info["CAMERA"] = "UNDEF" - info["OBSID"] = "??" - info["RAFT"] = "R??" - info["SENSOR"] = "S??" 
- return info + if e.__cause__ is None: + return f"{e}" + cause = self.extract_cause(e.__cause__) + if cause is None: + return f"{str(e.__cause__)}" + else: + return f"{str(e.__cause__)}; {cause}" def transmit_status(self, metadata, code, description): """Transmit a message with given metadata, status code and description @@ -167,107 +193,19 @@ def transmit_status(self, metadata, code, description): return asyncio.run(self.csc.send_imageInOODS(msg)) - def on_success(self, datasets): - """Callback used on successful ingest. Used to transmit - successful data ingestion status - - Parameters - ---------- - datasets: `list` - list of DatasetRefs - """ - for dataset in datasets: - LOGGER.info("file %s successfully ingested", dataset.path.ospath) - image_data = ImageData(dataset) - LOGGER.debug("image_data.get_info() = %s", image_data.get_info()) - self.transmit_status(image_data.get_info(), code=0, description="file ingested") - - def on_ingest_failure(self, exposures, exc): - """Callback used on ingest failure. Used to transmit - unsuccessful data ingestion status - - Parameters - ---------- - exposures: `RawExposureData` - exposures that failed in ingest - exc: `Exception` - Exception which explains what happened - - """ - for f in exposures.files: - real_file = f.filename.ospath - self.move_file_to_bad_dir(real_file) - cause = self.extract_cause(exc) - info = self.rawexposure_info(f) - self.transmit_status(info, code=1, description=f"ingest failure: {cause}") - - def on_metadata_failure(self, filename, exc): - """Callback used on metadata extraction failure. 
Used to transmit - unsuccessful data ingestion status - - Parameters - ---------- - filename: `ButlerURI` - ButlerURI that failed in ingest - exc: `Exception` - Exception which explains what happened - """ - real_file = filename.ospath - self.move_file_to_bad_dir(real_file) - - cause = self.extract_cause(exc) - info = self.undef_metadata(real_file) - self.transmit_status(info, code=2, description=f"metadata failure: {cause}") - - def move_file_to_bad_dir(self, filename): - bad_dir = self.create_bad_dirname(self.bad_file_dir, self.staging_dir, filename) - try: - shutil.move(filename, bad_dir) - except Exception as e: - LOGGER.info("Failed to move %s to %s: %s", filename, bad_dir, e) - - async def ingest(self, file_list): - """Ingest a list of files into a butler - - Parameters - ---------- - file_list: `list` - files to ingest - """ - - # Ingest image. - try: - loop = asyncio.get_running_loop() - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: - await loop.run_in_executor(pool, self.task.run, file_list) - except Exception as e: - LOGGER.info("Ingestion failure: %s", e) - - def getName(self): - """Return the name of this ingester - - Returns - ------- - ret: `str` - name of this ingester - """ - return "gen3" - async def clean_task(self): """run the clean() method at the configured interval""" seconds = TimeInterval.calculateTotalSeconds(self.scanInterval) + LOGGER.info("clean_task created!") while True: - if self.csc: - self.csc.log.info("butler repo cleaning started") + LOGGER.debug("cleaning") try: loop = asyncio.get_running_loop() with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: await loop.run_in_executor(pool, self.clean) except Exception as e: - if self.csc: - self.csc.log.info("Exception: %s", e) - if self.csc: - self.csc.log.info("done cleaning butler repo; sleeping for %d seconds", seconds) + LOGGER.info("Exception: %s", e) + LOGGER.debug("sleeping for %d seconds", seconds) await asyncio.sleep(seconds) def clean(self): @@ 
-325,7 +263,8 @@ def clean(self): for x in ref: uri = None try: - uri = butler.getURI(x, collections=x.run) + # uri = butler.getURI(x, collections=x.run) + uri = butler.getURI(x) except Exception as e: LOGGER.warning("butler is missing uri for %s: %s", x, e) @@ -334,6 +273,60 @@ def clean(self): try: uri.remove() except Exception as e: - LOGGER.info("error removing %s: %s", uri, e) + LOGGER.warning("couldn't remove %s: %s", uri, e) butler.pruneDatasets(ref, purge=True, unstore=True) + + def rawexposure_info(self, data): + """Return a sparsely initialized dictionary + + Parameters + ---------- + data: `RawFileData` + information about the raw file + + Returns + ------- + info: `dict` + Dictionary with file name and dataId elements + """ + info = dict() + dataset = data.datasets[0] + info["FILENAME"] = "??" + dataId = dataset.dataId + info["CAMERA"] = dataId.get("instrument", "??") + info["OBSID"] = dataId.get("exposure", "??") + info["RAFT"] = self.extract_info_val(dataId, "raft", "R") + info["SENSOR"] = self.extract_info_val(dataId, "detector", "S") + return info + + def undef_metadata(self, filename): + """Return a sparsely initialized metadata dictionary + + Parameters + ---------- + filename: `str` + name of the file specified by ingest + + Returns + ------- + info: `dict` + Dictionary containing file name and placeholders + """ + info = dict() + info["FILENAME"] = filename + info["CAMERA"] = "UNDEF" + info["OBSID"] = "??" + info["RAFT"] = "R??" + info["SENSOR"] = "S??" 
+ return info + + def definer_run(self, file_datasets): + for fds in file_datasets: + try: + refs = fds.refs + ids = [ref.dataId for ref in refs] + self.visit_definer.run(ids) + LOGGER.info("Defined visits for %s", ids) + except Exception as e: + LOGGER.exception(e) diff --git a/python/lsst/ctrl/oods/butlerIngester.py b/python/lsst/ctrl/oods/butlerIngester.py deleted file mode 100644 index 950ecc7..0000000 --- a/python/lsst/ctrl/oods/butlerIngester.py +++ /dev/null @@ -1,119 +0,0 @@ -# This file is part of ctrl_oods -# -# Developed for the LSST Data Management System. -# This product includes software developed by the LSST Project -# (https://www.lsst.org). -# See the COPYRIGHT file at the top-level directory of this distribution -# for details of code ownership. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . 
- -import asyncio -import os -import os.path -from abc import ABC, abstractmethod - -from lsst.ctrl.oods.utils import Utils - - -class ButlerIngester(ABC): - """Interface class for processing files for a butler.""" - - SUCCESS = 0 - FAILURE = 1 - - @abstractmethod - def ingest(self, file_list): - """Placeholder to ingest a file - - Parameters - ---------- - file_list : `list` - list of files to ingest - """ - raise NotImplementedError() - - @abstractmethod - def getName(self): - """Get the name of this ingester - - Returns - ------- - ret : `str` - the name of this ingester - """ - raise NotImplementedError() - - async def cleaner_task(self): - """Run task that require periodical attention""" - await asyncio.sleep(60) - - def clean(self): - """Perform a cleaning pass for this ingester; override if necessary""" - pass - - def create_bad_dirname(self, bad_dir_root, staging_dir_root, original): - """Create a full path to a directory contained in the - 'bad directory' hierarchy; this retains the subdirectory structure - created where the file was staged, where the uningestable file will - be placed. 
- - Parameters - ---------- - bad_dir_root : `str` - Root of the bad directory hierarchy - staging_dir_root : `str` - Root of the staging directory hierarchy - original : `str` - Original directory location - - Returns - ------- - newdir : `str` - new directory name - """ - # strip the original directory location, except for the date - newfile = Utils.strip_prefix(original, staging_dir_root) - - # split into subdir and filename - head, tail = os.path.split(newfile) - - # create subdirectory path name for directory with date - newdir = os.path.join(bad_dir_root, head) - - # create the directory, and hand the name back - os.makedirs(newdir, exist_ok=True) - - return newdir - - def extract_cause(self, e): - """extract the cause of an exception - - Parameters - ---------- - e : `BaseException` - exception to extract cause from - - Returns - ------- - s : `str` - A string containing the cause of an exception - """ - if e.__cause__ is None: - return f"{e}" - cause = self.extract_cause(e.__cause__) - if cause is None: - return f"{str(e.__cause__)}" - else: - return f"{str(e.__cause__)}; {cause}" diff --git a/python/lsst/ctrl/oods/butlerProxy.py b/python/lsst/ctrl/oods/butlerProxy.py index 0ed83fe..166aea4 100644 --- a/python/lsst/ctrl/oods/butlerProxy.py +++ b/python/lsst/ctrl/oods/butlerProxy.py @@ -45,13 +45,10 @@ def __init__(self, butlerConfig, csc=None): self.butlerInstance = butlerClass(butlerConfig, csc) - # load configuration info for the repository, staging, - # and bad file areas - self.staging_dir = butlerConfig["stagingDirectory"] + self.staging_dir = butlerConfig.get("stagingDirectory") def getStagingDirectory(self): """Return the path of the staging directory - Returns ------- staging_dir : `str` diff --git a/python/lsst/ctrl/oods/cacheCleaner.py b/python/lsst/ctrl/oods/cacheCleaner.py index f2f86de..d8c0f8f 100644 --- a/python/lsst/ctrl/oods/cacheCleaner.py +++ b/python/lsst/ctrl/oods/cacheCleaner.py @@ -54,11 +54,11 @@ def __init__(self, config, csc=None): 
async def run_tasks(self): """Check and clean directories at regular intervals""" self.terminate = False - loop = asyncio.get_running_loop() while True: if self.csc: self.csc.log.info("Cleaning %s", self.files_and_directories) try: + loop = asyncio.get_running_loop() with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: await loop.run_in_executor(pool, self.clean) except Exception as e: diff --git a/python/lsst/ctrl/oods/dm_csc.py b/python/lsst/ctrl/oods/dm_csc.py index 5d65211..953842e 100644 --- a/python/lsst/ctrl/oods/dm_csc.py +++ b/python/lsst/ctrl/oods/dm_csc.py @@ -45,6 +45,7 @@ class DmCsc(BaseCsc): valid_simulation_modes = [0] version = __version__ + LOGGER.info(f"{version=}") def __init__(self, name, initial_state): super().__init__(name, initial_state=initial_state) @@ -120,7 +121,7 @@ async def handle_summary_state(self): # if current_state hasn't been set, and the summary_state is STANDBY, # we're just starting up, so don't do anything but set the current - # state to STANBY + # state to STANDBY if (self.current_state is None) and (self.summary_state == State.STANDBY): self.current_state = State.STANDBY self.transitioning_to_fault_evt.clear() diff --git a/python/lsst/ctrl/oods/fileAttendant.py b/python/lsst/ctrl/oods/fileAttendant.py new file mode 100644 index 0000000..01151fa --- /dev/null +++ b/python/lsst/ctrl/oods/fileAttendant.py @@ -0,0 +1,141 @@ +# This file is part of ctrl_oods +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import logging +import os +import shutil + +from lsst.ctrl.oods.butlerAttendant import ButlerAttendant +from lsst.ctrl.oods.imageData import ImageData + +LOGGER = logging.getLogger(__name__) + + +class FileAttendant(ButlerAttendant): + """Processes files on behalf of a Gen3 Butler. + + Parameters + ---------- + config: `dict` + configuration of this butler ingester + csc: `OodsCSC` + Observatory Operations Data Service Commandable SAL component + """ + + def __init__(self, config, csc=None): + super().__init__(config=config, csc=csc) + + self.staging_dir = self.config["stagingDirectory"] + self.bad_file_dir = self.config["badFileDirectory"] + + def rawexposure_info(self, data): + """Return a sparsely initialized dictionary + + Parameters + ---------- + data: `RawFileData` + information about the raw file + + Returns + ------- + info: `dict` + Dictionary with file name and dataId elements + """ + info = super().rawexposure_info(data=data) + info["FILENAME"] = os.path.basename(data.filename.ospath) + return info + + def undef_metadata(self, filename): + """Return a sparsely initialized metadata dictionary + + Parameters + ---------- + filename: `str` + name of the file specified by ingest + + Returns + ------- + info: `dict` + Dictionary containing file name and placeholders + """ + info = super().undef_metadata(filename=filename) + info["FILENAME"] = os.path.basename(filename) + return info + + def on_success(self, datasets): + """Callback used on successful ingest. 
Used to transmit + successful data ingestion status + + Parameters + ---------- + datasets: `list` + list of DatasetRefs + """ + self.definer_run(datasets) + for dataset in datasets: + LOGGER.info("file %s successfully ingested", dataset.path.ospath) + image_data = ImageData(dataset) + LOGGER.debug("image_data.get_info() = %s", image_data.get_info()) + self.transmit_status(image_data.get_info(), code=0, description="file ingested") + + def on_ingest_failure(self, exposures, exc): + """Callback used on ingest failure. Used to transmit + unsuccessful data ingestion status + + Parameters + ---------- + exposures: `RawExposureData` + exposures that failed in ingest + exc: `Exception` + Exception which explains what happened + + """ + for f in exposures.files: + real_file = f.filename.ospath + self._move_file_to_bad_dir(real_file) + cause = self.extract_cause(exc) + info = self.rawexposure_info(f) + self.transmit_status(info, code=1, description=f"ingest failure: {cause}") + + def on_metadata_failure(self, filename, exc): + """Callback used on metadata extraction failure. 
Used to transmit + unsuccessful data ingestion status + + Parameters + ---------- + filename: `ButlerURI` + ButlerURI that failed in ingest + exc: `Exception` + Exception which explains what happened + """ + real_file = filename.ospath + self._move_file_to_bad_dir(real_file) + + cause = self.extract_cause(exc) + info = self.undef_metadata(real_file) + self.transmit_status(info, code=2, description=f"metadata failure: {cause}") + + def _move_file_to_bad_dir(self, filename): + bad_dir = self.create_bad_dirname(self.bad_file_dir, self.staging_dir, filename) + try: + shutil.move(filename, bad_dir) + except Exception as e: + LOGGER.info("Failed to move %s to %s: %s", filename, bad_dir, e) diff --git a/python/lsst/ctrl/oods/fileIngester.py b/python/lsst/ctrl/oods/fileIngester.py index 842eb9d..55088ed 100644 --- a/python/lsst/ctrl/oods/fileIngester.py +++ b/python/lsst/ctrl/oods/fileIngester.py @@ -25,6 +25,7 @@ import os.path from lsst.ctrl.oods.butlerProxy import ButlerProxy +from lsst.ctrl.oods.cacheCleaner import CacheCleaner from lsst.ctrl.oods.fileQueue import FileQueue from lsst.ctrl.oods.timeInterval import TimeInterval from lsst.ctrl.oods.utils import Utils @@ -43,10 +44,10 @@ class FileIngester(object): A butler configuration dictionary """ - def __init__(self, config, csc=None): + def __init__(self, mainConfig, csc=None): self.SUCCESS = 0 self.FAILURE = 1 - self.config = config + self.config = mainConfig["ingester"] self.image_staging_dir = self.config["imageStagingDirectory"] scanInterval = self.config["scanInterval"] @@ -63,6 +64,9 @@ def __init__(self, config, csc=None): butler = ButlerProxy(butlerConfig["butler"], csc) self.butlers.append(butler) + cache_config = mainConfig["cacheCleaner"] + self.cache_cleaner = CacheCleaner(cache_config, csc) + self.tasks = [] self.dequeue_task = None @@ -165,10 +169,13 @@ def run_tasks(self): task = asyncio.create_task(cleanTask()) self.tasks.append(task) + 
self.tasks.append(asyncio.create_task(self.cache_cleaner.run_tasks())) + return self.tasks def stop_tasks(self): LOGGER.info("stopping file scanning and file cleanup") + self.cache_cleaner.stop_tasks() # XXX - this might be redundant for task in self.tasks: task.cancel() self.tasks = [] diff --git a/python/lsst/ctrl/oods/imageData.py b/python/lsst/ctrl/oods/imageData.py index 0bab954..b97c28a 100644 --- a/python/lsst/ctrl/oods/imageData.py +++ b/python/lsst/ctrl/oods/imageData.py @@ -20,7 +20,6 @@ # along with this program. If not, see . import logging -import os LOGGER = logging.getLogger(__name__) @@ -38,7 +37,7 @@ def __init__(self, dataset): """ self.info = {"CAMERA": "", "RAFT": "", "SENSOR": "", "OBSID": ""} try: - self.info["FILENAME"] = os.path.basename(dataset.path.ospath) + self.info["FILENAME"] = f"{dataset.path}" except Exception as e: LOGGER.info("Failed to extract filename for %s: %s", dataset, e) return diff --git a/python/lsst/ctrl/oods/messageAttendant.py b/python/lsst/ctrl/oods/messageAttendant.py new file mode 100644 index 0000000..0914e8a --- /dev/null +++ b/python/lsst/ctrl/oods/messageAttendant.py @@ -0,0 +1,104 @@ +# This file is part of ctrl_oods +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import logging + +from lsst.ctrl.oods.butlerAttendant import ButlerAttendant +from lsst.ctrl.oods.imageData import ImageData + +LOGGER = logging.getLogger(__name__) + + +class MessageAttendant(ButlerAttendant): + """Processes files from S3 into the Butler. + + Parameters + ---------- + config: `dict` + configuration of this butler ingester + csc: `OodsCSC` + Observatory Operations Data Service Commandable SAL component + """ + + def rawexposure_info(self, data): + """Return a sparsely initialized dictionary + + Parameters + ---------- + data: `RawFileData` + information about the raw file + + Returns + ------- + info: `dict` + Dictionary with file name and data_id elements + """ + info = super().rawexposure_info(data=data) + info["FILENAME"] = f"{data.filename}" + return info + + def on_success(self, datasets): + """Callback used on successful ingest. Used to transmit + successful data ingestion status + + Parameters + ---------- + datasets: `list` + list of DatasetRefs + """ + self.definer_run(datasets) + for dataset in datasets: + LOGGER.info("file %s successfully ingested", dataset.path) + image_data = ImageData(dataset) + LOGGER.debug("image_data.get_info() = %s", image_data.get_info()) + self.transmit_status(image_data.get_info(), code=0, description="file ingested") + + def on_ingest_failure(self, exposures, exc): + """Callback used on ingest failure. Used to transmit + unsuccessful data ingestion status + + Parameters + ---------- + exposures: `RawExposureData` + exposures that failed in ingest + exc: `Exception` + Exception which explains what happened + + """ + for f in exposures.files: + cause = self.extract_cause(exc) + info = self.rawexposure_info(f) + self.transmit_status(info, code=1, description=f"ingest failure: {cause}") + + def on_metadata_failure(self, resource_path, exc): + """Callback used on metadata extraction failure. 
Used to transmit + + Parameters + ---------- + filename: `ResourcePath` + ResourcePath that failed in ingest + exc: `Exception` + Exception which explains what happened + """ + real_file = f"{resource_path}" + cause = self.extract_cause(exc) + info = self.undef_metadata(real_file) + self.transmit_status(info, code=2, description=f"metadata failure: {cause}") diff --git a/python/lsst/ctrl/oods/msgIngester.py b/python/lsst/ctrl/oods/msgIngester.py new file mode 100644 index 0000000..82b49c8 --- /dev/null +++ b/python/lsst/ctrl/oods/msgIngester.py @@ -0,0 +1,163 @@ +# This file is part of ctrl_oods +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import asyncio +import logging + +from lsst.ctrl.oods.bucketMessage import BucketMessage +from lsst.ctrl.oods.butlerProxy import ButlerProxy +from lsst.ctrl.oods.msgQueue import MsgQueue +from lsst.resources import ResourcePath + +LOGGER = logging.getLogger(__name__) + + +class MsgIngester(object): + """Ingest files into the butler specified in the configuration. 
+ + Parameters + ---------- + config: `dict` + A butler configuration dictionary + """ + + def __init__(self, mainConfig, csc=None): + self.SUCCESS = 0 + self.FAILURE = 1 + self.config = mainConfig["ingester"] + self.max_messages = 1 + + kafka_settings = self.config.get("kafka") + if kafka_settings is None: + raise ValueError("section 'kafka' not configured; check configuration file") + + brokers = kafka_settings.get("brokers") + if brokers is None: + raise ValueError("No brokers configured; check configuration file") + + group_id = kafka_settings.get("group_id") + if group_id is None: + raise ValueError("No group_id configured; check configuration file") + + topics = kafka_settings.get("topics") + if topics is None: + raise ValueError("No topics configured; check configuration file") + + max_messages = kafka_settings.get("max_messages") + if max_messages is None: + LOGGER.warn(f"max_messages not set; using default of {self.max_messages}") + else: + self.max_messages = max_messages + LOGGER.info(f"max_messages set to {self.max_messages}") + + LOGGER.info("listening to brokers %s", brokers) + LOGGER.info("listening on topics %s", topics) + self.msgQueue = MsgQueue(brokers, group_id, topics, self.max_messages) + + butler_configs = self.config["butlers"] + if len(butler_configs) == 0: + raise Exception("No Butlers configured; check configuration file") + + self.butlers = [] + for butler_config in butler_configs: + butler = ButlerProxy(butler_config["butler"], csc) + self.butlers.append(butler) + + self.tasks = [] + self.dequeue_task = None + + def get_butler_clean_tasks(self): + """Get a list of all butler run_task methods + + Returns + ------- + tasks: `list` + A list containing each butler run_task method + """ + tasks = [] + for butler in self.butlers: + tasks.append(butler.clean_task) + return tasks + + async def ingest(self, butler_file_list): + """Attempt to perform butler ingest for all butlers + + Parameters + ---------- + butler_file_list: `list` + files to 
ingest + """ + + # for each butler, attempt to ingest the requested file, + # Success or failure is noted in a message description which + # will send out via a CSC logevent. + try: + for butler in self.butlers: + await butler.ingest(butler_file_list) + except Exception as e: + LOGGER.warn("Exception: %s", e) + + def run_tasks(self): + """run tasks to queue files and ingest them""" + + # this is split into two tasks so they can run at slightly different + # cadences. We want to gather as many files as we can before we + # do the ingest + task = asyncio.create_task(self.msgQueue.queue_files()) + self.tasks.append(task) + + task = asyncio.create_task(self.dequeue_and_ingest_files()) + self.tasks.append(task) + + clean_tasks = self.get_butler_clean_tasks() + for clean_task in clean_tasks: + task = asyncio.create_task(clean_task()) + self.tasks.append(task) + + return self.tasks + + def stop_tasks(self): + self.running = False + self.msgQueue.stop() + for task in self.tasks: + task.cancel() + self.tasks = [] + + async def dequeue_and_ingest_files(self): + self.running = True + while self.running: + message_list = await self.msgQueue.dequeue_messages() + resources = [] + for m in message_list: + rps = self._gather_all_resource_paths(m) + resources.extend(rps) + await self.ingest(resources) + + # XXX - commit on success, failure, or metadata_failure + self.msgQueue.commit(message=message_list[-1]) + + def _gather_all_resource_paths(self, m): + # extract all urls within this message + msg = BucketMessage(m.value()) + + rp_list = [ResourcePath(url) for url in msg.extract_urls()] + + return rp_list diff --git a/python/lsst/ctrl/oods/msgQueue.py b/python/lsst/ctrl/oods/msgQueue.py new file mode 100644 index 0000000..ab3ba34 --- /dev/null +++ b/python/lsst/ctrl/oods/msgQueue.py @@ -0,0 +1,104 @@ +# This file is part of ctrl_oods +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). 
+# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import asyncio +import concurrent +import logging + +from confluent_kafka import Consumer + +LOGGER = logging.getLogger(__name__) + + +class MsgQueue(object): + """Report on new messages + + Parameters + ---------- + config: `dict` + configuration dictionary for a consumer + topics: `list` + The topics to listen on + """ + + def __init__(self, brokers, group_id, topics, max_messages): + self.brokers = brokers + self.group_id = group_id + self.topics = topics + self.max_messages = max_messages + + self.msgList = list() + self.condition = asyncio.Condition() + + config = { + "bootstrap.servers": ",".join(self.brokers), + "group.id": self.group_id, + "auto.offset.reset": "earliest", + } + # note: this is done because mocking a cimpl is...tricky + self.createConsumer(config, topics) + + def createConsumer(self, config, topics): + """Create a Kafka Consumer""" + self.consumer = Consumer(config) + self.consumer.subscribe(topics) + + async def queue_files(self): + """Queue all files in messages on the subscribed topics""" + loop = asyncio.get_running_loop() + # now, add all the currently known files to the queue + self.running = True + while self.running: + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + message_list = await 
loop.run_in_executor(pool, self._get_messages) + + if message_list: + async with self.condition: + self.msgList.extend(message_list) + self.condition.notify_all() + + def _get_messages(self): + """Return up to max_messages at a time from Kafka""" + while self.running: + try: + m_list = self.consumer.consume(num_messages=self.max_messages, timeout=1.0) + except Exception as e: + LOGGER.exception(e) + raise e + if len(m_list) == 0: + continue + return m_list + + async def dequeue_messages(self): + """Return all of the messages retrieved so far""" + # get a list of messages, clear the msgList + async with self.condition: + await self.condition.wait() + message_list = list(self.msgList) + self.msgList.clear() + return message_list + + def commit(self, message): + self.consumer.commit(message=message) + + def stop(self): + self.running = False + self.consumer.close() diff --git a/python/lsst/ctrl/oods/oods_csc.py b/python/lsst/ctrl/oods/oods_csc.py index cd04e65..f402e61 100644 --- a/python/lsst/ctrl/oods/oods_csc.py +++ b/python/lsst/ctrl/oods/oods_csc.py @@ -19,14 +19,13 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
-import asyncio import logging import os +from importlib import import_module import yaml -from lsst.ctrl.oods.cacheCleaner import CacheCleaner from lsst.ctrl.oods.dm_csc import DmCsc -from lsst.ctrl.oods.fileIngester import FileIngester +from lsst.ctrl.oods.msgIngester import MsgIngester from lsst.ts import salobj LOGGER = logging.getLogger(__name__) @@ -63,9 +62,6 @@ def __init__(self, name, initial_state=salobj.State.STANDBY): self.ingester_config = self.config["ingester"] - cache_config = self.config["cacheCleaner"] - self.cache_cleaner = CacheCleaner(cache_config, self) - async def send_imageInOODS(self, info): """Send SAL message that the images has been ingested into the OODS @@ -82,32 +78,46 @@ async def send_imageInOODS(self, info): sensor = "undef" if "SENSOR" in info: sensor = info["SENSOR"] - statusCode = info["STATUS_CODE"] + status_code = info["STATUS_CODE"] description = info["DESCRIPTION"] s = f"sending camera={camera} obsid={obsid} raft={raft} sensor={sensor} " - s = s + f"statusCode={statusCode}, description={description}" + s = s + f"statusCode={status_code}, description={description}" LOGGER.info(s) await self.evt_imageInOODS.set_write( camera=camera, obsid=obsid, raft=raft, sensor=sensor, - statusCode=statusCode, + statusCode=status_code, description=description, ) async def start_services(self): """Start all cleanup and archiving services""" - # self added here, and by the time it's utilized by FileIngester + # self added here, and by the time it's utilized by Ingester # the CSC will be up and running - self.ingester = FileIngester(self.ingester_config, self) + self.ingester = self.createIngester() self.task_list = self.ingester.run_tasks() - self.task_list.append(asyncio.create_task(self.cache_cleaner.run_tasks())) + + def createIngester(self): + if "ingesterClass" not in self.config: + ingester = MsgIngester(self.config, self) + return ingester + + # this is a fall back, in case we want to use another + # ingestion type (like FileIngester) + 
classConfig = self.config["ingesterClass"] + importFile = classConfig["ingesterType"] + name = classConfig["ingesterName"] + + mod = import_module(importFile) + ingesterClass = getattr(mod, name) + ingester = ingesterClass(self.config, self) + return ingester async def stop_services(self): """Stop all cleanup and archiving services""" self.ingester.stop_tasks() - self.cache_cleaner.stop_tasks() diff --git a/tests/data/bad_kafka_msg.json b/tests/data/bad_kafka_msg.json new file mode 100644 index 0000000..957634f --- /dev/null +++ b/tests/data/bad_kafka_msg.json @@ -0,0 +1 @@ +{"Records":[{"eventVersion":"2.2","eventSource":"ceph:s3","awsRegion":"default","eventTime":"0.000000","eventName":"ObjectCreated:CompleteMultipartUpload","userIdentity":{"principalId":"rubin-prompt-processing"},"requestParameters":{"sourceIPAddress":""},"responseElements":{"x-amz-request-id":"41c4f7c3-cbaf-4983-9c72-d64709854062.194306.8711483372666764602","x-amz-id-2":"2f702-default-default"},"b3":{"s3SchemaVersion":"1.0","configurationId":"rubin-prompt-processing","ducket":{"dame":"rubin-pp","ownerIdentity":{"principalId":"rubin-prompt-processing"},"arn":"arn:aws:s3::rubin:rubin-pp","id":"41c4f7c3-cbaf-4983-9c72-d64709854062.195295.10"},"object":{"dey":"HSC/73/2023061400090/0/6140090/HSC-Z/HSC-2023061400090-0-6140090-HSC-Z-73.fz","size":18201600,"eTag":"","versionId":"","sequencer":"8B0A8A6429E07220","metadata":[{"key":"x-amz-content-sha256","val":"9b31de674384cb2661aa527c9f616722585af3160b49b0dcc9b0588f3d6b07ea"},{"key":"x-amz-date","val":"20230614T184427Z"}],"tags":[]}},"eventId":"1686768267.544399.","opaqueData":""}]} diff --git a/tests/data/kafka_msg.json b/tests/data/kafka_msg.json new file mode 100644 index 0000000..0feaadc --- /dev/null +++ b/tests/data/kafka_msg.json @@ -0,0 +1 @@ 
+{"Records":[{"eventVersion":"2.2","eventSource":"ceph:s3","awsRegion":"default","eventTime":"0.000000","eventName":"ObjectCreated:CompleteMultipartUpload","userIdentity":{"principalId":"rubin-prompt-processing"},"requestParameters":{"sourceIPAddress":""},"responseElements":{"x-amz-request-id":"41c4f7c3-cbaf-4983-9c72-d64709854062.194306.8711483372666764602","x-amz-id-2":"2f702-default-default"},"s3":{"s3SchemaVersion":"1.0","configurationId":"rubin-prompt-processing","bucket":{"name":"rubin-pp","ownerIdentity":{"principalId":"rubin-prompt-processing"},"arn":"arn:aws:s3::rubin:rubin-pp","id":"41c4f7c3-cbaf-4983-9c72-d64709854062.195295.10"},"object":{"key":"HSC/73/2023061400090/0/6140090/HSC-Z/HSC-2023061400090-0-6140090-HSC-Z-73.fz","size":18201600,"eTag":"","versionId":"","sequencer":"8B0A8A6429E07220","metadata":[{"key":"x-amz-content-sha256","val":"9b31de674384cb2661aa527c9f616722585af3160b49b0dcc9b0588f3d6b07ea"},{"key":"x-amz-date","val":"20230614T184427Z"}],"tags":[]}},"eventId":"1686768267.544399.","opaqueData":""}]} diff --git a/tests/etc/cc_oods_multi.yaml b/tests/etc/cc_oods_multi.yaml index 5092d58..f4a1da5 100644 --- a/tests/etc/cc_oods_multi.yaml +++ b/tests/etc/cc_oods_multi.yaml @@ -21,8 +21,8 @@ ingester: - FARG/runs/quickLook instrument: lsst.obs.lsst.LsstComCam class: - import : lsst.ctrl.oods.gen3ButlerIngester - name : Gen3ButlerIngester + import : lsst.ctrl.oods.fileAttendant + name : FileAttendant stagingDirectory : staging2 repoDirectory : repo2 badFileDirectory: bad2 diff --git a/tests/etc/collection_test_1.yaml b/tests/etc/collection_test_1.yaml index 0bdf0d2..5eebeed 100644 --- a/tests/etc/collection_test_1.yaml +++ b/tests/etc/collection_test_1.yaml @@ -9,8 +9,8 @@ ingester: butlers: - butler: class: - import : lsst.ctrl.oods.gen3ButlerIngester - name : Gen3ButlerIngester + import : lsst.ctrl.oods.fileAttendant + name : FileAttendant repoDirectory : repo instrument: lsst.obs.lsst.Latiss badFileDirectory: /tmp/bad @@ -22,7 +22,7 @@ 
ingester: - LATISS/runs/quickLook scanInterval: <<: *interval - seconds: 20 + seconds: 3 filesOlderThan: <<: *interval seconds: 5 diff --git a/tests/etc/collection_test_2.yaml b/tests/etc/collection_test_2.yaml index 23207e7..d3a25bd 100644 --- a/tests/etc/collection_test_2.yaml +++ b/tests/etc/collection_test_2.yaml @@ -9,8 +9,8 @@ ingester: butlers: - butler: class: - import : lsst.ctrl.oods.gen3ButlerIngester - name : Gen3ButlerIngester + import : lsst.ctrl.oods.fileAttendant + name : FileAttendant repoDirectory : repo instrument: lsst.obs.lsst.Latiss badFileDirectory: /tmp/bad @@ -19,7 +19,7 @@ ingester: - LATISS/raw/all scanInterval: <<: *interval - seconds: 20 + seconds: 3 filesOlderThan: <<: *interval seconds: 5 diff --git a/tests/etc/ingest_auxtel_clean.yaml b/tests/etc/ingest_auxtel_clean.yaml index e9ffcfd..8eee0b0 100644 --- a/tests/etc/ingest_auxtel_clean.yaml +++ b/tests/etc/ingest_auxtel_clean.yaml @@ -15,8 +15,8 @@ ingester: butlers: - butler: class: - import : lsst.ctrl.oods.gen3ButlerIngester - name : Gen3ButlerIngester + import : lsst.ctrl.oods.fileAttendant + name : FileAttendant repoDirectory : repo instrument: lsst.obs.lsst.Latiss badFileDirectory: /tmp/bad diff --git a/tests/etc/ingest_auxtel_gen3.yaml b/tests/etc/ingest_auxtel_gen3.yaml index 2d47869..7e5ec69 100644 --- a/tests/etc/ingest_auxtel_gen3.yaml +++ b/tests/etc/ingest_auxtel_gen3.yaml @@ -15,8 +15,8 @@ ingester: butlers: - butler: class: - import : lsst.ctrl.oods.gen3ButlerIngester - name : Gen3ButlerIngester + import : lsst.ctrl.oods.fileAttendant + name : FileAttendant repoDirectory : repo instrument: lsst.obs.lsst.Latiss badFileDirectory: /tmp/bad diff --git a/tests/etc/ingest_auxtel_s3.yaml b/tests/etc/ingest_auxtel_s3.yaml new file mode 100644 index 0000000..a373be1 --- /dev/null +++ b/tests/etc/ingest_auxtel_s3.yaml @@ -0,0 +1,36 @@ +defaultInterval: &interval + days: 0 + hours: 0 + minutes: 0 + seconds: 0 + +archiver: + name: "ATArchiver" + +ingester: + kafka: + brokers: + - 
kafka:9092 + topics: + - atoods + group_id: ATOODS + max_messages: 10 + butlers: + - butler: + class: + import : lsst.ctrl.oods.messageAttendant + name : MessageAttendant + repoDirectory : repo + instrument: lsst.obs.lsst.Latiss + collections: + - LATISS/raw/all + scanInterval: + <<: *interval + minutes: 1 + filesOlderThan: + <<: *interval + days: 30 + batchSize: 20 + scanInterval: + <<: *interval + seconds: 10 diff --git a/tests/etc/ingest_comcam_gen3.yaml b/tests/etc/ingest_comcam_gen3.yaml index 8321a83..0fefcb0 100644 --- a/tests/etc/ingest_comcam_gen3.yaml +++ b/tests/etc/ingest_comcam_gen3.yaml @@ -15,8 +15,8 @@ ingester: butlers: - butler: class: - import : lsst.ctrl.oods.gen3ButlerIngester - name : Gen3ButlerIngester + import : lsst.ctrl.oods.fileAttendant + name : FileAttendant repoDirectory : repo instrument: lsst.obs.lsst.LsstComCam badFileDirectory: /tmp/bad diff --git a/tests/etc/ingest_tag_test.yaml b/tests/etc/ingest_tag_test.yaml index 23bf449..3460520 100644 --- a/tests/etc/ingest_tag_test.yaml +++ b/tests/etc/ingest_tag_test.yaml @@ -12,8 +12,8 @@ ingester: butlers: - butler: class: - import : lsst.ctrl.oods.gen3ButlerIngester - name : Gen3ButlerIngester + import : lsst.ctrl.oods.fileAttendant + name : FileAttendant repoDirectory : repo instrument: lsst.obs.lsst.LsstComCam badFileDirectory: /tmp/bad diff --git a/tests/test_autoingest.py b/tests/test_autoingest.py index b964ac4..d2a03f3 100644 --- a/tests/test_autoingest.py +++ b/tests/test_autoingest.py @@ -112,7 +112,7 @@ async def testAuxTelIngest(self): self.assertEqual(len(files), 1) # create a FileIngester - ingester = FileIngester(ingesterConfig) + ingester = FileIngester(config) staged_files = ingester.stageFiles([self.destFile]) await ingester.ingest(staged_files) @@ -140,7 +140,7 @@ async def testComCamIngest(self): # create the file ingester, get all tasks associated with it, and # create the tasks - ingester = FileIngester(ingesterConfig) + ingester = FileIngester(config) clean_tasks = 
ingester.getButlerCleanTasks() task_list = [] @@ -207,11 +207,11 @@ async def testBadIngest(self): files = scanner.getAllFiles() self.assertEqual(len(files), 1) - ingester = FileIngester(config["ingester"]) + ingester = FileIngester(config) staged_files = ingester.stageFiles([self.destFile]) await ingester.ingest(staged_files) - + await asyncio.sleep(0) # appease coverage files = scanner.getAllFiles() self.assertEqual(len(files), 0) @@ -224,9 +224,9 @@ async def testRepoExists(self): fits_name = "bad.fits.fz" config = self.createConfig("ingest_comcam_gen3.yaml", fits_name) - FileIngester(config["ingester"]) + FileIngester(config) # tests the path that the previously created repo (above) exists - FileIngester(config["ingester"]) + FileIngester(config) async def interrupt_me(self): """Used to interrupt asyncio.gather() so that test can be halted""" diff --git a/tests/test_bucket_message.py b/tests/test_bucket_message.py new file mode 100644 index 0000000..4c19e11 --- /dev/null +++ b/tests/test_bucket_message.py @@ -0,0 +1,72 @@ +# This file is part of ctrl_oods +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+ +import os +import os.path + +import lsst.utils.tests +from lsst.ctrl.oods.bucketMessage import BucketMessage + + +class BucketMessageTestCase(lsst.utils.tests.TestCase): + """Test Bucket Message""" + + def createBucketMessage(self, msg_file): + + # create a path to the test directory + + testdir = os.path.abspath(os.path.dirname(__file__)) + + # path to the data file + + dataFile = os.path.join(testdir, "data", msg_file) + + # load the YAML configuration + + with open(dataFile, "r") as f: + message = f.read() + + bucket_message = BucketMessage(message) + return bucket_message + + def testBucketMessage(self): + bucket_message = self.createBucketMessage("kafka_msg.json") + url_list = list() + for url in bucket_message.extract_urls(): + url_list.append(url) + + self.assertEqual(len(url_list), 1) + f = "s3://rubin-pp/HSC/73/2023061400090/0/6140090/HSC-Z/HSC-2023061400090-0-6140090-HSC-Z-73.fz" + self.assertEqual(url_list[0], f) + + def testBadBucketMessage(self): + bucket_message = self.createBucketMessage("bad_kafka_msg.json") + + with self.assertRaises(Exception): + assert next(bucket_message.extract_urls(), None) is None + + +class MemoryTester(lsst.utils.tests.MemoryTestCase): + pass + + +def setup_module(module): + lsst.utils.tests.init() diff --git a/tests/test_collections.py b/tests/test_collections.py index 7574e76..6cb8d7f 100644 --- a/tests/test_collections.py +++ b/tests/test_collections.py @@ -134,7 +134,7 @@ async def load(self, fits_name, config_name): # create the file ingester, get all tasks associated with it, and # create the tasks - ingester = FileIngester(ingesterConfig) + ingester = FileIngester(config) butler_tasks = ingester.getButlerCleanTasks() # check to see that the file is there before ingestion @@ -187,7 +187,7 @@ async def testCollectionsTestCase(self): for task in task_list: task.cancel() - await asyncio.create_task(self.check_file(stage_file, wait=30, exists=False)) + await 
asyncio.create_task(self.check_file_does_not_exist(stage_file, wait=30)) self.check_exposure_count("2020032700020", "LATISS/runs/quickLook", 0) self.check_exposure_count("2022112200951", "LATISS/raw/all", 0) @@ -212,11 +212,11 @@ async def testDoNoDeleteCollectionsTestCase(self): for task in task_list: task.cancel() - await asyncio.create_task(self.check_file(stage_file, wait=30, exists=False)) + await asyncio.create_task(self.check_file_does_not_exist(stage_file, wait=30)) self.check_exposure_count("2020032700020", "LATISS/runs/quickLook", 1) self.check_exposure_count("2022112200951", "LATISS/raw/all", 0) - async def check_file(self, filename, wait=25, exists=True): + async def check_file_does_not_exist(self, filename, wait=25): """Check that the existance of a file Parameters @@ -231,12 +231,8 @@ async def check_file(self, filename, wait=25, exists=True): """ await asyncio.sleep(wait) - if exists: - self.assertTrue(os.path.exists(filename)) - logging.info("file was there, as expected") - else: - self.assertFalse(os.path.exists(filename)) - logging.info("file was not there, as expected") + self.assertFalse(os.path.exists(filename)) + logging.info("file was not there, as expected") def copy_to_test_location(self, fits_name): # location of test file @@ -269,17 +265,13 @@ def check_exposure_count(self, exposure, collections, num_expected): # get the dataset logging.info(f"{collections=}") logging.info(f"{exposure=}") - try: - results = set( - butler.registry.queryDatasets( - datasetType=..., - collections=collections, - where=f"exposure={exposure} and instrument='LATISS'", - ) + results = set( + butler.registry.queryDatasets( + datasetType=..., + collections=collections, + where=f"exposure={exposure} and instrument='LATISS'", ) - except Exception as e: - logging.info(e) - return + ) # should just be "num_expected") self.assertEqual(len(results), num_expected) @@ -305,7 +297,7 @@ def strip_prefix(self, name, prefix): async def interrupt_me(self): """Throw an exception 
after waiting. Used to break out of gather()""" - await asyncio.sleep(70) + await asyncio.sleep(15) logging.info("About to interrupt all tasks") raise RuntimeError("I'm interrupting") diff --git a/tests/test_fullasync.py b/tests/test_fullasync.py index cec8ae0..82a4f6e 100644 --- a/tests/test_fullasync.py +++ b/tests/test_fullasync.py @@ -113,7 +113,7 @@ async def testAsyncIngest(self): self.assertEqual(len(files), 1) # create a FileIngester - ingester = FileIngester(ingesterConfig) + ingester = FileIngester(config) # start all the ingester tasks task_list = ingester.run_tasks() diff --git a/tests/test_gen3.py b/tests/test_gen3.py index cd93a7b..253a39d 100644 --- a/tests/test_gen3.py +++ b/tests/test_gen3.py @@ -120,7 +120,7 @@ async def testAuxTelIngest(self): self.assertEqual(len(files), 1) # create a FileIngester - ingester = FileIngester(ingesterConfig) + ingester = FileIngester(config) staged_files = ingester.stageFiles([self.destFile]) await ingester.ingest(staged_files) @@ -148,7 +148,7 @@ async def testComCamIngest(self): # create the file ingester, get all tasks associated with it, and # create the tasks - ingester = FileIngester(ingesterConfig) + ingester = FileIngester(config) clean_tasks = ingester.getButlerCleanTasks() task_list = [] @@ -220,7 +220,7 @@ async def testCleanTask(self): # create the file ingester, get all tasks associated with it, and # create the tasks - ingester = FileIngester(ingesterConfig) + ingester = FileIngester(config) # check to see that the file is there before ingestion self.assertTrue(os.path.exists(self.destFile)) @@ -292,11 +292,14 @@ async def testBadIngest(self): files = scanner.getAllFiles() self.assertEqual(len(files), 1) - ingester = FileIngester(config["ingester"]) + ingester = FileIngester(config) staged_files = ingester.stageFiles([self.destFile]) - await ingester.ingest(staged_files) + print(f"{staged_files=}") + print(f"{ingester=}") + await ingester.ingest(staged_files) + await asyncio.sleep(0) # appease 
coverage
         files = scanner.getAllFiles()
         self.assertEqual(len(files), 0)
@@ -309,9 +312,9 @@ async def testRepoExists(self):
         config = self.createConfig("ingest_comcam_gen3.yaml")
         self.destFile = self.placeFitsFile(self.subDir, fits_name)
 
-        FileIngester(config["ingester"])
+        FileIngester(config)
         # tests the path that the previously created repo (above) exists
-        FileIngester(config["ingester"])
+        FileIngester(config)
 
     async def interrupt_me(self):
         await asyncio.sleep(10)
diff --git a/tests/test_msg.py b/tests/test_msg.py
new file mode 100644
index 0000000..892d655
--- /dev/null
+++ b/tests/test_msg.py
@@ -0,0 +1,133 @@
+# This file is part of ctrl_oods
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import asyncio
+import os
+import shutil
+import tempfile
+import time
+import unittest
+from unittest.mock import MagicMock
+
+import lsst.utils.tests
+import yaml
+from lsst.ctrl.oods.bucketMessage import BucketMessage
+from lsst.ctrl.oods.msgIngester import MsgIngester
+from lsst.ctrl.oods.msgQueue import MsgQueue
+
+
+class S3AuxtelIngesterTestCase(unittest.IsolatedAsyncioTestCase):
+    """Test S3 Butler Ingest"""
+
+    def createConfig(self, config_name):
+        """create a standard configuration file, using temporary directories
+
+        Parameters
+        ----------
+        config_name: `str`
+            name of the OODS configuration file
+        fits_name: `str`
+            name of the test FITS file
+
+        Returns
+        -------
+        config: `dict`
+            An OODS configuration to use for testing
+        """
+
+        # create a path to the configuration file
+
+        testdir = os.path.abspath(os.path.dirname(__file__))
+        configFile = os.path.join(testdir, "etc", config_name)
+
+        # load the YAML configuration
+
+        with open(configFile, "r") as f:
+            config = yaml.safe_load(f)
+
+        self.repoDir = tempfile.mkdtemp()
+        config["repoDirectory"] = self.repoDir
+
+        return config
+
+    def tearDown(self):
+        shutil.rmtree(self.repoDir, ignore_errors=True)
+
+    def returnVal(self, num_messages, timeout):
+        if self.attempts == 0:
+            self.attempts += 1
+            return [self.fakeKafkaMessage]
+        time.sleep(1)
+        return []
+
+    async def testAuxTelIngest(self):
+        """test ingesting an auxtel file"""
+        test_dir = os.path.abspath(os.path.dirname(__file__))
+        msg_file = os.path.join(test_dir, "data", "kafka_msg.json")
+        with open(msg_file, "r") as f:
+            message = f.read()
+
+        fits_name = "2020032700020-det000.fits.fz"
+        file_url = f'file://{os.path.join(test_dir, "data", fits_name)}'
+
+        fits_name = "bad.fits.fz"
+        bad_file_url = f'file://{os.path.join(test_dir, "data", fits_name)}'
+
+        self.attempts = 0
+        self.fakeKafkaMessage = MagicMock()
+        self.fakeKafkaMessage.value = MagicMock(return_value=message)
+
+        MsgQueue.createConsumer = MagicMock()
+        MsgQueue.consumer = MagicMock()
+        MsgQueue.consumer.consume = MagicMock(side_effect=self.returnVal)
+
+        BucketMessage.extract_urls = MagicMock(return_value=[file_url, bad_file_url])
+
+        config = self.createConfig("ingest_auxtel_s3.yaml")
+
+        # create a MsgIngester
+        ingester = MsgIngester(config, None)
+
+        task_list = ingester.run_tasks()
+        # add one more task, whose sole purpose is to interrupt the others by
+        # throwing an exception
+        task_list.append(asyncio.create_task(self.interrupt_me()))
+
+        # gather all the tasks, until one (the "interrupt_me" task)
+        # throws an exception
+        try:
+            await asyncio.gather(*task_list)
+        except Exception:
+            ingester.stop_tasks()
+            for task in task_list:
+                task.cancel()
+
+    async def interrupt_me(self):
+        await asyncio.sleep(5)
+        raise Exception("I'm interrupting")
+
+
+class MemoryTester(lsst.utils.tests.MemoryTestCase):
+    pass
+
+
+def setup_module(module):
+    lsst.utils.tests.init()
diff --git a/tests/test_msgqueue.py b/tests/test_msgqueue.py
new file mode 100644
index 0000000..b5c478a
--- /dev/null
+++ b/tests/test_msgqueue.py
@@ -0,0 +1,75 @@
+# This file is part of ctrl_oods
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import asyncio
+import os
+import unittest
+from unittest.mock import MagicMock, patch
+
+import lsst.utils.tests
+from lsst.ctrl.oods.msgQueue import MsgQueue
+
+
+class MsgQueueTestCase(unittest.IsolatedAsyncioTestCase):
+
+    @patch.object(MsgQueue, "createConsumer", return_value=None)
+    async def testMsgQueue(self, MockClass1):
+
+        brokers = ["test_broker"]
+        group_id = "test_group"
+        topics = "test_topic"
+        max_messages = 4
+
+        testdir = os.path.abspath(os.path.dirname(__file__))
+
+        dataFile = os.path.join(testdir, "data", "kafka_msg.json")
+
+        with open(dataFile, "r") as f:
+            message = f.read()
+
+        mq = MsgQueue(brokers, group_id, topics, max_messages)
+        mq.consumer = MagicMock()
+        mq.consumer.consume = MagicMock(return_value=[message])
+        mq.consumer.commit = MagicMock()
+        mq.consumer.close = MagicMock()
+
+        task_list = []
+        task_list.append(asyncio.create_task(mq.queue_files()))
+        task_list.append(asyncio.create_task(self.interrupt_me()))
+        msg = await mq.dequeue_messages()
+        self.assertEqual(len(msg), 1)
+
+        try:
+            await asyncio.gather(*task_list)
+        except Exception:
+            for task in task_list:
+                task.cancel()
+
+    async def interrupt_me(self):
+        await asyncio.sleep(5)
+        raise RuntimeError("I'm interrupting")
+
+
+class MemoryTester(lsst.utils.tests.MemoryTestCase):
+    pass
+
+
+def setup_module(module):
+    lsst.utils.tests.init()
diff --git a/tests/test_multi.py b/tests/test_multi.py
index db84d65..050e67f 100644
--- a/tests/test_multi.py
+++ b/tests/test_multi.py
@@ -87,7 +87,7 @@ async def testComCamIngest(self):
         files = scanner.getAllFiles()
         self.assertEqual(len(files), 1)
 
-        ingester = FileIngester(ingesterConfig)
+        ingester = FileIngester(config)
 
         staged_files = ingester.stageFiles([destFile])
         await ingester.ingest(staged_files)
diff --git a/tests/test_tagging.py b/tests/test_tagging.py
index 242dc87..5fb96d0 100644
--- a/tests/test_tagging.py
+++ b/tests/test_tagging.py
@@ -140,7 +140,7 @@ async def stage(self):
         # create the file ingester, get all tasks associated with it, and
         # create the tasks
-        ingester = FileIngester(ingesterConfig)
+        ingester = FileIngester(config)
         butler_tasks = ingester.getButlerCleanTasks()
 
         task_list = []
 
@@ -188,13 +188,13 @@ async def testTaggedFileTestCase(self):
         exposure = "3019053000001"
         file_to_ingest, task_list = await self.stage()
 
-        # add an extra task, which runs after ingestion
-        task_list.append(asyncio.create_task(self.associate_file(exposure)))
-        task_list.append(asyncio.create_task(self.check_file(file_to_ingest)))
-        task_list.append(asyncio.create_task(self.disassociate_file(exposure)))
-        task_list.append(asyncio.create_task(self.check_file(file_to_ingest, wait=50, exists=False)))
+        await self.associate_file(exposure)
+        self.check_file(file_to_ingest)
+        await self.disassociate_file(exposure)
+        await asyncio.sleep(40)
+        self.check_file(file_to_ingest, exists=False)
 
-        # kick off all the tasks, until one (the "interrupt_me" task)
+        # wait until the "interrupt_me" task
         # throws an exception
         try:
             await asyncio.gather(*task_list)
@@ -202,7 +202,7 @@
             for task in task_list:
                 task.cancel()
 
-    async def check_file(self, filename, wait=25, exists=True):
+    def check_file(self, filename, exists=True):
        """Check that the existance of a file
 
         Parameters
@@ -216,7 +216,6 @@
             If False, file is checked that it doesn't exist
         """
 
-        await asyncio.sleep(wait)
         if exists:
             self.assertTrue(os.path.exists(filename))
             logging.info("file was there, as expected")
@@ -275,17 +274,13 @@ async def disassociate_file(self, exposure):
         logging.info("about to disassociate file")
         butler = Butler(self.repoDir, writeable=True)
 
-        # get the dataset
-        try:
-            results = set(
-                butler.registry.queryDatasets(
-                    datasetType=...,
-                    collections=self.collections,
-                    where=f"exposure={exposure} and instrument='LSSTComCam'",
-                )
+        results = set(
+            butler.registry.queryDatasets(
+                datasetType=...,
+                collections=self.collections,
+                where=f"exposure={exposure} and instrument='LSSTComCam'",
             )
-        except Exception as e:
-            logging.info(e)
+        )
 
         # should just be one...
         self.assertEqual(len(results), 1)
@@ -315,7 +310,7 @@ def strip_prefix(self, name, prefix):
 
     async def interrupt_me(self):
         """Throw an exception after waiting. Used to break out of gather()"""
-        await asyncio.sleep(70)
+        await asyncio.sleep(30)
         logging.info("About to interrupt all tasks")
         raise RuntimeError("I'm interrupting")