Skip to content

Commit

Permalink
Merge branch 'tickets/DM-45861' into legacy
Browse files Browse the repository at this point in the history
  • Loading branch information
srp3rd committed Aug 26, 2024
2 parents 6ee4e4f + 75223a6 commit e087bd4
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 30 deletions.
43 changes: 26 additions & 17 deletions python/lsst/ctrl/oods/cacheCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import time

from lsst.ctrl.oods.scanner import Scanner
from lsst.ctrl.oods.timeInterval import TimeInterval

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -139,14 +140,15 @@ async def getFilesOlderThan(self, seconds, directory):
"""
files = []

for dirName, subdirs, fileList in os.walk(directory):
for fname in fileList:
await asyncio.sleep(0)
fullName = os.path.join(dirName, fname)
stat_info = os.stat(fullName)
modification_time = stat_info.st_mtime
if modification_time < seconds:
files.append(fullName)
scanner = Scanner()
async for entry in scanner.scan(directory):
if entry.is_dir():
continue
full_name = entry.path
stat_info = os.stat(full_name)
modification_time = stat_info.st_mtime
if modification_time < seconds:
files.append(full_name)
return files

async def clearEmptyDirectories(self, seconds, directories):
Expand Down Expand Up @@ -185,14 +187,15 @@ async def getDirectoriesOlderThan(self, seconds, directory):
"""
directories = []

for root, dirs, files in os.walk(directory, topdown=False):
for name in dirs:
await asyncio.sleep(0)
full_name = os.path.join(root, name)
stat_info = os.stat(full_name)
mtime = stat_info.st_mtime
if mtime < seconds:
directories.append(full_name)
scanner = Scanner()
async for entry in scanner.scan(directory):
if entry.is_file():
continue
full_name = entry.path
stat_info = os.stat(full_name)
modification_time = stat_info.st_mtime
if modification_time < seconds:
directories.append(full_name)
return directories

async def removeEmptyDirectories(self, directories):
Expand All @@ -205,9 +208,15 @@ async def removeEmptyDirectories(self, directories):
"""
for directory in sorted(directories, reverse=True):
await asyncio.sleep(0)
if not os.listdir(directory):
if self._isEmpty(directory):
LOGGER.info("removing %s", directory)
try:
os.rmdir(directory)
except Exception as e:
LOGGER.info("Couldn't remove %s: %s", directory, e)

def _isEmpty(self, directory):
with os.scandir(directory) as entries:
for entry in entries:
return False
return True
13 changes: 6 additions & 7 deletions python/lsst/ctrl/oods/directoryScanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import asyncio
import os
from lsst.ctrl.oods.scanner import Scanner


class DirectoryScanner(object):
Expand Down Expand Up @@ -66,10 +65,10 @@ async def getFiles(self, directory):
files: `list`
list of all files in the given directory
"""
scanner = Scanner()
files = []
for dirName, subdirs, fileList in os.walk(directory):
for fname in fileList:
await asyncio.sleep(0)
fullName = os.path.join(dirName, fname)
files.append(fullName)
async for entry in scanner.scan(directory):
if entry.is_dir():
continue
files.append(entry.path)
return files
50 changes: 44 additions & 6 deletions python/lsst/ctrl/oods/gen3ButlerIngester.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,19 @@ def undef_metadata(self, filename):
info["SENSOR"] = "S??"
return info

def transmit_status(self, metadata, code, description):
async def print_msg(self, msg):
"""Print message dictionary - used if a CSC has not been created
Parameters
----------
msg: `dict`
Dictionary to print
"""

LOGGER.info(f"would have sent {msg=}")
await asyncio.sleep(0)

async def transmit_status(self, metadata, code, description):
"""Transmit a message with given metadata, status code and description
Parameters
Expand All @@ -162,10 +174,14 @@ def transmit_status(self, metadata, code, description):
msg["DESCRIPTION"] = description
LOGGER.info("msg: %s, code: %s, description: %s", msg, code, description)
if self.csc is None:
await self.print_msg(msg)
return
asyncio.run(self.csc.send_imageInOODS(msg))
await self.csc.send_imageInOODS(msg)

def on_success(self, datasets):
asyncio.create_task(self._on_success(datasets))

async def _on_success(self, datasets):
"""Callback used on successful ingest. Used to transmit
successful data ingestion status
Expand All @@ -175,12 +191,16 @@ def on_success(self, datasets):
list of DatasetRefs
"""
for dataset in datasets:
await asyncio.sleep(0)
LOGGER.info("file %s successfully ingested", dataset.path.ospath)
image_data = ImageData(dataset)
LOGGER.debug("image_data.get_info() = %s", image_data.get_info())
self.transmit_status(image_data.get_info(), code=0, description="file ingested")
await self.transmit_status(image_data.get_info(), code=0, description="file ingested")

def on_ingest_failure(self, exposures, exc):
asyncio.create_task(self._on_ingest_failure(exposures, exc))

async def _on_ingest_failure(self, exposures, exc):
"""Callback used on ingest failure. Used to transmit
unsuccessful data ingestion status
Expand All @@ -193,11 +213,12 @@ def on_ingest_failure(self, exposures, exc):
"""
for f in exposures.files:
await asyncio.sleep(0)
real_file = f.filename.ospath
self.move_file_to_bad_dir(real_file)
cause = self.extract_cause(exc)
info = self.rawexposure_info(f)
self.transmit_status(info, code=1, description=f"ingest failure: {cause}")
await self.transmit_status(info, code=1, description=f"ingest failure: {cause}")

def on_metadata_failure(self, filename, exc):
"""Callback used on metadata extraction failure. Used to transmit
Expand All @@ -215,7 +236,7 @@ def on_metadata_failure(self, filename, exc):

cause = self.extract_cause(exc)
info = self.undef_metadata(real_file)
self.transmit_status(info, code=2, description=f"metadata failure: {cause}")
asyncio.create_task(self.transmit_status(info, code=2, description=f"metadata failure: {cause}"))

def move_file_to_bad_dir(self, filename):
bad_dir = self.create_bad_dirname(self.bad_file_dir, self.staging_dir, filename)
Expand All @@ -234,6 +255,7 @@ async def ingest(self, file_list):
"""

# Ingest image.
await asyncio.sleep(0)
self.task.run(file_list)

def getName(self):
Expand All @@ -251,7 +273,7 @@ async def clean_task(self):
seconds = TimeInterval.calculateTotalSeconds(self.scanInterval)
while True:
if self.csc:
self.csc.log.info("butler repo cleaning started")
self.csc.log.info("calling butler repo clean started")

await self.clean()

Expand All @@ -264,6 +286,7 @@ async def clean(self):
were ingested before the configured Interval
"""

await asyncio.sleep(0)
# calculate the time value which is Time.now - the
# "olderThan" configuration
t = Time.now()
Expand All @@ -273,10 +296,15 @@ async def clean(self):
)
t = t - td

LOGGER.info("about to createButler()")
butler = self.createButler()
await asyncio.sleep(0)

LOGGER.info("about to refresh()")
butler.registry.refresh()
await asyncio.sleep(0)

LOGGER.info("about to run queryDatasets")
# get all datasets in these collections
allCollections = self.collections if self.cleanCollections is None else self.cleanCollections
all_datasets = set(
Expand All @@ -287,21 +315,29 @@ async def clean(self):
bind={"ref_date": t},
)
)
LOGGER.info("done running queryDatasets")
await asyncio.sleep(0)

LOGGER.info("about to run queryCollections")
# get all TAGGED collections
tagged_cols = list(butler.registry.queryCollections(collectionTypes=CollectionType.TAGGED))
LOGGER.info("done running queryCollections")

await asyncio.sleep(0)
# Note: The code below is to get around an issue where passing
# an empty list as the collections argument to queryDatasets
# returns all datasets.
if tagged_cols:
# get all TAGGED datasets
LOGGER.info("about to run queryDatasets for TAGGED collections")
tagged_datasets = set(butler.registry.queryDatasets(datasetType=..., collections=tagged_cols))
LOGGER.info("done running queryDatasets for TAGGED collections; differencing datasets")
await asyncio.sleep(0)

# get a set of datasets in all_datasets, but not in tagged_datasets
ref = all_datasets.difference(tagged_datasets)
LOGGER.info("done differencing datasets")
await asyncio.sleep(0)
else:
# no TAGGED collections, so use all_datasets
ref = all_datasets
Expand Down Expand Up @@ -329,4 +365,6 @@ async def clean(self):
LOGGER.info("error removing %s: %s", uri, e)

await asyncio.sleep(0)
LOGGER.info("about to run pruneDatasets")
butler.pruneDatasets(ref, purge=True, unstore=True)
LOGGER.info("done running pruneDatasets")
46 changes: 46 additions & 0 deletions python/lsst/ctrl/oods/scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# This file is part of ctrl_oods
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import asyncio
import os


class Scanner(object):

async def scan(self, directory):
"""Return entries in a directory tree
Parameters
----------
directory: `str`
directory to scan
Returns
-------
entry: `DirEntry`
directory entry
"""
await asyncio.sleep(0)
for entry in os.scandir(directory):
await asyncio.sleep(0)
yield entry
if entry.is_dir(follow_symlinks=False):
async for sub_entry in self.scan(entry.path):
yield sub_entry

0 comments on commit e087bd4

Please sign in to comment.