Skip to content

Commit

Permalink
refactor!: add KafkaEventConsumer to public API and rename EventProdu…
Browse files Browse the repository at this point in the history
…cerKafka (#44)
  • Loading branch information
rgraber authored Sep 9, 2022
1 parent bb4262f commit 7800736
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 11 deletions.
11 changes: 10 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Change Log
in this file. It adheres to the structure of https://keepachangelog.com/ ,
but in reStructuredText instead of Markdown (for ease of incorporation into
Sphinx documentation and the PyPI description).
This project adheres to Semantic Versioning (https://semver.org/).

.. There should always be an "Unreleased" section for changes pending release.
Expand All @@ -16,6 +16,15 @@ Unreleased

*

[0.7.0] - 2022-09-08
********************

Changed
=======

* **Breaking changes** ``EventProducerKafka`` is now ``KafkaEventProducer``
* KafkaEventConsumer is now part of the public API

[0.6.2] - 2022-09-08
********************

Expand Down
5 changes: 3 additions & 2 deletions edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
See ADR ``docs/decisions/0006-public-api-and-app-organization.rst`` for the reasoning.
"""

from edx_event_bus_kafka.internal.producer import EventProducerKafka, get_producer
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, get_producer

__version__ = '0.6.2'
__version__ = '0.7.0'
10 changes: 5 additions & 5 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument
return key_serializer, value_serializer


class EventProducerKafka():
class KafkaEventProducer():
"""
API singleton for event production to Kafka.
Expand Down Expand Up @@ -221,7 +221,7 @@ def prepare_for_shutdown(self):
self.producer.flush(-1)


def poll_indefinitely(api_weakref: EventProducerKafka):
def poll_indefinitely(api_weakref: KafkaEventProducer):
"""
Poll the producer indefinitely to ensure delivery/stats/etc. callbacks are triggered.
Expand All @@ -230,7 +230,7 @@ def poll_indefinitely(api_weakref: EventProducerKafka):
See ADR for more information:
https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0007-producer-polling.rst
"""
# The reason we hold a weakref to the whole EventProducerKafka and
# The reason we hold a weakref to the whole KafkaEventProducer and
# not directly to the Producer itself is that you just can't make
# a weakref to the latter (perhaps because it's a C object.)

Expand Down Expand Up @@ -267,7 +267,7 @@ def poll_indefinitely(api_weakref: EventProducerKafka):
# outbound-message queue and threads. The use of this cache allows the
# producer to be long-lived.
@lru_cache # will just be one cache entry, in practice
def get_producer() -> Optional[EventProducerKafka]:
def get_producer() -> Optional[KafkaEventProducer]:
"""
Create or retrieve Producer API singleton.
Expand All @@ -281,7 +281,7 @@ def get_producer() -> Optional[EventProducerKafka]:
if producer_settings is None:
return None

return EventProducerKafka(Producer(producer_settings))
return KafkaEventProducer(Producer(producer_settings))


def on_event_deliver(err, evt):
Expand Down
6 changes: 3 additions & 3 deletions edx_event_bus_kafka/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_get_producer_configured(self):
EVENT_BUS_KAFKA_API_KEY='some_other_key',
EVENT_BUS_KAFKA_API_SECRET='some_other_secret',
):
assert isinstance(ep.get_producer(), ep.EventProducerKafka)
assert isinstance(ep.get_producer(), ep.KafkaEventProducer)

@patch('edx_event_bus_kafka.internal.producer.logger')
def test_on_event_deliver(self, mock_logger):
Expand Down Expand Up @@ -153,7 +153,7 @@ def increment_call_count(*args):
call_count += 1

mock_producer = Mock(**{'poll.side_effect': increment_call_count})
producer_api = ep.EventProducerKafka(mock_producer) # Created, starts polling
producer_api = ep.KafkaEventProducer(mock_producer) # Created, starts polling

# Allow a little time to pass and check that the mock poll has been called
time.sleep(1.0)
Expand Down Expand Up @@ -186,7 +186,7 @@ def increment_call_count(*args):
raise Exception("Exercise error handler on first iteration")

mock_producer = Mock(**{'poll.side_effect': increment_call_count})
producer_api = ep.EventProducerKafka(mock_producer) # Created, starts polling
producer_api = ep.KafkaEventProducer(mock_producer) # Created, starts polling

# Allow a little time to pass and check that the mock poll has been called
time.sleep(1.0)
Expand Down

0 comments on commit 7800736

Please sign in to comment.