Skip to content

Commit 63e93af

Browse files
committed
feat: rebalance listener added to Consumer constructor
1 parent fb52ec2 commit 63e93af

File tree

8 files changed

+94
-17
lines changed

8 files changed

+94
-17
lines changed

.gitignore

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ target/
6161
# PyCharm
6262
.idea
6363

64+
# VSCode
65+
.vscode
66+
67+
# virtualenvs
68+
venv/
69+
.venv/
70+
6471
kafka_2*
6572

6673
tests/ssl_cert

CHANGES/842.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
rebalance listener added to Consumer constructor

aiokafka/abc.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import abc
2+
import typing
3+
24
from kafka import ConsumerRebalanceListener as BaseConsumerRebalanceListener
35

6+
RebalanceListenerCT = typing.TypeVar(
7+
"RebalanceListenerCT", bound="ConsumerRebalanceListener")
8+
49

510
class ConsumerRebalanceListener(BaseConsumerRebalanceListener):
611
"""

aiokafka/consumer/consumer.py

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22
import logging
33
import re
44
import sys
5+
from typing import Iterable, Optional, Pattern
56
import traceback
67
import warnings
78

89
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
910

10-
from aiokafka.abc import ConsumerRebalanceListener
11+
from aiokafka.abc import RebalanceListenerCT
1112
from aiokafka.client import AIOKafkaClient
1213
from aiokafka.errors import (
1314
TopicAuthorizationFailedError, OffsetOutOfRangeError,
@@ -133,6 +134,27 @@ class AIOKafkaConsumer:
133134
group member. If API methods block waiting for messages, that time
134135
does not count against this timeout. See `KIP-62`_ for more
135136
information. Default 300000
137+
listener (RebalanceListenerCT): Optionally include listener
138+
callback, which will be called before and after each rebalance
139+
operation.
140+
As part of group management, the consumer will keep track of
141+
the list of consumers that belong to a particular group and
142+
will trigger a rebalance operation if one of the following
143+
events trigger:
144+
145+
* Number of partitions change for any of the subscribed topics
146+
* Topic is created or deleted
147+
* An existing member of the consumer group dies
148+
* A new member is added to the consumer group
149+
150+
When any of these events are triggered, the provided listener
151+
will be invoked first to indicate that the consumer's
152+
assignment has been revoked, and then again when the new
153+
assignment has been received. Note that this listener will
154+
immediately override any listener set in a previous call
155+
to subscribe. It is guaranteed, however, that the partitions
156+
revoked/assigned
157+
through this interface are from topics subscribed in this call.
136158
rebalance_timeout_ms (int): The maximum time server will wait for this
137159
consumer to rejoin the group in a case of rebalance. In Java client
138160
this behaviour is bound to `max.poll.interval.ms` configuration,
@@ -242,6 +264,7 @@ def __init__(self, *topics, loop=None,
242264
metadata_max_age_ms=5 * 60 * 1000,
243265
partition_assignment_strategy=(RoundRobinPartitionAssignor,),
244266
max_poll_interval_ms=300000,
267+
listener: Optional[RebalanceListenerCT] = None,
245268
rebalance_timeout_ms=None,
246269
session_timeout_ms=10000,
247270
heartbeat_interval_ms=3000,
@@ -324,7 +347,7 @@ def __init__(self, *topics, loop=None,
324347
if topics:
325348
topics = self._validate_topics(topics)
326349
self._client.set_topics(topics)
327-
self._subscription.subscribe(topics=topics)
350+
self._subscription.subscribe(topics=topics, listener=listener)
328351

329352
def __del__(self, _warnings=warnings):
330353
if self._closed is False:
@@ -1008,7 +1031,12 @@ async def end_offsets(self, partitions):
10081031
partitions, self._request_timeout_ms)
10091032
return offsets
10101033

1011-
def subscribe(self, topics=(), pattern=None, listener=None):
1034+
def subscribe(
1035+
self,
1036+
topics: Iterable[str] = (),
1037+
pattern: Optional[Pattern] = None,
1038+
listener: Optional[RebalanceListenerCT] = None
1039+
) -> None:
10121040
""" Subscribe to a list of topics, or a topic regex pattern.
10131041
10141042
Partitions will be dynamically assigned via a group coordinator.
@@ -1021,7 +1049,7 @@ def subscribe(self, topics=(), pattern=None, listener=None):
10211049
topics (list): List of topics for subscription.
10221050
pattern (str): Pattern to match available topics. You must provide
10231051
either topics or pattern, but not both.
1024-
listener (ConsumerRebalanceListener): Optionally include listener
1052+
listener (RebalanceListenerCT): Optionally include listener
10251053
callback, which will be called before and after each rebalance
10261054
operation.
10271055
As part of group management, the consumer will keep track of
@@ -1046,18 +1074,14 @@ def subscribe(self, topics=(), pattern=None, listener=None):
10461074
IllegalStateError: if called after previously calling :meth:`assign`
10471075
ValueError: if neither topics or pattern is provided or both
10481076
are provided
1049-
TypeError: if listener is not a :class:`.ConsumerRebalanceListener`
10501077
"""
10511078
if not (topics or pattern):
10521079
raise ValueError(
10531080
"You should provide either `topics` or `pattern`")
10541081
if topics and pattern:
10551082
raise ValueError(
10561083
"You can't provide both `topics` and `pattern`")
1057-
if listener is not None and \
1058-
not isinstance(listener, ConsumerRebalanceListener):
1059-
raise TypeError(
1060-
"listener should be an instance of ConsumerRebalanceListener")
1084+
10611085
if pattern is not None:
10621086
try:
10631087
pattern = re.compile(pattern)

aiokafka/consumer/subscription_state.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
from asyncio import shield, Event, Future
66
from enum import Enum
77

8-
from typing import Dict, FrozenSet, Iterable, List, Pattern, Set
8+
from typing import Dict, FrozenSet, Iterable, List, Optional, Pattern, Set
99

1010
from aiokafka.errors import IllegalStateError
1111
from aiokafka.structs import OffsetAndMetadata, TopicPartition
12-
from aiokafka.abc import ConsumerRebalanceListener
12+
from aiokafka.abc import ConsumerRebalanceListener, RebalanceListenerCT
1313
from aiokafka.util import create_future, get_running_loop
1414

1515
log = logging.getLogger(__name__)
@@ -136,24 +136,50 @@ def _notify_assignment_waiters(self):
136136
waiter.set_result(None)
137137
self._assignment_waiters.clear()
138138

139+
def _validate_rebalance_listener(
140+
self,
141+
listener: Optional[RebalanceListenerCT] = None
142+
) -> None:
143+
""" Validates a ConsumerRebalanceListener.
144+
145+
Arguments:
146+
listener (RebalanceListenerCT): Optionally include listener
147+
callback, which will be called before and after each rebalance
148+
operation.
149+
Raises:
150+
TypeError: if listener is not a :class:`.ConsumerRebalanceListener`
151+
"""
152+
if listener is not None and \
153+
not isinstance(listener, ConsumerRebalanceListener):
154+
raise TypeError(
155+
"listener should be an instance of ConsumerRebalanceListener")
156+
139157
# Consumer callable API:
140158

141-
def subscribe(self, topics: Set[str], listener=None):
159+
def subscribe(
160+
self,
161+
topics: Set[str],
162+
listener: Optional[RebalanceListenerCT] = None
163+
) -> None:
142164
""" Subscribe to a list (or tuple) of topics
143165
144166
Caller: Consumer.
145167
Affects: SubscriptionState.subscription
146168
"""
147169
assert isinstance(topics, set)
148-
assert (listener is None or
149-
isinstance(listener, ConsumerRebalanceListener))
170+
171+
self._validate_rebalance_listener(listener=listener)
150172
self._set_subscription_type(SubscriptionType.AUTO_TOPICS)
151173

152174
self._change_subscription(Subscription(topics, loop=self._loop))
153175
self._listener = listener
154176
self._notify_subscription_waiters()
155177

156-
def subscribe_pattern(self, pattern: Pattern, listener=None):
178+
def subscribe_pattern(
179+
self,
180+
pattern: Pattern,
181+
listener: Optional[RebalanceListenerCT] = None
182+
) -> None:
157183
""" Subscribe to all topics matching a regex pattern.
158184
Subsequent calls `subscribe_from_pattern()` by Coordinator will provide
159185
the actual subscription topics.
@@ -162,8 +188,7 @@ def subscribe_pattern(self, pattern: Pattern, listener=None):
162188
Affects: SubscriptionState.subscribed_pattern
163189
"""
164190
assert hasattr(pattern, "match"), "Expected Pattern type"
165-
assert (listener is None or
166-
isinstance(listener, ConsumerRebalanceListener))
191+
self._validate_rebalance_listener(listener=listener)
167192
self._set_subscription_type(SubscriptionType.AUTO_PATTERN)
168193

169194
self._subscribed_pattern = pattern

docs/conf.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,14 @@ def get_version(release):
5757
'sphinx.ext.intersphinx',
5858
'sphinx.ext.viewcode',
5959
'sphinx.ext.napoleon',
60+
'sphinx_autodoc_typehints',
6061
'alabaster',
6162
]
6263

64+
65+
# Napoleon settings
66+
napoleon_use_param = False
67+
6368
try:
6469
import sphinxcontrib.spelling # noqa
6570
extensions.append('sphinxcontrib.spelling')

requirements-docs.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22
Sphinx==4.3.0
33
sphinxcontrib-asyncio==0.3.0
44
sphinxcontrib-spelling==7.2.1
5+
sphinx-autodoc-typehints
56
alabaster==0.7.12
67
-e .

tests/test_consumer.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,15 @@ def test_create_consumer_no_running_loop(self):
132132
loop.run_until_complete(consumer.stop())
133133
loop.close()
134134

135+
@run_until_complete
136+
async def test_create_consumer_with_listener(self):
137+
listener = StubRebalanceListener()
138+
consumer = await self.consumer_factory(listener=listener)
139+
await consumer.stop()
140+
141+
with self.assertRaises(TypeError):
142+
await self.consumer_factory(listener=object())
143+
135144
@run_until_complete
136145
async def test_consumer_context_manager(self):
137146
await self.send_messages(0, list(range(0, 10)))

0 commit comments

Comments
 (0)