Skip to content

Commit

Permalink
Capture Nextflow logs via syslog rather than console
Browse files Browse the repository at this point in the history
  • Loading branch information
nwiltsie committed Jan 17, 2024
1 parent 8e1180e commit 33ab94b
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed
- Make `nftest run` exit with the number of failed tests
- Use `shell=False` for subprocess
- Capture Nextflow logs via syslog rather than console

### Fixed
- Make `nftest` with no arguments print usage and exit
Expand Down
116 changes: 75 additions & 41 deletions nftest/NFTestCase.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
""" NF Test case """
from __future__ import annotations

import logging
import os
import re
import shlex
import shutil
import subprocess as sp
import threading

from logging import getLogger
from contextlib import ExitStack
from pathlib import Path
from shlex import quote
from typing import Callable, List, TYPE_CHECKING, Tuple

from nftest.common import remove_nextflow_logs, popen_with_logger
from nftest.NFTestENV import NFTestENV
from nftest.syslog import SyslogServer


if TYPE_CHECKING:
from nftest.NFTestGlobal import NFTestGlobal
from nftest.NFTestAssert import NFTestAssert


class NFTestCase():
""" Defines the NF test case """
# pylint: disable=R0902
Expand All @@ -32,8 +36,8 @@ def __init__(self, name:str=None, message:str=None, nf_script:str=None,
skip:bool=False, verbose:bool=False):
""" Constructor """
self._env = NFTestENV()
self._logger = getLogger('NFTest')
self._nflogger = getLogger("NextFlow")
self._logger = logging.getLogger('NFTest')
self._nflogger = logging.getLogger("console")
self.name = name
self.name_for_output = re.sub(r'[^a-zA-Z0-9_\-.]', '', self.name.replace(' ', '-'))
self.message = message
Expand Down Expand Up @@ -83,10 +87,12 @@ def test(self) -> bool:
if self.skip:
self._logger.info(' [ skipped ]')
return True
res = self.submit()
if res.returncode != 0:

nextflow_succeeded = self.submit()
if not nextflow_succeeded:
self._logger.error(' [ failed ]')
return False

for ass in self.asserts:
try:
ass.identify_assertion_files()
Expand All @@ -102,41 +108,69 @@ def test(self) -> bool:

def submit(self) -> sp.CompletedProcess:
""" Submit a nextflow run """
nextflow_command = ["nextflow", "run", self.nf_script]

if self.profiles:
nextflow_command.extend(["-profile", ",".join(self.profiles)])

for config in self.nf_configs:
nextflow_command.extend(["-c", config])

if self.params_file:
nextflow_command.extend(["-params-file", self.params_file])

for param_name, path in self.reference_params:
nextflow_command.extend([f"--{param_name}", path])

nextflow_command.extend([
f"--{self.output_directory_param_name}",
Path(self._env.NFT_OUTPUT, self.name_for_output)
])

envmod = {
"NXF_WORK": self.temp_dir
}

# Log the shell equivalent of this command
self._logger.info(
"%s %s",
" ".join([f"{k}={quote(v)}" for k, v in envmod.items()]),
sp.list2cmdline(nextflow_command)
)

process = popen_with_logger(
nextflow_command,
env={**os.environ, **envmod},
logger=self._logger
)
# Use ExitStack to handle the multiple nested context managers
with ExitStack() as stack:
# Create a server with a random port to accept syslogs from
# Nextflow
syslog_server = stack.enter_context(SyslogServer.make_server())

# Shut down the server when we exit this context manager
stack.callback(syslog_server.shutdown)

# Run the server in a thread. This thread will die once
# serve_forever returns, which will happen after a call to
# syslog_server.shutdown()
threading.Thread(
name="SyslogThread",
target=syslog_server.serve_forever,
kwargs={"poll_interval": 1}
).start()

syslog_address = ":".join(
str(item) for item in syslog_server.server_address
)

nextflow_command = [
"nextflow",
"-quiet",
"-syslog", syslog_address,
"run",
self.nf_script
]

if self.profiles:
nextflow_command.extend(["-profile", ",".join(self.profiles)])

for config in self.nf_configs:
nextflow_command.extend(["-c", config])

if self.params_file:
nextflow_command.extend(["-params-file", self.params_file])

for param_name, path in self.reference_params:
nextflow_command.extend([f"--{param_name}", path])

nextflow_command.extend([
f"--{self.output_directory_param_name}",
Path(self._env.NFT_OUTPUT, self.name_for_output)
])

envmod = {
"NXF_WORK": self.temp_dir
}

# Log the shell equivalent of this command
self._logger.info(
"%s %s",
" ".join([f"{k}={shlex.quote(v)}" for k, v in envmod.items()]),
sp.list2cmdline(nextflow_command)
)

process = popen_with_logger(
nextflow_command,
env={**os.environ, **envmod},
logger=self._nflogger
)

return process

Expand Down
21 changes: 20 additions & 1 deletion nftest/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from nftest import __version__
from nftest.NFTestENV import NFTestENV
from nftest.syslog import syslog_filter

# pylint: disable=W0613
def validate_yaml(path:Path):
Expand Down Expand Up @@ -114,9 +115,27 @@ def setup_loggers():
except ValueError:
stream_handler.setLevel(logging.INFO)

# Set up a special filter to decode the syslog messages from Nextflow
logging.getLogger("nextflow").addFilter(syslog_filter)

# This is a little sneaky: hide any tracebacks emitted via Nextflow's
# syslog handler from the console. They will still be recorded in the log
# file.
stream_handler.addFilter(
lambda record: record.threadName != "traceback"
)

# Set up different formats for the stream and file handlers
# The key difference is that the file handler will contain the thread name
stream_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
))
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(threadName)s - %(levelname)s - %(message)s',
))

logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=(file_handler, stream_handler)
)

Expand Down
104 changes: 104 additions & 0 deletions nftest/syslog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""
Module for a system to capture Nextflow logs via syslog.
"""
import logging
import re
import socketserver
import subprocess


LEVELS = [
logging.CRITICAL, # 0 = Emergency
logging.CRITICAL, # 1 = Alert
logging.CRITICAL, # 2 = Critical
logging.ERROR, # 3 = Error
logging.WARNING, # 4 = Warning
logging.INFO, # 5 = Notice
logging.INFO, # 6 = Info
logging.DEBUG, # 7 = Debug
]

PRIORITY_RE = re.compile(r"^<(\d+)>")
MESSAGE_RE = re.compile(r"^nextflow:\s+\w+\s+\[(?P<thread>.+?)\] \S+ - ")


def syslog_filter(record):
"""
Logging filter to update syslogs from Nextflow with embedded information.
"""
# Nextflow uses BSD-style syslog messages
# https://datatracker.ietf.org/doc/html/rfc3164
# Format / example:
# <PRI>Mmm dd hh:mm:ss HOSTNAME MESSAGE"
# <134>Jan 3 12:00:25 ip-0A125232 nextflow: INFO [main] more...
level_month, day, time, addr_message = \
record.msg.strip().decode("utf-8").split(maxsplit=3)

# For errors, the message may begin with whitespace - make sure to only
# strip off syslog's required space
_, message = addr_message.split(" ", maxsplit=1)

# The priority is 8 * facility + level - we only care about the level
record.levelno = LEVELS[
int(PRIORITY_RE.match(level_month).group(1)) % 8
]
record.levelname = logging.getLevelName(record.levelno)

# For most messages, nextflow seems to have a format of:
# nextflow: LEVEL [THREAD] MODULE - MESSAGE
#
# nextflow: DEBUG [main] ConfigBuilder - User config file...
# nextflow: INFO [main] DefaultPluginStatusProvider - Enabled...
# nextflow: ERROR [main] Launcher - Unable...

# Strip off everything before the module if possible
thread_match = MESSAGE_RE.match(message)
if thread_match:
record.msg = MESSAGE_RE.sub("", message)
record.threadName = thread_match.group("thread")
else:
record.msg = message
record.threadName = "traceback"

return record


class SyslogServer(socketserver.ThreadingUDPServer):
"A UDPServer that logs itself starting up and shutting down."
@classmethod
def make_server(cls):
"Create a server with a random port to handle syslogs."
docker_gateway = subprocess.check_output([
"docker",
"network",
"inspect",
"bridge",
"--format",
"{{ (index .IPAM.Config 0).Gateway }}"
]).decode("utf-8").strip()

return cls(
server_address=(docker_gateway, 0),
RequestHandlerClass=SyslogHandler
)

def serve_forever(self, poll_interval):
logging.getLogger(__name__).debug(
"Syslog server at %s:%d starting up",
*self.server_address
)
return super().serve_forever(poll_interval)

def shutdown(self):
logging.getLogger(__name__).debug(
"Syslog server at %s:%d shutting down",
*self.server_address
)
return super().shutdown()


class SyslogHandler(socketserver.BaseRequestHandler):
"A simple syslog-like server to capture formatted logs from Nextflow."
def handle(self):
# This is a syslog message from Nextflow
logging.getLogger("nextflow").info(self.request[0])

0 comments on commit 33ab94b

Please sign in to comment.