logReader: manually commit offset when using kafka logConsumer #2657
base: development/9.0
Conversation
Hello kerkesni, my role is to assist you with the merge of this pull request. Status report is not available.
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files (... and 3 files with indirect coverage changes):

```
@@           Coverage Diff            @@
##   development/9.0   #2657   +/-   ##
===================================================
+ Coverage      73.35%   73.65%   +0.30%
===================================================
  Files            201      201
  Lines          13390    13404      +14
===================================================
+ Hits            9822     9873      +51
+ Misses          3558     3521      -37
  Partials          10       10
===================================================
```

Flags with carried forward coverage won't be shown.
Request integration branches: waiting for integration branch creation to be requested by the user. To request integration branches, please comment on this pull request with the following command:
The Kafka log consumer currently auto-commits messages after they are consumed. This is an issue as it means we lose messages after a restart or crash, even if their processing didn't finish.

Like we do with other logConsumer implementations, we now manually store offsets at the end of each batch. The auto-commit mechanism then commits the locally stored offsets at each interval.

Contrary to the BackbeatConsumer, there is no risk of offsets being committed out of order, as only one batch is processed at a time, and a partition is only assigned to a single instance of the QueuePopulator.

Issue: BB-698
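The store-after-batch pattern described above can be sketched in plain JavaScript (no kafka dependency; `MockConsumer` and `processBatch` are illustrative names, not backbeat's API). Offsets are only marked as "stored" once the whole batch has been processed, so a crash mid-batch replays the batch instead of losing it:

```javascript
// Minimal sketch of manual offset storing: the periodic auto-commit would
// only ever persist `storedOffset`, which is advanced at batch boundaries.
class MockConsumer {
    constructor() {
        this.storedOffset = 0; // offset the auto-committer would commit
        this.position = 0;     // read position in the log
    }

    consume(count, log) {
        const batch = log.slice(this.position, this.position + count);
        this.position += batch.length;
        return batch;
    }

    // analogous to node-rdkafka's offsetsStore(): remember, don't commit yet
    offsetsStore(offset) {
        this.storedOffset = offset;
    }
}

function processBatch(consumer, log, handler) {
    const batch = consumer.consume(3, log);
    batch.forEach(handler);
    // only after the whole batch succeeded do we store the next offset;
    // the periodic auto-commit will then persist it
    consumer.offsetsStore(consumer.position);
}

const log = ['a', 'b', 'c', 'd', 'e'];
const consumer = new MockConsumer();
const seen = [];
processBatch(consumer, log, entry => seen.push(entry));
console.log(seen.join(','), consumer.storedOffset); // a,b,c 3
```

If the process dies between `consume()` and `offsetsStore()`, the committed offset still points at the start of the batch, so the batch is re-consumed on restart rather than dropped.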
```js
    /**
     * Get partition offsets
     * @returns {string} stored partition offsets
     * Offsets are stored in kafka are not managed
```
typo?
```diff
- * Offsets are stored in kafka are not managed
+ * Offsets are stored in kafka and not managed
```
```js
            // after the batch processing is fully completed.
            'enable.auto.offset.store': false,
            // Default auto-commit interval is 5 seconds
            'enable.auto.commit': true,
```
With auto-commit, we will still keep a window (5 seconds) of uncertainty:
- is it the same with BackbeatConsumer? I don't see the flag there...
- in BackbeatConsumer, I think we have code to ensure we "flush" the committed offset when needed: should we do the same here?
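The "flush on demand" idea raised here can be sketched in plain JavaScript (all names are illustrative, not BackbeatConsumer's actual API): stored offsets are normally committed by a periodic timer, which is the ~5-second uncertainty window, but an explicit `flush()` closes that window, e.g. on graceful shutdown:

```javascript
// Sketch of an on-demand commit alongside a periodic auto-commit timer.
class OffsetCommitter {
    constructor(intervalMs) {
        this.stored = null;    // last offset stored by the consumer
        this.committed = null; // last offset actually committed to kafka
        this.timer = setInterval(() => this.commit(), intervalMs);
    }

    store(offset) {
        this.stored = offset;
    }

    commit() {
        if (this.stored !== null) {
            this.committed = this.stored;
        }
    }

    // close the uncertainty window explicitly instead of waiting
    // up to `intervalMs` for the timer to fire
    flush() {
        this.commit();
        clearInterval(this.timer);
    }
}

const committer = new OffsetCommitter(5000);
committer.store(42);
committer.flush(); // don't wait up to 5 s for the timer
console.log(committer.committed); // 42
```

Without the `flush()` call, an offset stored just before shutdown could sit uncommitted for up to one full interval, causing the batch to be reprocessed after restart.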
```js
@@ -42,13 +44,17 @@ class LogConsumer {
    setup(done) {
        // partition offsets will be managed by kafka
        const consumerParams = {
            'enable.auto.offset.store': true,
            // Manually manage storing offsets to ensure they are only stored
            // after the batch processing is fully completed.
```
Bonus issue: this will raise the issues we had (and fixed) in BackbeatConsumer around handling of rebalance, i.e. the need to delay the rebalance until the batch has been completed and offsets committed accordingly, and the need to detect/handle "slow tasks" which could block all further processing... It may be done in another PR (or maybe we could reuse BackbeatConsumer for KafkaLogConsumer?), but we probably can't go to prod without that kind of thing...
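The rebalance concern above can be sketched as a plain-JS simulation (illustrative names, not BackbeatConsumer's actual rebalance handling): when kafka revokes a partition, the handler waits for the in-flight batch to finish and its offsets to be stored before releasing the partition:

```javascript
// Sketch: defer partition revocation until the in-flight batch completes,
// so offsets for a fully processed batch are stored before the rebalance.
class RebalanceAwareConsumer {
    constructor() {
        this.batchInFlight = null; // promise for the current batch, if any
        this.offsetsStored = false;
    }

    runBatch(work) {
        this.batchInFlight = (async () => {
            await work();
            // store offsets only once the whole batch has been processed
            this.offsetsStored = true;
        })();
        return this.batchInFlight;
    }

    // called when kafka revokes the partition: wait for the batch first
    async onPartitionsRevoked() {
        if (this.batchInFlight) {
            await this.batchInFlight;
        }
        return this.offsetsStored;
    }
}

(async () => {
    const consumer = new RebalanceAwareConsumer();
    consumer.runBatch(() => new Promise(resolve => setTimeout(resolve, 10)));
    const stored = await consumer.onPartitionsRevoked();
    console.log(stored); // true: the rebalance waited for the batch
})();
```

A real implementation would also need the "slow task" detection mentioned above, i.e. a timeout so a stuck batch cannot block the rebalance forever.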
```js
        ], err => {
            if (err) {
                return cb(err);
            }
        ], (err, res) => cb(err, res[3]));
            // ending and returning the stream
            this._listRecordStream.end();
            return cb(null, { log: this._listRecordStream, tailable: false });
        });
```
nit: I think the original flow was better, as it managed the callback (`cb`) in a single place, with no added condition (`if`) to handle errors... or was there an issue? (we can also do `res?.[3]` if `res` could be null)
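The single-callback flow suggested here can be sketched with a minimal hand-rolled `series()` helper, so the example runs without the `async` library (the step names are illustrative, not the real ones). The `res?.[3]` access mirrors the reviewer's suggestion of guarding against a null results array:

```javascript
// Minimal series(): runs callback-style tasks in order, collecting results.
function series(tasks, done) {
    const results = [];
    const next = i => {
        if (i === tasks.length) {
            return done(null, results);
        }
        return tasks[i]((err, res) => {
            if (err) {
                return done(err);
            }
            results.push(res);
            return next(i + 1);
        });
    };
    next(0);
}

function openLog(cb) {
    series([
        step => step(null, 'connect'),
        step => step(null, 'subscribe'),
        step => step(null, 'seek'),
        step => step(null, { log: 'stream', tailable: false }),
    // single place handling the callback, no extra error branch needed:
    // on error, `res` is undefined and `res?.[3]` safely yields undefined
    ], (err, res) => cb(err, res?.[3]));
}

openLog((err, log) => console.log(err, log.log)); // null stream
```

The final callback is the only place `cb` is invoked, which is the property the reviewer is asking to preserve.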
```js
        }

        return this._writeLogOffset(logger, done);
    }

        // Handle offset managed externally (e.g., Kafka)
        if (!this.isOffsetManaged() && this.logConsumer.storeOffsets) {
```
When `!this.isOffsetManaged()`, then `storeOffsets` should always be defined, right?

If this is the case, then this option is redundant (an assertion in the constructor is enough).

This would allow a much nicer flow:

```js
if (!this.isOffsetManaged()) {
    this.logConsumer.storeOffsets();
} else if (this._shouldUpdateLogOffset(logRes, nextLogOffset)) {
    ...
}
```