Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace os.walk() with os.scandir() #53

Merged
merged 3 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions python/lsst/ctrl/oods/cacheCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import time

from lsst.ctrl.oods.scanner import Scanner
from lsst.ctrl.oods.timeInterval import TimeInterval

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -139,14 +140,15 @@ async def getFilesOlderThan(self, seconds, directory):
"""
files = []

for dirName, subdirs, fileList in os.walk(directory):
for fname in fileList:
await asyncio.sleep(0)
fullName = os.path.join(dirName, fname)
stat_info = os.stat(fullName)
modification_time = stat_info.st_mtime
if modification_time < seconds:
files.append(fullName)
scanner = Scanner()
async for entry in scanner.scan(directory):
if entry.is_dir():
continue
fullName = entry.path
srp3rd marked this conversation as resolved.
Show resolved Hide resolved
stat_info = os.stat(fullName)
modification_time = stat_info.st_mtime
if modification_time < seconds:
files.append(fullName)
return files

async def clearEmptyDirectories(self, seconds, directories):
Expand Down Expand Up @@ -185,14 +187,15 @@ async def getDirectoriesOlderThan(self, seconds, directory):
"""
directories = []

for root, dirs, files in os.walk(directory, topdown=False):
for name in dirs:
await asyncio.sleep(0)
full_name = os.path.join(root, name)
stat_info = os.stat(full_name)
mtime = stat_info.st_mtime
if mtime < seconds:
directories.append(full_name)
scanner = Scanner()
async for entry in scanner.scan(directory):
if entry.is_file():
continue
full_name = entry.path
stat_info = os.stat(full_name)
mtime = stat_info.st_mtime
srp3rd marked this conversation as resolved.
Show resolved Hide resolved
if mtime < seconds:
directories.append(full_name)
return directories

async def removeEmptyDirectories(self, directories):
Expand All @@ -205,9 +208,15 @@ async def removeEmptyDirectories(self, directories):
"""
for directory in sorted(directories, reverse=True):
await asyncio.sleep(0)
if not os.listdir(directory):
if self._isEmpty(directory):
LOGGER.info("removing %s", directory)
try:
os.rmdir(directory)
except Exception as e:
LOGGER.info("Couldn't remove %s: %s", directory, e)

def _isEmpty(self, directory):
with os.scandir(directory) as entries:
for entry in entries:
srp3rd marked this conversation as resolved.
Show resolved Hide resolved
return False
return True
13 changes: 6 additions & 7 deletions python/lsst/ctrl/oods/directoryScanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import asyncio
import os
from lsst.ctrl.oods.scanner import Scanner


class DirectoryScanner(object):
Expand Down Expand Up @@ -66,10 +65,10 @@ async def getFiles(self, directory):
files: `list`
list of all files in the given directory
"""
scanner = Scanner()
files = []
for dirName, subdirs, fileList in os.walk(directory):
for fname in fileList:
await asyncio.sleep(0)
fullName = os.path.join(dirName, fname)
files.append(fullName)
async for entry in scanner.scan(directory):
if entry.is_dir():
continue
files.append(entry.path)
return files
44 changes: 38 additions & 6 deletions python/lsst/ctrl/oods/gen3ButlerIngester.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,13 @@ def undef_metadata(self, filename):
info["SENSOR"] = "S??"
return info

def transmit_status(self, metadata, code, description):
async def print_msg(self, msg):
    """Log a message dictionary instead of sending it.

    Used if a CSC has not been created.

    Parameters
    ----------
    msg : `dict`
        message dictionary that would otherwise have been sent
    """
    # Lazy %-style arguments, matching the other LOGGER calls in this
    # file; "%r" renders the same text as the f-string "{msg=}" form.
    LOGGER.info("would have sent msg=%r", msg)
    # Yield control to the event loop so other tasks can run.
    await asyncio.sleep(0)

async def transmit_status(self, metadata, code, description):
"""Transmit a message with given metadata, status code and description

Parameters
Expand All @@ -162,10 +168,14 @@ def transmit_status(self, metadata, code, description):
msg["DESCRIPTION"] = description
LOGGER.info("msg: %s, code: %s, description: %s", msg, code, description)
if self.csc is None:
await self.print_msg(msg)
return
asyncio.run(self.csc.send_imageInOODS(msg))
await self.csc.send_imageInOODS(msg)

def on_success(self, datasets):
asyncio.create_task(self._on_success(datasets))

async def _on_success(self, datasets):
"""Callback used on successful ingest. Used to transmit
successful data ingestion status

Expand All @@ -175,12 +185,16 @@ def on_success(self, datasets):
list of DatasetRefs
"""
for dataset in datasets:
await asyncio.sleep(0)
LOGGER.info("file %s successfully ingested", dataset.path.ospath)
image_data = ImageData(dataset)
LOGGER.debug("image_data.get_info() = %s", image_data.get_info())
self.transmit_status(image_data.get_info(), code=0, description="file ingested")
await self.transmit_status(image_data.get_info(), code=0, description="file ingested")

def on_ingest_failure(self, exposures, exc):
asyncio.create_task(self._on_ingest_failure(exposures, exc))

async def _on_ingest_failure(self, exposures, exc):
"""Callback used on ingest failure. Used to transmit
unsuccessful data ingestion status

Expand All @@ -193,11 +207,12 @@ def on_ingest_failure(self, exposures, exc):

"""
for f in exposures.files:
await asyncio.sleep(0)
real_file = f.filename.ospath
self.move_file_to_bad_dir(real_file)
cause = self.extract_cause(exc)
info = self.rawexposure_info(f)
self.transmit_status(info, code=1, description=f"ingest failure: {cause}")
await self.transmit_status(info, code=1, description=f"ingest failure: {cause}")

def on_metadata_failure(self, filename, exc):
"""Callback used on metadata extraction failure. Used to transmit
Expand All @@ -215,7 +230,7 @@ def on_metadata_failure(self, filename, exc):

cause = self.extract_cause(exc)
info = self.undef_metadata(real_file)
self.transmit_status(info, code=2, description=f"metadata failure: {cause}")
asyncio.create_task(self.transmit_status(info, code=2, description=f"metadata failure: {cause}"))

def move_file_to_bad_dir(self, filename):
bad_dir = self.create_bad_dirname(self.bad_file_dir, self.staging_dir, filename)
Expand All @@ -234,6 +249,7 @@ async def ingest(self, file_list):
"""

# Ingest image.
await asyncio.sleep(0)
self.task.run(file_list)

def getName(self):
Expand All @@ -251,7 +267,7 @@ async def clean_task(self):
seconds = TimeInterval.calculateTotalSeconds(self.scanInterval)
while True:
if self.csc:
self.csc.log.info("butler repo cleaning started")
self.csc.log.info("calling butler repo clean started")

await self.clean()

Expand All @@ -264,6 +280,7 @@ async def clean(self):
were ingested before the configured Interval
"""

await asyncio.sleep(0)
# calculate the time value which is Time.now - the
# "olderThan" configuration
t = Time.now()
Expand All @@ -273,10 +290,15 @@ async def clean(self):
)
t = t - td

LOGGER.info("createButler()")
srp3rd marked this conversation as resolved.
Show resolved Hide resolved
butler = self.createButler()
await asyncio.sleep(0)

LOGGER.info("refresh()")
butler.registry.refresh()
await asyncio.sleep(0)

LOGGER.info("about to run queryDatasets")
# get all datasets in these collections
allCollections = self.collections if self.cleanCollections is None else self.cleanCollections
all_datasets = set(
Expand All @@ -287,21 +309,29 @@ async def clean(self):
bind={"ref_date": t},
)
)
LOGGER.info("done running queryDatasets")
await asyncio.sleep(0)

LOGGER.info("about to run queryCollections")
# get all TAGGED collections
tagged_cols = list(butler.registry.queryCollections(collectionTypes=CollectionType.TAGGED))
LOGGER.info("done running queryCollections")

await asyncio.sleep(0)
# Note: The code below is to get around an issue where passing
# an empty list as the collections argument to queryDatasets
# returns all datasets.
if tagged_cols:
# get all TAGGED datasets
LOGGER.info("about to run queryDatasets for TAGGED collections")
tagged_datasets = set(butler.registry.queryDatasets(datasetType=..., collections=tagged_cols))
LOGGER.info("done running queryDatasets for TAGGED collections; differencing datasets")
await asyncio.sleep(0)

# get a set of datasets in all_datasets, but not in tagged_datasets
ref = all_datasets.difference(tagged_datasets)
LOGGER.info("done differencing datasets")
await asyncio.sleep(0)
else:
# no TAGGED collections, so use all_datasets
ref = all_datasets
Expand Down Expand Up @@ -329,4 +359,6 @@ async def clean(self):
LOGGER.info("error removing %s: %s", uri, e)

await asyncio.sleep(0)
LOGGER.info("about to run pruneDatasets")
butler.pruneDatasets(ref, purge=True, unstore=True)
LOGGER.info("done running pruneDatasets")
34 changes: 34 additions & 0 deletions python/lsst/ctrl/oods/scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# This file is part of ctrl_oods
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import asyncio
import os


class Scanner(object):
Copy link

@mxk62 mxk62 Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add both the class and the method docstrings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


async def scan(self, directory):
    """Asynchronously and recursively iterate over a directory tree.

    Every entry found is yielded (files and directories alike); a
    directory entry is yielded before the entries beneath it.
    Directories reached through symbolic links are yielded but not
    descended into.

    Parameters
    ----------
    directory : `str`
        path of the directory to scan

    Yields
    ------
    entry : `os.DirEntry`
        the next entry found under ``directory``
    """
    await asyncio.sleep(0)
    # Use scandir as a context manager so the underlying directory
    # handle is closed deterministically, even if the consumer stops
    # iterating early or an exception propagates.
    with os.scandir(directory) as entries:
        for entry in entries:
            # Yield control to the event loop between entries so a large
            # tree does not starve other tasks.
            await asyncio.sleep(0)
            yield entry
            if entry.is_dir(follow_symlinks=False):
                async for sub_entry in self.scan(entry.path):
                    yield sub_entry
srp3rd marked this conversation as resolved.
Show resolved Hide resolved
Loading