Skip to content

Commit

Permalink
Merge pull request #157 from pnuu/feature-sigterm-geographic-gatherer
Browse files Browse the repository at this point in the history
Add SIGTERM handling to geographic gatherer
  • Loading branch information
mraspaud authored Nov 13, 2024
2 parents f032ca1 + e866900 commit 33c5786
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 2 deletions.
25 changes: 23 additions & 2 deletions pytroll_collectors/geographic_gatherer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"""Geographic segment gathering."""

import logging
import signal
import time

from configparser import NoOptionError, ConfigParser
Expand Down Expand Up @@ -56,6 +57,8 @@ def __init__(self, opts):
self.triggers = []
self.return_status = 0

self._sigterm_caught = False

self._clean_config()
self._setup_publisher()
try:
Expand Down Expand Up @@ -103,8 +106,9 @@ def _setup_triggers(self):

def run(self):
"""Run granule triggers."""
signal.signal(signal.SIGTERM, self._handle_sigterm)
try:
while True:
while self._keep_running():
time.sleep(1)
for trigger in self.triggers:
if not trigger.is_alive():
Expand All @@ -119,9 +123,26 @@ def run(self):

return self.return_status

def _handle_sigterm(self, signum, frame):
logger.info("Caught SIGTERM, shutting down when all collections are finished.")
self._sigterm_caught = True

def _keep_running(self):
keep_running = True
if self._sigterm_caught:
keep_running = self._trigger_collectors_have_granules()
return keep_running

def _trigger_collectors_have_granules(self):
for t in self.triggers:
for c in t.collectors:
if c.granules:
return True
return False

def stop(self):
"""Stop the gatherer."""
logger.info('Ending publication the gathering of granules...')
logger.info('Ending the gathering of granules...')
for trigger in self.triggers:
trigger.stop()
self.publisher.stop()
Expand Down
86 changes: 86 additions & 0 deletions pytroll_collectors/tests/test_geographic_gatherer.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,3 +527,89 @@ def test_full_pass(self, sub_factory, monkeypatch, tmp_tle):
assert snd_msg.data == expected_msg.data
finally:
gatherer.stop()


def test_sigterm(tmp_config_file, tmp_config_parser):
"""Test that SIGTERM signal is handled."""
import os
import signal
import time
from multiprocessing import Process

from pytroll_collectors.geographic_gatherer import GeographicGatherer

with open(tmp_config_file, mode="w") as fp:
tmp_config_parser.write(fp)

opts = arg_parse(["-c", "minimal_config", "-p", "40000", "-n", "false", "-i", "localhost:12345",
str(tmp_config_file)])
# We don't need the triggers here. They also interfere with completing the test (the test never exits)
with patch("pytroll_collectors.geographic_gatherer.TriggerFactory.create"):
gatherer = GeographicGatherer(opts)
proc = Process(target=gatherer.run)
proc.start()
time.sleep(1)
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert proc.exitcode == 0


def test_sigterm_with_collection(tmp_config_file, tmp_config_parser):
"""Test that SIGTERM signal is handled when there is collection ongoing."""
import os
import signal
import time
from multiprocessing import Process

from pytroll_collectors.geographic_gatherer import GeographicGatherer

with open(tmp_config_file, mode="w") as fp:
tmp_config_parser.write(fp)

opts = arg_parse(["-c", "posttroll_section", "-p", "40000", "-n", "false", "-i", "localhost:12345",
str(tmp_config_file)])
# Use a fake trigger that initially sets some granules and after a while clears them
with patch("pytroll_collectors.geographic_gatherer.PostTrollTrigger",
new=FakeTriggerWithGranules):
gatherer = GeographicGatherer(opts)
proc = Process(target=gatherer.run)
proc.start()
time.sleep(1)
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert proc.exitcode == 0


class FakeTriggerWithGranules:
"""Fake trigger class used in testing SIGTERM handling.
At creation, adds "foo" to collector granules. When is_alive() is called the second time, it clears the granules.
"""

def __init__(self, collectors, *args, **kwargs):
"""Initialize the trigger class."""
self.collectors = collectors
for col in self.collectors:
col.granules.append("foo")
self._args = args
self._kwargs = kwargs
self._counter = 0

def is_alive(self):
"""Return True for alive thread."""
if self._counter > 0:
# On the second call clear the granules
for col in self.collectors:
col.granules = []
self._counter += 1
return True

def start(self):
"""Start the trigger."""
pass

def stop(self):
"""Stop the trigger."""
pass

0 comments on commit 33c5786

Please sign in to comment.