Skip to content

fix: Apply timeout on batch basis #1133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

andrewhessler
Copy link

@andrewhessler andrewhessler commented Jun 9, 2025

What

Applies timeout to the entire batch instead of on a per message basis.

How

The calls to the individual consume share the batch timeout and each call to consume can only use the timeout that the batch has remaining.

Why

The existing behavior is described in detail in this issue.

In short, for d millisecond delay, c count, and b blocking time: b = d * c, given a constant topic rpm 60000 / d.

Given any rpm > 60000 / d: b = c * (60 / rpm) seconds

For example, a 1000ms delay with a batch count of 100 will cause the consumeNum loop to block for up to 100 seconds given a constant topic rpm of 60.

References

Issue: confluentinc/confluent-kafka-javascript#262
PR Introduced: #34

Test & Review

With c = 100, d = 1000, rpm = 700, expectation is that pre-change we'd block for about 8.6 seconds (it seems like it takes longer in the example, though) before returning the 100 messages.

My testing environment has a 50ms cooldown on calling consume, so after the change we expect a batch of messages to be returned every 1050ms.

Pre-Change
confluent-node-rdkafka-pre2

Post-Change
confluent-node-rdkafka-post2

@andrewhessler andrewhessler changed the title apply timeout on batch basis fix: Apply timeout on batch basis Jun 9, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant