Skip to content

Commit

Permalink
Merge branch 'tickets/DM-38130'
Browse files Browse the repository at this point in the history
  • Loading branch information
srp3rd committed Jun 20, 2024
2 parents b9480be + 9899c04 commit b7bde8b
Show file tree
Hide file tree
Showing 39 changed files with 1,309 additions and 364 deletions.
2 changes: 1 addition & 1 deletion bin.src/atoods
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ lsstlog.usePythonLogging()

LOGGER = logging.getLogger(__name__)
F = "%(levelname) -10s %(asctime)s.%(msecs)03dZ %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s"
logging.basicConfig(level=logging.INFO, format=(F), datefmt="%Y-%m-%d %H:%M:%S")
logging.basicConfig(level=logging.INFO, format=F, datefmt="%Y-%m-%d %H:%M:%S")

asyncio.run(AtOodsCsc.amain(index=None))
2 changes: 1 addition & 1 deletion bin.src/ccoods
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ lsstlog.usePythonLogging()

LOGGER = logging.getLogger(__name__)
F = "%(levelname) -10s %(asctime)s.%(msecs)03dZ %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s"
logging.basicConfig(level=logging.INFO, format=(F), datefmt="%Y-%m-%d %H:%M:%S")
logging.basicConfig(level=logging.INFO, format=F, datefmt="%Y-%m-%d %H:%M:%S")

asyncio.run(CcOodsCsc.amain(index=None))
2 changes: 1 addition & 1 deletion bin.src/mtoods
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ lsstlog.usePythonLogging()

LOGGER = logging.getLogger(__name__)
F = "%(levelname) -10s %(asctime)s.%(msecs)03dZ %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s"
logging.basicConfig(level=logging.INFO, format=(F), datefmt="%Y-%m-%d %H:%M:%S")
logging.basicConfig(level=logging.INFO, format=F, datefmt="%Y-%m-%d %H:%M:%S")

asyncio.run(MtOodsCsc.amain(index=None))
109 changes: 109 additions & 0 deletions bin.src/standalone
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python
# This file is part of ctrl_oods
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import asyncio
import logging
import os

import lsst.log as lsstlog
import yaml
from lsst.ctrl.oods.msgIngester import MsgIngester

LOGGER = logging.getLogger(__name__)
F = "%(levelname) -10s %(asctime)s.%(msecs)03dZ %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s"
logging.basicConfig(level=logging.INFO, format=F, datefmt="%Y-%m-%d %H:%M:%S")


class Standalone(object):
"""Standalone class to run tests without CSC requirements."""

def __init__(self):
self.config = None
# import YAML file here
if "CTRL_OODS_CONFIG_FILE" in os.environ:
filename = os.environ["CTRL_OODS_CONFIG_FILE"]
LOGGER.info("using configuration %s", filename)
with open(filename, "r") as f:
self.config = yaml.safe_load(f)
else:
raise FileNotFoundError("CTRL_OODS_CONFIG_FILE is not set")

self.task_list = None

self.ingester_config = self.config["ingester"]

async def send_imageInOODS(self, info):
"""Send SAL message that the images has been ingested into the OODS
Parameters
----------
info : `dict`
information about the image
"""
camera = info["CAMERA"]
obsid = info["OBSID"]
raft = "undef"
if "RAFT" in info:
raft = info["RAFT"]
sensor = "undef"
if "SENSOR" in info:
sensor = info["SENSOR"]
status_code = info["STATUS_CODE"]
description = info["DESCRIPTION"]

s = f"would send camera={camera} obsid={obsid} raft={raft} sensor={sensor} "
s = s + f"statusCode={status_code}, description={description}"
LOGGER.info(s)

async def start_services(self):
"""Start all cleanup and archiving services"""

# self added here, and by the time it's utilized by MsgIngester
# the CSC will be up and running
self.ingester = MsgIngester(self.config, self)

self.task_list = self.ingester.run_tasks()

async def stop_services(self):
"""Stop all cleanup and archiving services"""
print("calling stop_tasks")
self.ingester.stop_tasks()

async def done(self):
print("waiting...")
while True:
await asyncio.sleep(3600)
print("done!")

async def main(self):
await self.start_services()
group = asyncio.gather(self.done())
print("gathering")
await group
print("finished")
await self.stop_services()
print("complete")


if __name__ == "__main__":
alone = Standalone()
asyncio.run(alone.main())
30 changes: 30 additions & 0 deletions etc/msg_oods.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defaultInterval: &interval
days: 0
hours: 0
minutes: 0
seconds: 0

ingester:
kafka:
brokers:
- kafka:9092
topics:
- atoods
group_id: ATOODS
max_messages: 10
butlers:
- butler:
instrument: lsst.obs.lsst.Latiss
class:
import : lsst.ctrl.oods.messageAttendant
name : MessageAttendant
repoDirectory : /tmp/repo/LATISS
collections:
- LATISS/raw/all
scanInterval:
<<: *interval
seconds: 10
filesOlderThan:
<<: *interval
seconds: 30
batchSize: 20
20 changes: 12 additions & 8 deletions etc/oods_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,29 @@ defaultInterval: &interval
seconds: 0

ingester:
FILE_INGEST_REQUEST: CC_FILE_INGEST_REQUEST
CONSUME_QUEUE: cc_publish_to_oods
PUBLISH_QUEUE: oods_publish_to_cc
forwarderStagingDirectory: forwarder_staging
butlers:
- butler:
collections:
- LATISS/raw/all
class:
import : lsst.ctrl.oods.gen2ButlerIngester
name : Gen2ButlerIngester
import : lsst.ctrl.oods.fileAttendant
name : FileAttendant
stagingDirectory : staging
repoDirectory : repo
badFileDirectory: bad
scanInterval:
<<: *interval
minutes: 10
filesOlderThan:
<<: *interval
days: 90
- butler:
collections:
- LATISS/raw/all
instrument: lsst.obs.lsst.LsstComCam
class:
import : lsst.ctrl.oods.gen3ButlerIngester
name : Gen3ButlerIngester
import : lsst.ctrl.oods.fileAttendant
name : FileAttendant
stagingDirectory : staging2
repoDirectory : repo2
badFileDirectory: bad2
Expand Down
34 changes: 34 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,37 @@ target-version = ["py38"]
[tool.isort]
profile = "black"
line_length = 110

[tool.ruff]
exclude = [
"__init__.py",
"bin",
"doc",
"version.py",
"tests/.tests",
]
ignore = [
"E226",
"E228",
"N802",
"N803",
"N806",
"N999",
]
line-length = 110
select = [
"E", # pycodestyle
"F", # pyflakes
"N", # pep8-naming
"W", # pycodestyle
]
target-version = "py310"
extend-select = [
"RUF100", # Warn about unused noqa
]

[tool.ruff.pycodestyle]
max-doc-length = 79

[tool.ruff.pydocstyle]
convention = "numpy"
60 changes: 60 additions & 0 deletions python/lsst/ctrl/oods/bucketMessage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# This file is part of ctrl_oods
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import json
import logging

LOGGER = logging.getLogger(__name__)


class BucketMessage(object):
"""Report on new messages
Parameters
----------
message: `str`
json string
"""

def __init__(self, message):
self.message = message

def extract_urls(self):
"""Extract object IDs from an S3 notification.
If one record is invalid, an error is logged but the function tries to
process the remaining records.
Yields
------
oid : `str`
The filename referred to by each message.
"""
msg = json.loads(self.message)
for record in msg["Records"]:
try:
bucket_name = record["s3"]["bucket"]["name"]
key = record["s3"]["object"]["key"]
url = f"s3://{bucket_name}/{key}"
yield url
except KeyError as e:
LOGGER.error(f"Invalid msg: Couldn't find key in {record=}")
raise e
Loading

0 comments on commit b7bde8b

Please sign in to comment.