Skip to content

Commit dfbba18

Browse files
hakan458 and jombooth
authored
fix: DIA-1911: Gracefully handle large Kafka messages (#334)
Co-authored-by: hakan458 <[email protected]> Co-authored-by: Jo Booth <[email protected]>
1 parent 2a2772d commit dfbba18

File tree

4 files changed

+76
-59
lines changed

4 files changed

+76
-59
lines changed

adala/environments/kafka.py

+24-8
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
import json
55
import asyncio
66
import aiohttp
7+
import math
78
from csv import DictReader, DictWriter
89
from typing import Dict, Union, List, Optional, Iterable
910
from io import StringIO
@@ -95,15 +96,30 @@ async def save(self):
9596
async def message_sender(
9697
self, producer: AIOKafkaProducer, data: Iterable, topic: str
9798
):
98-
record_no = 0
99-
try:
100-
await producer.send_and_wait(topic, value=data)
101-
logger.info(
102-
f"The number of records sent to topic:{topic}, record_no:{len(data)}"
99+
100+
# To ensure we don't hit MessageSizeTooLargeErrors, split the data into chunks when sending
101+
# Add 10% to account for metadata sent in the message to be safe
102+
num_bytes = len(json.dumps(data).encode("utf-8")) * 1.10
103+
if num_bytes > producer._max_request_size:
104+
# Split into as many chunks as we need, limited by the length of `data`
105+
num_chunks = min(
106+
len(data), math.ceil(num_bytes / producer._max_request_size)
107+
)
108+
chunk_size = math.ceil(len(data) / num_chunks)
109+
110+
logger.warning(
111+
f"Message size of {num_bytes} is larger than max_request_size {producer._max_request_size} - splitting message into {num_chunks} chunks of size {chunk_size}"
103112
)
104-
finally:
105-
pass
106-
# print_text(f"No more messages for {topic=}")
113+
114+
for chunk_start in range(0, len(data), chunk_size):
115+
await producer.send_and_wait(topic, value=data[chunk_start : chunk_start + chunk_size])
116+
117+
# If the data is less than max_request_size, can send all at once
118+
else:
119+
await producer.send_and_wait(topic, value=data)
120+
logger.info(
121+
f"The number of records sent to topic:{topic}, record_no:{len(data)}"
122+
)
107123

108124
async def get_data_batch(self, batch_size: Optional[int]) -> InternalDataFrame:
109125
batch = await self.consumer.getmany(

docker-compose.native.yml

+2-6
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,5 @@ services:
2424
- KAFKA_KRAFT_CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk
2525
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
2626
- KAFKA_MESSAGE_MAX_BYTES=3000000
27-
redis:
28-
image: redis:alpine
29-
ports:
30-
- "6379:6379"
31-
healthcheck:
32-
test: [ "CMD", "redis-cli", "ping" ]
27+
- KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=3000000
28+
- KAFKA_CFG_REPLICA_FETCH_RESPONSE_MAX_BYTES=3000000

tests/cassettes/test_stream_inference/test_run_streaming.yaml

+46-42
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,21 @@ interactions:
1616
host:
1717
- api.openai.com
1818
user-agent:
19-
- OpenAI/Python 1.47.1
19+
- OpenAI/Python 1.60.2
2020
x-stainless-arch:
21-
- arm64
21+
- x64
2222
x-stainless-async:
2323
- 'false'
2424
x-stainless-lang:
2525
- python
2626
x-stainless-os:
27-
- MacOS
27+
- Linux
2828
x-stainless-package-version:
29-
- 1.47.1
29+
- 1.60.2
3030
x-stainless-raw-response:
3131
- 'true'
32+
x-stainless-retry-count:
33+
- '0'
3234
x-stainless-runtime:
3335
- CPython
3436
x-stainless-runtime-version:
@@ -38,34 +40,34 @@ interactions:
3840
response:
3941
body:
4042
string: !!binary |
41-
H4sIAAAAAAAAA4xSwWobMRS871e86pKLt+zaDja+hJJLA6GFQjGhlEWW3u7K0eoJ6S2pHQz9jf5e
42-
v6Ro7XgdmkIvAs28GWae9JwBCKPFCoRqJavO2/zDw+31vv9k9tu4LL+2xfr+7vN2v79t1l8etmKS
43-
FLTZouIX1XtFnbfIhtyRVgElY3ItF7PpopwVZTkQHWm0SdZ4zueUd8aZfFpM53mxyMvlSd2SURjF
44-
Cr5lAADPw5lyOo0/xAqKyQvSYYyyQbE6DwGIQDYhQsZoIkvHYjKSihyjG6LfXXWgybgGntDaCXAr
45-
3SPsqH8HH+kJ5IZ6TtcbWLeSf//8FYFcAgJ0xmlg0nJ3c2kesO6jTAVdb+0JP5zTWmp8oE088We8
46-
Ns7EtgooI7mULDJ5MbCHDOD7sJX+VVHhA3WeK6ZHdMmwnB/txPgWF+TyRDKxtCM+m07ecKs0sjQ2
47-
XmxVKKla1KNyfALZa0MXRHbR+e8wb3kfexvX/I/9SCiFnlFXPqA26nXhcSxg+qn/GjvveAgs4i4y
48-
dlVtXIPBB3P8J7WvikVxvamXC1WI7JD9AQAA//8DAN/P8VI1AwAA
43+
H4sIAAAAAAAAAwAAAP//jFJNi9swEL37V0x16SUuiZPgNJeF3ctuL2WhtJSyGEUa22pkjZDGzYYl
44+
/73I+XBCW+hFoPfmPd58vGUAwmixBqFayarzNr+f3u8fnvXq88PGffn+df74vCiWxWvNn/y2FJOk
45+
oM1PVHxWfVDUeYtsyB1pFVAyJtdZOf84Xy3LWTEQHWm0SdZ4zheUd8aZvJgWi3xa5rPVSd2SURjF
46+
Gn5kAABvw5tyOo2vYg3TyRnpMEbZoFhfigBEIJsQIWM0kaVjMRlJRY7RDdGf3negybgGdmjtBLiV
47+
bgt76t/BI+1Abqjn9L2Db61kUNLBE7RofQJhZ7gFJi33d9f+Aes+ytSj66094YdLYEuND7SJJ/6C
48+
18aZ2FYBZSSXwkUmLwb2kAG8DIPpb3oVPlDnuWLaokuGs8XRTozrGMniNDTBxNKO+PwsunGrNLI0
49+
Nl4NViipWtSjctyC7LWhKyK76vnPMH/zPvZtXPM/9iOhFHpGXfmA2qjbhseygOlY/1V2mfEQWEQM
50+
v4zCig2GtAeNtezt8YRE3EfGrqqNazD4YI53VPuqLFCXcrNcKJEdst8AAAD//wMAIRdmZFUDAAA=
4951
headers:
5052
CF-Cache-Status:
5153
- DYNAMIC
5254
CF-RAY:
53-
- 8e925b5dbbf74895-LIS
55+
- 910eb71c7e175c1e-SJC
5456
Connection:
5557
- keep-alive
5658
Content-Encoding:
5759
- gzip
5860
Content-Type:
5961
- application/json
6062
Date:
61-
- Wed, 27 Nov 2024 13:10:11 GMT
63+
- Wed, 12 Feb 2025 18:41:53 GMT
6264
Server:
6365
- cloudflare
6466
Set-Cookie:
65-
- __cf_bm=s_1bk.xRBNtMM7HEMNwZjDq6aB06ekamNvokE.5sIOs-1732713011-1.0.1.1-h8FdrFbPq7Nm6gTo49b_M2Pg8v9Tau3bA2cBNZ3R055F4uRSS_i7TluiYfbDyes4kJk8gUQw.LKz.rlG_JF2ww;
66-
path=/; expires=Wed, 27-Nov-24 13:40:11 GMT; domain=.api.openai.com; HttpOnly;
67+
- __cf_bm=Po.WjXogGx.Ldlk_Y_HxQg_GLnMrliWZbLOjEkIT2Bk-1739385713-1.0.1.1-0sOcUji7LCNELaOExFrFSTSrTEn63LA1LDsQavXa56vFg4PXl72xx7pot1rFasT58Jp0B0ccZKAL4hMYDp9Tcg;
68+
path=/; expires=Wed, 12-Feb-25 19:11:53 GMT; domain=.api.openai.com; HttpOnly;
6769
Secure; SameSite=None
68-
- _cfuvid=np_qT96suW_XXE9HjcKqg4LzjbaSpV0zCu3mhWZrOzU-1732713011428-0.0.1.1-604800000;
70+
- _cfuvid=12u3DQBCCOeTlAYfPWbwga5H00B1vEx4sM6yBuDP9LM-1739385713062-0.0.1.1-604800000;
6971
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
7072
Transfer-Encoding:
7173
- chunked
@@ -78,7 +80,7 @@ interactions:
7880
openai-organization:
7981
- heartex
8082
openai-processing-ms:
81-
- '435'
83+
- '575'
8284
openai-version:
8385
- '2020-10-01'
8486
strict-transport-security:
@@ -90,13 +92,13 @@ interactions:
9092
x-ratelimit-remaining-requests:
9193
- '29999'
9294
x-ratelimit-remaining-tokens:
93-
- '149999793'
95+
- '149999794'
9496
x-ratelimit-reset-requests:
9597
- 2ms
9698
x-ratelimit-reset-tokens:
9799
- 0s
98100
x-request-id:
99-
- req_2b086571666146a6f3cff4a9d6dfab0f
101+
- req_500c7cf981e50f56a800fb756e16958e
100102
status:
101103
code: 200
102104
message: OK
@@ -123,19 +125,21 @@ interactions:
123125
host:
124126
- api.openai.com
125127
user-agent:
126-
- AsyncOpenAI/Python 1.47.1
128+
- AsyncOpenAI/Python 1.60.2
127129
x-stainless-arch:
128-
- arm64
130+
- x64
129131
x-stainless-async:
130132
- async:asyncio
131133
x-stainless-lang:
132134
- python
133135
x-stainless-os:
134-
- MacOS
136+
- Linux
135137
x-stainless-package-version:
136-
- 1.47.1
138+
- 1.60.2
137139
x-stainless-raw-response:
138140
- 'true'
141+
x-stainless-retry-count:
142+
- '0'
139143
x-stainless-runtime:
140144
- CPython
141145
x-stainless-runtime-version:
@@ -145,35 +149,33 @@ interactions:
145149
response:
146150
body:
147151
string: !!binary |
148-
H4sIAAAAAAAAA4xTwYrbMBC9+yvEnOPiJBuS9a0sJUu7WZY2lG6bYhR5bGsrS0Iap01D/r3YTmwn
149-
3UJ9EGLevDczT+NDwBjIFGIGouAkSqvCt893s98/P3xO/Wq/oOVu+fW2+miLh0e7/uJgVDPM9gUF
150-
nVlvhCmtQpJGt7BwyAlr1fF8OpmPp9F43AClSVHVtNxSeGPCUmoZTqLJTRjNw/HixC6MFOghZt8C
151-
xhg7NGfdp07xF8QsGp0jJXrPc4S4S2IMnFF1BLj30hPXBKMeFEYT6rp1XSk1AMgYlQiuVF+4/Q6D
152-
e28WVyqZvHtafnpY31fvX7LH3brQz3ez9f30aVCvld7bpqGs0qIzaYB38fiqGGOgedlwV/tV493o
153-
OoG7vCpRU902HDZgKrIVbSDegDVektzhBo5wQTsGr92/D9xwmFWeq5NNp/ix812Z3Dqz9Vc2Qia1
154-
9EXikPtmHPBkbFu7rtNUgOriycA6U1pKyPxAXQvenp4X+q3qwdkJI0NcDTjn+IVYkiJx2Txot0OC
155-
iwLTntnvEq9SaQZAMBj5715e027Hljr/H/keEAItYZpYh6kUl/P2aQ7rX+5faZ3FTcPg956wTDKp
156-
c3TWyWbhIbNJNI9m22wxFxEEx+APAAAA//8DAKvv9cv+AwAA
152+
H4sIAAAAAAAAAwAAAP//jFNNj5swEL3zK6w5hypfuyQct5W6rZRWOWxaqamQYwZw19iuPWyLovz3
153+
CkiApFupHJA1b958vJk5BoyBTCFmIApOorQqfJg+1O8+7578c/p2u63lU7F6/Krx8P6jVVuYNAxz
154+
+IGCLqw3wpRWIUmjO1g45IRN1Fm0WC9Wd9Fs0QKlSVE1tNxSuDRhKbUM59P5MpxG4Wx1ZhdGCvQQ
155+
s28BY4wd239Tp07xN8RsOrlYSvSe5whx78QYOKMaC3DvpSeuCSYDKIwm1E3pulJqBJAxKhFcqSFx
156+
9x1H70EsrlRSf/Ab8WWdfbK//ONuEf3cabpXkRjl60LXti0oq7ToRRrhvT2+ScYYaF623E29abWb
157+
3Dpwl1clamrKhuMeTEW2oj3Ee7DGS5IvuIcTXNFOwWvv7yM1HGaV5+os09l+6nVXJrfOHPyNjJBJ
158+
LX2ROOS+bQc8GdvlbvK0GaC6GhlYZ0pLCZln1E3A9Xm8MGzVAN6fMTLE1YhzsV8FS1IkLtuB9jsk
159+
uCgwHZjDLvEqlWYEBKOW/67ltdhd21Ln/xN+AIRAS5gm1mEqxXW/g5vD5uT+5dZL3BYMHt2LFJiQ
160+
RNeMIcWMV6o7BPC1JyyTTOocnXWyvQbIbBLNMY344W4pIDgFfwAAAP//AwDiuM2JGwQAAA==
157161
headers:
158-
CF-Cache-Status:
159-
- DYNAMIC
160162
CF-RAY:
161-
- 8e925b6249ff94f4-LIS
163+
- 910eb724fd6a7af8-SJC
162164
Connection:
163165
- keep-alive
164166
Content-Encoding:
165167
- gzip
166168
Content-Type:
167169
- application/json
168170
Date:
169-
- Wed, 27 Nov 2024 13:10:11 GMT
171+
- Wed, 12 Feb 2025 18:41:53 GMT
170172
Server:
171173
- cloudflare
172174
Set-Cookie:
173-
- __cf_bm=Rze_tf1qsWwR_8QujkWAs6xDPCjhF1EhOyAHTJRVIS4-1732713011-1.0.1.1-2LaAGNN.HICqWJ6mxxl2CLvL0leU5OPxcevV_E2cIBFl0F0jS_Xe2hHQvI4hUCTgKum4ONWEsP.xZvTYrthTZA;
174-
path=/; expires=Wed, 27-Nov-24 13:40:11 GMT; domain=.api.openai.com; HttpOnly;
175+
- __cf_bm=I1hzvyraesECfeyQA9.Q6GSU6KRGACNzkOEn6vG1tk4-1739385713-1.0.1.1-.365yJyPYuLycmESsXJb3epBqMu.qoyTGh3Z6Ng.4aTxmx78HHACXUJ4eFmphWvHE94OzMVswY6CdNRix.mz.A;
176+
path=/; expires=Wed, 12-Feb-25 19:11:53 GMT; domain=.api.openai.com; HttpOnly;
175177
Secure; SameSite=None
176-
- _cfuvid=casFCUelDW.rt7msWDCnalImxZa8O8ek7gps6QicC_g-1732713011934-0.0.1.1-604800000;
178+
- _cfuvid=XtEh__t27k4FEMshKxIn_IGe4b_JYyNZvG_Bo95Hcpo-1739385713920-0.0.1.1-604800000;
177179
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
178180
Transfer-Encoding:
179181
- chunked
@@ -183,10 +185,12 @@ interactions:
183185
- X-Request-ID
184186
alt-svc:
185187
- h3=":443"; ma=86400
188+
cf-cache-status:
189+
- DYNAMIC
186190
openai-organization:
187191
- heartex
188192
openai-processing-ms:
189-
- '222'
193+
- '427'
190194
openai-version:
191195
- '2020-10-01'
192196
strict-transport-security:
@@ -204,7 +208,7 @@ interactions:
204208
x-ratelimit-reset-tokens:
205209
- 0s
206210
x-request-id:
207-
- req_50b976601cf6c0af3d7fe6944c839fc9
211+
- req_89d6069a190e1bf7dd6f969e1f8fd964
208212
status:
209213
code: 200
210214
message: OK

tests/test_stream_inference.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@
5656
"task_id": 100,
5757
"input": "I am happy",
5858
"output": "positive",
59-
"_completion_cost_usd": 3e-06,
60-
"_completion_tokens": 5,
59+
"_completion_cost_usd": 3.6e-06,
60+
"_completion_tokens": 6,
6161
"_prompt_cost_usd": 1.35e-05,
6262
"_prompt_tokens": 90,
63-
"_total_cost_usd": 1.65e-05,
63+
"_total_cost_usd": 1.71e-05,
6464
}
6565
]
6666

@@ -123,6 +123,7 @@ async def send_side_effect(*args, **kwargs):
123123

124124
mock_producer.send_and_wait = AsyncMock(side_effect=send_and_wait_side_effect)
125125
mock_producer.send = AsyncMock(side_effect=send_side_effect)
126+
mock_producer._max_request_size = 3000000
126127

127128
yield mock_producer
128129

0 commit comments

Comments
 (0)