Skip to content

Commit

Permalink
Merge branch 'tickets/DM-45156'
Browse files Browse the repository at this point in the history
  • Loading branch information
srp3rd committed Nov 1, 2024
2 parents e3eb9fe + 8df5785 commit 3475eb6
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 18 deletions.
2 changes: 2 additions & 0 deletions python/lsst/ctrl/oods/bucketMessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ def extract_urls(self):
oid : `str`
The filename referred to by each message.
"""
LOGGER.debug(f"extracting from {self.message}")
msg = json.loads(self.message)
for record in msg["Records"]:
try:
bucket_name = record["s3"]["bucket"]["name"]
key = record["s3"]["object"]["key"]
url = f"s3://{bucket_name}/{key}"
LOGGER.debug(f"yielding {url}")
yield url
except KeyError as e:
LOGGER.error(f"Invalid msg: Couldn't find key in {record=}")
Expand Down
21 changes: 21 additions & 0 deletions python/lsst/ctrl/oods/msgQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,20 @@
import asyncio
import concurrent
import logging
import os

from confluent_kafka import Consumer

LOGGER = logging.getLogger(__name__)

SECURITY_PROTOCOL = "SASL_PLAINTEXT"
SASL_MECHANISM = "SCRAM-SHA-512"

USERNAME_KEY = "LSST_KAFKA_SECURITY_USERNAME"
PASSWORD_KEY = "LSST_KAFKA_SECURITY_PASSWORD"
PROTOCOL_KEY = "LSST_KAFKA_SECURITY_PROTOCOL"
MECHANISM_KEY = "LSST_KAFKA_SECURITY_MECHANISM"


class MsgQueue(object):
"""Report on new messages
Expand All @@ -48,10 +57,19 @@ def __init__(self, brokers, group_id, topics, max_messages):
self.msgList = list()
self.condition = asyncio.Condition()

username = os.environ.get(USERNAME_KEY, "USERNAME_NOT_CONFIGURED")
password = os.environ.get(PASSWORD_KEY, "PASSWORD_NOT_CONFIGURED")
mechanism = os.environ.get(MECHANISM_KEY, SASL_MECHANISM)
protocol = os.environ.get(PROTOCOL_KEY, SECURITY_PROTOCOL)

config = {
"bootstrap.servers": ",".join(self.brokers),
"group.id": self.group_id,
"auto.offset.reset": "earliest",
"security.protocol": protocol,
"sasl.mechanism": mechanism,
"sasl.username": username,
"sasl.password": password,
}
# note: this is done because mocking a cimpl is...tricky
self.createConsumer(config, topics)
Expand All @@ -60,6 +78,7 @@ def createConsumer(self, config, topics):
"""Create a Kafka Consumer"""
self.consumer = Consumer(config)
self.consumer.subscribe(topics)
LOGGER.info("subscribed")

async def queue_files(self):
"""Queue all files in messages on the subscribed topics"""
Expand All @@ -77,6 +96,7 @@ async def queue_files(self):

def _get_messages(self):
"""Return up to max_messages at a time from Kafka"""
LOGGER.debug("getting more messages")
while self.running:
try:
m_list = self.consumer.consume(num_messages=self.max_messages, timeout=1.0)
Expand All @@ -85,6 +105,7 @@ def _get_messages(self):
raise e
if len(m_list) == 0:
continue
LOGGER.debug("message(s) received")
return m_list

async def dequeue_messages(self):
Expand Down
3 changes: 0 additions & 3 deletions tests/etc/cc_oods_multi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ archiver:
name: "CCArchiver"

ingester:
FILE_INGEST_REQUEST: CC_FILE_INGEST_REQUEST
CONSUME_QUEUE: cc_publish_to_oods
PUBLISH_QUEUE: oods_publish_to_cc
imageStagingDirectory: image_staging
butlers:
- butler:
Expand Down
3 changes: 0 additions & 3 deletions tests/etc/clean_collections.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ archiver:
name: "ATArchiver"

ingester:
FILE_INGEST_REQUEST: AT_FILE_INGEST_REQUEST
CONSUME_QUEUE: at_publish_to_oods
PUBLISH_QUEUE: oods_publish_to_at
imageStagingDirectory: data
butlers:
- butler:
Expand Down
3 changes: 0 additions & 3 deletions tests/etc/ingest_auxtel_clean.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ archiver:
name: "ATArchiver"

ingester:
FILE_INGEST_REQUEST: AT_FILE_INGEST_REQUEST
CONSUME_QUEUE: at_publish_to_oods
PUBLISH_QUEUE: oods_publish_to_at
imageStagingDirectory: data
butlers:
- butler:
Expand Down
3 changes: 0 additions & 3 deletions tests/etc/ingest_auxtel_gen3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ archiver:
name: "ATArchiver"

ingester:
FILE_INGEST_REQUEST: AT_FILE_INGEST_REQUEST
CONSUME_QUEUE: at_publish_to_oods
PUBLISH_QUEUE: oods_publish_to_at
imageStagingDirectory: data
butlers:
- butler:
Expand Down
3 changes: 0 additions & 3 deletions tests/etc/ingest_comcam_gen3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ archiver:
name: "CCArchiver"

ingester:
FILE_INGEST_REQUEST: CC_FILE_INGEST_REQUEST
CONSUME_QUEUE: cc_publish_to_oods
PUBLISH_QUEUE: oods_publish_to_cc
imageStagingDirectory: data
butlers:
- butler:
Expand Down
3 changes: 0 additions & 3 deletions tests/etc/ingest_tag_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ defaultInterval: &interval
seconds: 0

ingester:
FILE_INGEST_REQUEST: CC_FILE_INGEST_REQUEST
CONSUME_QUEUE: cc_publish_to_oods
PUBLISH_QUEUE: oods_publish_to_cc
imageStagingDirectory: data
butlers:
- butler:
Expand Down

0 comments on commit 3475eb6

Please sign in to comment.