Skip to content

Commit

Permalink
feat: Poll producer on a regular cadence to avoid delayed callbacks (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
timmc-edx authored Sep 6, 2022
1 parent 26d9c4f commit 2807fe6
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 16 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ Unreleased

*

[0.6.1] - 2022-09-06
********************

Added
=====

* Producer now polls on an interval, improving callback reliability. Configurable with ``EVENT_BUS_KAFKA_POLL_INTERVAL_SEC``.

[0.6.0] - 2022-09-01
********************

Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

from edx_event_bus_kafka.internal.producer import EventProducerKafka, get_producer

__version__ = '0.6.0'
__version__ = '0.6.1'
66 changes: 59 additions & 7 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

import json
import logging
import threading
import time
import weakref
from functools import lru_cache
from typing import Any, List, Optional

from django.conf import settings
from django.dispatch import receiver
from django.test.signals import setting_changed
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
Expand Down Expand Up @@ -167,6 +171,13 @@ class EventProducerKafka():
def __init__(self, producer):
    self.producer = producer

    # Start a background daemon thread that polls the producer on a regular
    # cadence so delivery callbacks fire even when events are infrequent.
    # A weak reference (rather than self) is handed to the thread so that
    # the producer can still be garbage-collected, which in turn lets the
    # thread shut itself down (important for tests!).
    poll_thread = threading.Thread(
        target=poll_indefinitely,
        name="kafka-producer-poll",
        args=(weakref.ref(self),),
        daemon=True,  # don't block process shutdown
    )
    poll_thread.start()

def send(
self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
) -> None:
Expand All @@ -192,13 +203,11 @@ def send(
)

# Opportunistically ensure any pending callbacks from recent event-sends are triggered.
#
# This assumes events come regularly, or that we're not concerned about
# high latency between delivery and callback. If those assumptions are
# false, we should switch to calling poll(1.0) or similar in a loop on
# a separate thread. Or do both.
#
# Issue: https://github.com/openedx/event-bus-kafka/issues/31
# This ensures that we're polling at least as often as we're producing, which is a
# reasonable balance. However, if events are infrequent, it doesn't ensure that
# callbacks happen in a timely fashion, and the last event emitted before shutdown
# would never get a delivery callback. That's why there's also a thread calling
# poll(0) on a regular interval (see `poll_indefinitely`).
self.producer.poll(0)

def prepare_for_shutdown(self):
Expand All @@ -210,6 +219,49 @@ def prepare_for_shutdown(self):
self.producer.flush(-1)


def poll_indefinitely(api_weakref: weakref.ref):
    """
    Poll the producer indefinitely to ensure delivery/stats/etc. callbacks are triggered.

    Arguments:
        api_weakref: A weak reference to the EventProducerKafka whose wrapped
            producer should be polled. (Fix: previously annotated as
            ``EventProducerKafka``, but the caller passes ``weakref.ref(self)``.)

    The thread stops automatically once the producer is garbage-collected.
    This ensures that callbacks are triggered in a timely fashion, rather than waiting
    for the poll() call that we make before or after each produce() call. This may be
    important if events are produced infrequently, and it allows the last event the
    server emits before shutdown to have its callback run (if it happens soon enough.)
    """
    # The reason we hold a weakref to the whole EventProducerKafka and
    # not directly to the Producer itself is that you just can't make
    # a weakref to the latter (perhaps because it's a C object.)
    while True:
        # .. setting_name: EVENT_BUS_KAFKA_POLL_INTERVAL_SEC
        # .. setting_default: 1.0
        # .. setting_description: How frequently to poll the event-bus-kafka producer. This should
        #    be small enough that there's not too much latency in triggering delivery callbacks once
        #    a message has been acknowledged, but there's no point in setting it any lower than the
        #    expected round-trip-time of message delivery and acknowledgement. (100 ms – 5 s is
        #    probably a reasonable range.)
        # Read each iteration (rather than once, up front) so that setting
        # overrides and runtime reconfiguration take effect promptly.
        poll_interval_seconds = getattr(settings, 'EVENT_BUS_KAFKA_POLL_INTERVAL_SEC', 1.0)
        time.sleep(poll_interval_seconds)

        # Temporarily hold a strong ref to the producer API singleton
        api_object = api_weakref()
        if api_object is None:
            # Producer has been garbage-collected; stop the thread.
            return

        try:
            api_object.producer.poll(0)
        except BaseException:
            # If polling is failing, we'll almost certainly find out about it from the poll call
            # we make when producing an event. The call in this loop could be excessively noisy,
            # so just debug-log it.
            logger.debug("Event bus producer polling loop encountered exception (continuing)", exc_info=True)
        finally:
            # Get rid of that strong ref again so GC can proceed while we sleep.
            api_object = None


# Note: This caching is required, since otherwise the Producer will
# fall out of scope and be garbage-collected, destroying the
# outbound-message queue and threads. The use of this cache allows the
Expand Down
8 changes: 4 additions & 4 deletions edx_event_bus_kafka/internal/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,20 @@ def test_unconfigured(self):

def test_minimal(self):
with override_settings(
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321',
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321',
):
assert config.load_common_settings() == {
'bootstrap.servers': 'http://localhost:54321',
'bootstrap.servers': 'localhost:54321',
}

def test_full(self):
with override_settings(
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321',
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321',
EVENT_BUS_KAFKA_API_KEY='some_other_key',
EVENT_BUS_KAFKA_API_SECRET='some_other_secret',
):
assert config.load_common_settings() == {
'bootstrap.servers': 'http://localhost:54321',
'bootstrap.servers': 'localhost:54321',
'sasl.mechanism': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': 'some_other_key',
Expand Down
62 changes: 58 additions & 4 deletions edx_event_bus_kafka/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
Test the event producer code.
"""

import gc
import time
import warnings
from unittest import TestCase
from unittest.mock import MagicMock, patch
from unittest.mock import Mock, patch

import openedx_events.learning.signals
import pytest
Expand Down Expand Up @@ -87,7 +89,7 @@ def test_get_producer_configured(self):
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345',
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY='some_key',
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET='some_secret',
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321',
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321',
# include these just to maximize code coverage
EVENT_BUS_KAFKA_API_KEY='some_other_key',
EVENT_BUS_KAFKA_API_SECRET='some_other_secret',
Expand All @@ -96,7 +98,7 @@ def test_get_producer_configured(self):

@patch('edx_event_bus_kafka.internal.producer.logger')
def test_on_event_deliver(self, mock_logger):
fake_event = MagicMock()
fake_event = Mock()
fake_event.topic.return_value = 'some_topic'
fake_event.key.return_value = 'some_key'
fake_event.partition.return_value = 'some_partition'
Expand All @@ -121,7 +123,7 @@ def test_on_event_deliver(self, mock_logger):
def test_send_to_event_bus(self, mock_get_serializers):
with override_settings(
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345',
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321',
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321',
):
producer_api = ep.get_producer()
with patch.object(producer_api, 'producer', autospec=True) as mock_producer:
Expand All @@ -137,3 +139,55 @@ def test_send_to_event_bus(self, mock_get_serializers):
on_delivery=ep.on_event_deliver,
headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'},
)

@override_settings(EVENT_BUS_KAFKA_POLL_INTERVAL_SEC=0.05)
def test_polling_loop_terminates(self):
    """
    Test that polling loop stops as soon as the producer is garbage-collected.
    """
    call_count = 0

    def increment_call_count(*args):
        nonlocal call_count
        call_count += 1

    mock_producer = Mock(**{'poll.side_effect': increment_call_count})
    producer_api = ep.EventProducerKafka(mock_producer)  # Created, starts polling

    # Wait (bounded) for the poll thread to demonstrate it's running, rather
    # than sleeping a fixed 1.0 s: faster in the common case and less flaky
    # on a loaded CI machine.
    deadline = time.monotonic() + 5.0
    while call_count < 3 and time.monotonic() < deadline:
        time.sleep(0.01)
    assert call_count >= 3
    assert producer_api is not None  # keep a live reference so it isn't GC'd early

    # Allow garbage collection of these objects, then ask for it to happen.
    producer_api = None
    mock_producer = None
    gc.collect()

    # Give the loop a few iterations to notice the dead weakref and exit...
    time.sleep(0.2)  # small multiple of loop iteration time
    count_after_gc = call_count

    # ...then wait several more iterations' worth and confirm the count
    # is no longer rising.
    time.sleep(0.5)
    assert call_count == count_after_gc

@override_settings(EVENT_BUS_KAFKA_POLL_INTERVAL_SEC=0.05)
def test_polling_loop_robust(self):
    """
    Test that polling loop continues even if one call raises an exception.
    """
    call_count = 0

    def increment_call_count(*args):
        nonlocal call_count
        call_count += 1
        if call_count == 1:
            raise Exception("Exercise error handler on first iteration")

    mock_producer = Mock(**{'poll.side_effect': increment_call_count})
    producer_api = ep.EventProducerKafka(mock_producer)  # Created, starts polling

    # Bounded wait rather than a fixed 1.0 s sleep: reaching 3 calls proves
    # the loop survived the exception raised on its first iteration, and the
    # deadline keeps the test fast and non-flaky on slow machines.
    deadline = time.monotonic() + 5.0
    while call_count < 3 and time.monotonic() < deadline:
        time.sleep(0.01)
    assert call_count >= 3
    assert producer_api is not None  # keep a live reference so it isn't GC'd early

0 comments on commit 2807fe6

Please sign in to comment.