diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a5da0c9..cbc5570 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,14 @@ Unreleased * +[0.6.1] - 2022-09-06 +******************** + +Added +===== + +* Producer now polls on an interval, improving callback reliability. Configurable with ``EVENT_BUS_KAFKA_POLL_INTERVAL_SEC``. + [0.6.0] - 2022-09-01 ******************** diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index cb1260e..5730561 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -8,4 +8,4 @@ from edx_event_bus_kafka.internal.producer import EventProducerKafka, get_producer -__version__ = '0.6.0' +__version__ = '0.6.1' diff --git a/edx_event_bus_kafka/internal/producer.py b/edx_event_bus_kafka/internal/producer.py index ba8084c..fb1aa11 100644 --- a/edx_event_bus_kafka/internal/producer.py +++ b/edx_event_bus_kafka/internal/producer.py @@ -6,9 +6,13 @@ import json import logging +import threading +import time +import weakref from functools import lru_cache from typing import Any, List, Optional +from django.conf import settings from django.dispatch import receiver from django.test.signals import setting_changed from openedx_events.event_bus.avro.serializer import AvroSignalSerializer @@ -167,6 +171,13 @@ class EventProducerKafka(): def __init__(self, producer): self.producer = producer + threading.Thread( + target=poll_indefinitely, + name="kafka-producer-poll", + args=(weakref.ref(self),), # allow GC but also thread auto-stop (important for tests!) + daemon=True, # don't block shutdown + ).start() + def send( self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict, ) -> None: @@ -192,13 +203,11 @@ def send( ) # Opportunistically ensure any pending callbacks from recent event-sends are triggered. - # - # This assumes events come regularly, or that we're not concerned about - # high latency between delivery and callback. 
If those assumptions are - # false, we should switch to calling poll(1.0) or similar in a loop on - # a separate thread. Or do both. - # - # Issue: https://github.com/openedx/event-bus-kafka/issues/31 + # This ensures that we're polling at least as often as we're producing, which is a + # reasonable balance. However, if events are infrequent, it doesn't ensure that + # callbacks happen in a timely fashion, and the last event emitted before shutdown + # would never get a delivery callback. That's why there's also a thread calling + # poll(0) on a regular interval (see `poll_indefinitely`). self.producer.poll(0) def prepare_for_shutdown(self): @@ -210,6 +219,49 @@ def prepare_for_shutdown(self): self.producer.flush(-1) +def poll_indefinitely(api_weakref: weakref.ref): + """ + Poll the producer indefinitely to ensure delivery/stats/etc. callbacks are triggered. + + The thread stops automatically once the producer is garbage-collected. + + This ensures that callbacks are triggered in a timely fashion, rather than waiting + for the poll() call that we make before or after each produce() call. This may be + important if events are produced infrequently, and it allows the last event the + server emits before shutdown to have its callback run (if it happens soon enough.) + """ + # The reason we hold a weakref to the whole EventProducerKafka and + # not directly to the Producer itself is that you just can't make + # a weakref to the latter (perhaps because it's a C object.) + + # .. setting_name: EVENT_BUS_KAFKA_POLL_INTERVAL_SEC + # .. setting_default: 1.0 + # .. setting_description: How frequently to poll the event-bus-kafka producer. This should + # be small enough that there's not too much latency in triggering delivery callbacks once + # a message has been acknowledged, but there's no point in setting it any lower than the + # expected round-trip-time of message delivery and acknowledgement. (100 ms – 5 s is + # probably a reasonable range.) 
+ poll_interval_seconds = getattr(settings, 'EVENT_BUS_KAFKA_POLL_INTERVAL_SEC', 1.0) + while True: + time.sleep(poll_interval_seconds) + + # Temporarily hold a strong ref to the producer API singleton + api_object = api_weakref() + if api_object is None: + return + + try: + api_object.producer.poll(0) + except BaseException: + # If polling is failing, we'll almost certainly find out about it from the poll call + # we make when producing an event. The call in this loop could be excessively noisy, + # so just debug-log it. + logger.debug("Event bus producer polling loop encountered exception (continuing)", exc_info=True) + finally: + # Get rid of that strong ref again + api_object = None + + # Note: This caching is required, since otherwise the Producer will # fall out of scope and be garbage-collected, destroying the # outbound-message queue and threads. The use of this cache allows the diff --git a/edx_event_bus_kafka/internal/tests/test_config.py b/edx_event_bus_kafka/internal/tests/test_config.py index c12df29..a78ee55 100644 --- a/edx_event_bus_kafka/internal/tests/test_config.py +++ b/edx_event_bus_kafka/internal/tests/test_config.py @@ -39,20 +39,20 @@ def test_unconfigured(self): def test_minimal(self): with override_settings( - EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321', + EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321', ): assert config.load_common_settings() == { - 'bootstrap.servers': 'http://localhost:54321', + 'bootstrap.servers': 'localhost:54321', } def test_full(self): with override_settings( - EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321', + EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321', EVENT_BUS_KAFKA_API_KEY='some_other_key', EVENT_BUS_KAFKA_API_SECRET='some_other_secret', ): assert config.load_common_settings() == { - 'bootstrap.servers': 'http://localhost:54321', + 'bootstrap.servers': 'localhost:54321', 'sasl.mechanism': 'PLAIN', 'security.protocol': 'SASL_SSL', 'sasl.username': 'some_other_key', diff 
--git a/edx_event_bus_kafka/internal/tests/test_producer.py b/edx_event_bus_kafka/internal/tests/test_producer.py index 1bce56e..43062f6 100644 --- a/edx_event_bus_kafka/internal/tests/test_producer.py +++ b/edx_event_bus_kafka/internal/tests/test_producer.py @@ -2,9 +2,11 @@ Test the event producer code. """ +import gc +import time import warnings from unittest import TestCase -from unittest.mock import MagicMock, patch +from unittest.mock import Mock, patch import openedx_events.learning.signals import pytest @@ -87,7 +89,7 @@ def test_get_producer_configured(self): EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345', EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY='some_key', EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET='some_secret', - EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321', + EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321', # include these just to maximize code coverage EVENT_BUS_KAFKA_API_KEY='some_other_key', EVENT_BUS_KAFKA_API_SECRET='some_other_secret', @@ -96,7 +98,7 @@ def test_get_producer_configured(self): @patch('edx_event_bus_kafka.internal.producer.logger') def test_on_event_deliver(self, mock_logger): - fake_event = MagicMock() + fake_event = Mock() fake_event.topic.return_value = 'some_topic' fake_event.key.return_value = 'some_key' fake_event.partition.return_value = 'some_partition' @@ -121,7 +123,7 @@ def test_on_event_deliver(self, mock_logger): def test_send_to_event_bus(self, mock_get_serializers): with override_settings( EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345', - EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321', + EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321', ): producer_api = ep.get_producer() with patch.object(producer_api, 'producer', autospec=True) as mock_producer: @@ -137,3 +139,55 @@ def test_send_to_event_bus(self, mock_get_serializers): on_delivery=ep.on_event_deliver, headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'}, ) + + 
@override_settings(EVENT_BUS_KAFKA_POLL_INTERVAL_SEC=0.05) + def test_polling_loop_terminates(self): + """ + Test that polling loop stops as soon as the producer is garbage-collected. + """ + call_count = 0 + + def increment_call_count(*args): + nonlocal call_count + call_count += 1 + + mock_producer = Mock(**{'poll.side_effect': increment_call_count}) + producer_api = ep.EventProducerKafka(mock_producer) # Created, starts polling + + # Allow a little time to pass and check that the mock poll has been called + time.sleep(1.0) + assert call_count >= 3 # some small value; would actually be about 20 + print(producer_api) # Use the value here to ensure it isn't GC'd early + + # Allow garbage collection of these objects, then ask for it to happen. + producer_api = None + mock_producer = None + gc.collect() + + time.sleep(0.2) # small multiple of loop iteration time + count_after_gc = call_count + + # Wait a little longer and confirm that the count is no longer rising + time.sleep(1.0) + assert call_count == count_after_gc + + @override_settings(EVENT_BUS_KAFKA_POLL_INTERVAL_SEC=0.05) + def test_polling_loop_robust(self): + """ + Test that polling loop continues even if one call raises an exception. + """ + call_count = 0 + + def increment_call_count(*args): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise Exception("Exercise error handler on first iteration") + + mock_producer = Mock(**{'poll.side_effect': increment_call_count}) + producer_api = ep.EventProducerKafka(mock_producer) # Created, starts polling + + # Allow a little time to pass and check that the mock poll has been called + time.sleep(1.0) + assert call_count >= 3 # some small value; would actually be about 20 + print(producer_api) # Use the value here to ensure it isn't GC'd early