Skip to content

Commit 78ccdfb

Browse files
committed
Use linger_ms to delay batches
Currently, the linger_ms parameter is used to limit the number of requests sent to a given broker, by adding at minimum linger_ms between two produce requests. It doesn't produce the expected result when the max size of batches is also reached. The purpose of linger_ms should not be to limit the rate of requests, but instead to keep a batch open long enough that it has the opportunity to grow bigger. Moving linger_ms to the message accumulator (like in the Java client) makes it possible to have a condition on both the max size and the lingering time. fixes #1137
1 parent 7b7c4ff commit 78ccdfb

File tree

6 files changed

+57
-17
lines changed

6 files changed

+57
-17
lines changed

CHANGES.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ Bugfixes:
2525
* Ensure the transaction coordinator is refreshed after broker fail‑over,
2626
so transactional producers resume once a new coordinator is elected.
2727
(pr #1135 by @vmaurin)
28-
28+
* Properly manage batch max size and linger_ms. A batch will always be
29+
produced if the max size or the lingering time is reached
30+
(pr #1142 by @vmaurin)
2931

3032
Misc:
3133
* Use SPDX license expression for project metadata.

aiokafka/producer/message_accumulator.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,14 @@ def record_count(self):
129129

130130

131131
class MessageBatch:
132-
"""This class incapsulate operations with batch of produce messages"""
132+
"""This class encapsulate operations with batch of produce messages"""
133133

134-
def __init__(self, tp, builder, ttl):
134+
def __init__(self, tp, builder, ttl, linger_time, max_size):
135135
self._builder = builder
136136
self._tp = tp
137137
self._ttl = ttl
138+
self._linger_time = linger_time
139+
self._max_size = max_size
138140
self._ctime = time.monotonic()
139141

140142
# Waiters
@@ -266,6 +268,13 @@ async def wait_drain(self, timeout=None):
266268
if waiter.done():
267269
waiter.result() # Check for exception
268270

271+
def ready(self):
272+
"""Check that batch is ready or not"""
273+
return (
274+
self._builder.record_count() >= self._max_size
275+
or (time.monotonic() - self._ctime) >= self._linger_time
276+
)
277+
269278
def expired(self):
270279
"""Check that batch is expired or not"""
271280
return (time.monotonic() - self._ctime) > self._ttl
@@ -312,6 +321,7 @@ def __init__(
312321
*,
313322
txn_manager=None,
314323
loop=None,
324+
linger_ms=0,
315325
):
316326
if loop is None:
317327
loop = get_running_loop()
@@ -325,6 +335,7 @@ def __init__(
325335
self._wait_data_future = loop.create_future()
326336
self._closed = False
327337
self._txn_manager = txn_manager
338+
self._linger_time = linger_ms / 1000
328339

329340
self._exception = None # Critical exception
330341

@@ -466,6 +477,9 @@ def drain_by_nodes(self, ignore_nodes, muted_partitions=frozenset()):
466477
continue
467478
elif ignore_nodes and leader in ignore_nodes:
468479
continue
480+
elif not self._batches[tp][0].ready():
481+
# batch should still linger
482+
continue
469483

470484
batch = self._pop_batch(tp)
471485
# We can get an empty batch here if all `append()` calls failed
@@ -506,7 +520,9 @@ def _append_batch(self, builder, tp):
506520
if self._txn_manager is not None:
507521
self._txn_manager.maybe_add_partition_to_txn(tp)
508522

509-
batch = MessageBatch(tp, builder, self._batch_ttl)
523+
batch = MessageBatch(
524+
tp, builder, self._batch_ttl, self._linger_time, self._batch_size
525+
)
510526
self._batches[tp].append(batch)
511527
if not self._wait_data_future.done():
512528
self._wait_data_future.set_result(None)

aiokafka/producer/producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,13 +309,13 @@ def __init__(
309309
self._request_timeout_ms / 1000,
310310
txn_manager=self._txn_manager,
311311
loop=loop,
312+
linger_ms=linger_ms,
312313
)
313314
self._sender = Sender(
314315
self.client,
315316
acks=acks,
316317
txn_manager=self._txn_manager,
317318
retry_backoff_ms=retry_backoff_ms,
318-
linger_ms=linger_ms,
319319
message_accumulator=self._message_accumulator,
320320
request_timeout_ms=request_timeout_ms,
321321
)

aiokafka/producer/sender.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import asyncio
22
import collections
33
import logging
4-
import time
54

65
import aiokafka.errors as Errors
76
from aiokafka.client import ConnectionGroup, CoordinationType
@@ -55,7 +54,6 @@ def __init__(
5554
txn_manager,
5655
message_accumulator,
5756
retry_backoff_ms,
58-
linger_ms,
5957
request_timeout_ms,
6058
):
6159
self.client = client
@@ -69,7 +67,6 @@ def __init__(
6967
self._coordinators = {}
7068
self._retry_backoff = retry_backoff_ms / 1000
7169
self._request_timeout_ms = request_timeout_ms
72-
self._linger_time = linger_ms / 1000
7370

7471
async def start(self):
7572
# If producer is idempotent we need to assure we have PID found
@@ -286,17 +283,9 @@ async def _send_produce_req(self, node_id, batches):
286283
node_id (int): kafka broker identifier
287284
batches (dict): dictionary of {TopicPartition: MessageBatch}
288285
"""
289-
t0 = time.monotonic()
290-
291286
handler = SendProduceReqHandler(self, batches)
292287
await handler.do(node_id)
293288

294-
# if batches for node is processed in less than a linger seconds
295-
# then waiting for the remaining time
296-
sleep_time = self._linger_time - (time.monotonic() - t0)
297-
if sleep_time > 0:
298-
await asyncio.sleep(sleep_time)
299-
300289
self._in_flight.remove(node_id)
301290
for tp in batches:
302291
self._muted_partitions.remove(tp)

tests/test_message_accumulator.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,40 @@ def mocked_leader_for_partition(tp):
102102
m_set1 = batches[1].get(tp1)
103103
self.assertEqual(m_set1._builder._relative_offset, 1)
104104

105+
@run_until_complete
106+
async def test_batch_ready(self):
107+
tp0 = TopicPartition("test-topic", 0)
108+
tp1 = TopicPartition("test-topic", 1)
109+
110+
def mocked_leader_for_partition(tp):
111+
if tp == tp0:
112+
return 0
113+
return None
114+
115+
cluster = ClusterMetadata(metadata_max_age_ms=10000)
116+
cluster.leader_for_partition = mock.MagicMock()
117+
cluster.leader_for_partition.side_effect = mocked_leader_for_partition
118+
119+
ma = MessageAccumulator(
120+
cluster, compression_type=0, batch_size=3, batch_ttl=10, linger_ms=1000
121+
)
122+
await ma.add_message(tp0, None, b"hello", timeout=2)
123+
await ma.add_message(tp1, None, b"hello", timeout=2)
124+
125+
batches, _ = ma.drain_by_nodes(ignore_nodes=[])
126+
# it should not be ready yet (linger time)
127+
self.assertEqual(len(batches), 0)
128+
await ma.add_message(tp0, None, b"hello", timeout=2)
129+
await ma.add_message(tp0, None, b"hello", timeout=2)
130+
batches, _ = ma.drain_by_nodes(ignore_nodes=[])
131+
# it should be ready (max size reached)
132+
self.assertEqual(len(batches), 1)
133+
self.assertEqual(batches[0][tp0].ready(), True)
134+
await asyncio.sleep(1)
135+
# it should be ready (linger time reached)
136+
self.assertEqual(len(batches), 1)
137+
self.assertEqual(batches[0][tp1].ready(), True)
138+
105139
@run_until_complete
106140
async def test_batch_done(self):
107141
tp0 = TopicPartition("test-topic", 0)

tests/test_sender.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ async def _setup_sender(self, no_init=False):
7373
txn_manager=tm,
7474
message_accumulator=ma,
7575
retry_backoff_ms=100,
76-
linger_ms=0,
7776
request_timeout_ms=40000,
7877
)
7978
self.add_cleanup(sender.close)

0 commit comments

Comments
 (0)