diff --git a/etc/msg_oods.yaml b/etc/msg_oods.yaml index be64e48..649e5e5 100644 --- a/etc/msg_oods.yaml +++ b/etc/msg_oods.yaml @@ -14,7 +14,7 @@ ingester: max_messages: 10 butlers: - butler: - instrument: lsst.obs.lsst.Latiss + instrument: lsst.obs.lsst.LsstCam class: import : lsst.ctrl.oods.messageAttendant name : MessageAttendant diff --git a/python/lsst/ctrl/oods/butlerAttendant.py b/python/lsst/ctrl/oods/butlerAttendant.py index d1c713d..f92e399 100644 --- a/python/lsst/ctrl/oods/butlerAttendant.py +++ b/python/lsst/ctrl/oods/butlerAttendant.py @@ -110,6 +110,8 @@ async def ingest(self, file_list): LOGGER.info("about to ingest") await loop.run_in_executor(executor, self.task.run, file_list) LOGGER.info("done with ingest") + except RuntimeError as re: + LOGGER.info(f"{re}") except Exception as e: LOGGER.exception(f"Exception! {e=}") @@ -195,7 +197,7 @@ def transmit_status(self, metadata, code, description): msg["MSG_TYPE"] = "IMAGE_IN_OODS" msg["STATUS_CODE"] = code msg["DESCRIPTION"] = description - LOGGER.info("msg: %s, code: %s, description: %s", msg, code, description) + LOGGER.debug("msg: %s, code: %s, description: %s", msg, code, description) if self.csc is None: self.print_msg(msg) return diff --git a/python/lsst/ctrl/oods/msgQueue.py b/python/lsst/ctrl/oods/msgQueue.py index df8e07f..db6a383 100644 --- a/python/lsst/ctrl/oods/msgQueue.py +++ b/python/lsst/ctrl/oods/msgQueue.py @@ -71,8 +71,8 @@ def __init__(self, brokers, group_id, topics, max_messages): use_auth = False if use_auth: - LOGGER.info("{MECHANISM_KEY} set to {mechanism}") - LOGGER.info("{PROTOCOL_KEY} set to {protocol}") + LOGGER.info(f"{MECHANISM_KEY} set to {mechanism}") + LOGGER.info(f"{PROTOCOL_KEY} set to {protocol}") config = { "bootstrap.servers": ",".join(self.brokers), "group.id": self.group_id, diff --git a/tests/data/kafka_msg2.json b/tests/data/kafka_msg2.json new file mode 100644 index 0000000..5e1db80 --- /dev/null +++ b/tests/data/kafka_msg2.json @@ -0,0 +1,44 @@ +{ + "Records": [ + { + "eventVersion": "2.2", + "eventSource": "ceph:s3", + "awsRegion": "s3-butler", + "eventTime": "2024-11-13T17:52:22.138912Z", + "eventName": "ObjectCreated:Put", + "userIdentity": { + "principalId": "lsstcam" + }, + "requestParameters": { + "sourceIPAddress": "" + }, + "responseElements": { + "x-amz-request-id": "1cad9458-45e6-48cf-ae5a-2bb195396da7.41369686.13031161180866775831", + "x-amz-id-2": "2774056-s3-butler-s3-butler" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "lsst.s3.raw.lsstcam", + "bucket": { + "name": "rubinobs-raw-lsstcam", + "ownerIdentity": { + "principalId": "lsstcam" + }, + "arn": "arn:aws:s3:s3-butler::rubinobs-raw-lsstcam", + "id": "1cad9458-45e6-48cf-ae5a-2bb195396da7.28735610.1" + }, + "object": { + "key": "test.txt", + "size": 0, + "eTag": "d41d8cd98f00b204e9800998ecf8427e", + "versionId": "", + "sequencer": "56E7346729097308", + "metadata": [], + "tags": [] + } + }, + "eventId": "1731520342.141756.d41d8cd98f00b204e9800998ecf8427e", + "opaqueData": "&mechanism=SCRAM-SHA-512" + } + ] +} \ No newline at end of file diff --git a/tests/test_bucket_message.py b/tests/test_bucket_message.py index b95d09b..8a246b2 100644 --- a/tests/test_bucket_message.py +++ b/tests/test_bucket_message.py @@ -47,7 +47,7 @@ def createBucketMessage(self, msg_file): bucket_message = BucketMessage(message) return bucket_message - def testBucketMessage(self): + def testBucketMessage1(self): """test that the message we're extracting is expected""" bucket_message = self.createBucketMessage("kafka_msg.json") url_list = list() @@ -58,6 +58,17 @@ def testBucketMessage(self): f = "s3://rubin-pp/HSC/73/2023061400090/0/6140090/HSC-Z/HSC-2023061400090-0-6140090-HSC-Z-73.fz" self.assertEqual(url_list[0], f) + def testBucketMessage2(self): + """test that the message we're extracting is expected""" + bucket_message = self.createBucketMessage("kafka_msg2.json") + url_list = list() + for url in bucket_message.extract_urls(): + url_list.append(url) + + self.assertEqual(len(url_list), 1) + f = "s3://rubinobs-raw-lsstcam/test.txt" + self.assertEqual(url_list[0], f) + def testBadBucketMessage(self): """test that a bad message throws an exception""" bucket_message = self.createBucketMessage("bad_kafka_msg.json")