Skip to content

Commit

Permalink
Additional checks for kafka readiness (#41)
Browse files Browse the repository at this point in the history
* Explicitly set producer=None when exception occurs on initial connection
* Handle `INVALID_REPLICATION_FACTOR` in main `read_firehose` loop for extra robustness
  • Loading branch information
polastre authored Apr 14, 2022
1 parent 9899a49 commit 4a3afa0
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions connector/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async def read_firehose(time_mode: str) -> Optional[str]:
"""
# pylint: disable=global-statement
# pylint: disable=too-many-statements
global last_good_pitr, lines_read, bytes_read
global last_good_pitr, lines_read, bytes_read, producer

context = ssl.create_default_context()
context.minimum_version = ssl.TLSVersion.TLSv1_2
Expand Down Expand Up @@ -262,11 +262,14 @@ def delivery_report(err, _):
print(f"Encountered full outgoing buffer, should resolve itself: {e}")
time.sleep(1)
except KafkaException as e:
if not e.args[0].retriable():
print(f"Kafka exception occurred that cannot be retried: {e}")
err = e.args[0]
# INVALID_REPLICATION_FACTOR occurs when Kafka broker is in transient state
# and the partition count is still 0 so there's no leader. Wait to retry.
if err.code != KafkaError.INVALID_REPLICATION_FACTOR and not err.retriable():
print(f"Kafka exception occurred that cannot be retried: {err}")
raise
print(
f"Encountered retriable kafka error ({e.args[0].str()}), "
f"Encountered retriable kafka error ({err}), "
"waiting a moment and trying again"
)
time.sleep(1)
Expand All @@ -292,6 +295,7 @@ async def main():
value="",
)
except KafkaException as error:
producer = None
print(f"Kafka isn't available ({error}), trying again in a few seconds")
time.sleep(3)

Expand Down

0 comments on commit 4a3afa0

Please sign in to comment.