From 4a3afa00e81a882e28915e0300e8565595ee4f9c Mon Sep 17 00:00:00 2001
From: Joe Polastre
Date: Wed, 13 Apr 2022 20:39:49 -0700
Subject: [PATCH] Additional checks for kafka readiness (#41)

* Explicitly set producer=None when exception occurs on initial connection
* Handle `INVALID_REPLICATION_FACTOR` in main `read_firehose` loop for extra robustness
---
 connector/main.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/connector/main.py b/connector/main.py
index 9f85e2c..18ff915 100644
--- a/connector/main.py
+++ b/connector/main.py
@@ -189,7 +189,7 @@ async def read_firehose(time_mode: str) -> Optional[str]:
     """
     # pylint: disable=global-statement
     # pylint: disable=too-many-statements
-    global last_good_pitr, lines_read, bytes_read
+    global last_good_pitr, lines_read, bytes_read, producer
 
     context = ssl.create_default_context()
     context.minimum_version = ssl.TLSVersion.TLSv1_2
@@ -262,11 +262,14 @@ def delivery_report(err, _):
                 print(f"Encountered full outgoing buffer, should resolve itself: {e}")
                 time.sleep(1)
             except KafkaException as e:
-                if not e.args[0].retriable():
-                    print(f"Kafka exception occurred that cannot be retried: {e}")
+                err = e.args[0]
+                # INVALID_REPLICATION_FACTOR occurs when Kafka broker is in transient state
+                # and the partition count is still 0 so there's no leader. Wait to retry.
+                if err.code != KafkaError.INVALID_REPLICATION_FACTOR and not err.retriable():
+                    print(f"Kafka exception occurred that cannot be retried: {err}")
                     raise
                 print(
-                    f"Encountered retriable kafka error ({e.args[0].str()}), "
+                    f"Encountered retriable kafka error ({err}), "
                     "waiting a moment and trying again"
                 )
                 time.sleep(1)
@@ -292,6 +295,7 @@ async def main():
                 value="",
             )
         except KafkaException as error:
+            producer = None
             print(f"Kafka isn't available ({error}), trying again in a few seconds")
             time.sleep(3)
 