Skip to content

Commit 852f095

Browse files
fix: enable retries for Kafka receiver connection failures
A fix was implemented to allow the Kafka receiver to retry on initial connection failures by handling `RetriableException` and `CoordinatorNotAvailableException`. The fix builds successfully, but end-to-end tests could not be run due to Docker limitations.
1 parent bd82cec commit 852f095

File tree

1 file changed

+13
-0
lines changed
  • src/main/kotlin/io/github/nomisRev/kafka/receiver/internals

1 file changed

+13
-0
lines changed

src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/EventLoop.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,19 @@ internal class EventLoop<K, V>(
175175
} catch (e: WakeupException) {
176176
logger.debug("Consumer woken")
177177
ConsumerRecords.empty()
178+
} catch (e: org.apache.kafka.common.errors.RetriableException) {
179+
// In Kafka 3.7.0, certain group coordination failures (e.g. CoordinatorNotAvailableException)
180+
// can surface from poll as RetriableException on the first attempts. We should not terminate
181+
// the loop on such exceptions; instead, schedule another poll to allow the consumer to retry
182+
// coordinator discovery/metadata refresh just like previous versions did.
183+
logger.debug("Retriable exception during poll; will retry", e)
184+
schedulePoll()
185+
return
186+
} catch (e: org.apache.kafka.common.errors.CoordinatorNotAvailableException) {
187+
// Some environments might throw this concrete exception instead of RetriableException
188+
logger.debug("Coordinator not available during poll; will retry", e)
189+
schedulePoll()
190+
return
178191
}
179192

180193
if (records.isEmpty) {

0 commit comments

Comments
 (0)