[FLINK-33173] Support currentFetchEventTimeLag metrics for KafkaSource #54
Conversation
@Tan-JiaLiang Thanks for the contribution! I left a comment.
consumerRecords
        .iterator()
        .forEachRemaining(
                record -> kafkaSourceReaderMetrics.recordFetched(record.timestamp()));
Actually it is not accurate to use ConsumerRecord#timestamp directly as the event time. Users can specify their own WatermarkStrategy to extract the event time from the payload of the record instead of using the timestamp on the ConsumerRecord: https://github.com/apache/flink/blob/6b52a4107db7521a25f4f308891095c5ba33cca0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L2025
Fetch time is determined in the split reader in the fetcher manager thread, while event time is determined in the SourceOperator in the task's main thread, so there is a gap between the two. That is why we did not implement this metric in the first place.
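To illustrate the concern above, here is a minimal sketch (not Flink API; all names are illustrative) showing how the same fetched record can yield two different lag values depending on whether the Kafka record timestamp or a payload-derived event time is treated as "event time":

```java
// Illustrative sketch, not Flink or Kafka API. It contrasts a lag computed
// from the broker/producer timestamp (what the quoted snippet measures)
// with a lag computed from an event time a user's WatermarkStrategy might
// extract from the record payload instead.
class FetchLagComparison {
    // Lag against ConsumerRecord#timestamp.
    static long lagFromKafkaTimestamp(long fetchTime, long kafkaTimestamp) {
        return fetchTime - kafkaTimestamp;
    }

    // Lag against a payload-derived event time; nothing forces this value
    // to equal the Kafka record timestamp.
    static long lagFromPayloadEventTime(long fetchTime, long payloadEventTime) {
        return fetchTime - payloadEventTime;
    }
}
```

When the payload carries an older event time than the Kafka timestamp, the two lags diverge, which is exactly why measuring at fetch time with ConsumerRecord#timestamp can misreport the FLIP-33 metric.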
@PatrickRen It is my pleasure to have you review my code.
But I am still confused: what is the difference between the event time in currentFetchEventTimeLag and the event time in currentEmitEventTimeLag?
And why can ConsumerRecord#timestamp be used directly as the event time in https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java#L52 ?
Can you please explain? Looking forward to your feedback!
@Tan-JiaLiang The event time is the same in these two metrics.
The event time of a record is determined in one of two ways:
1. Use the event time emitted by SourceOutput#collect(record, timestamp), which is the code snippet you mentioned in KafkaRecordEmitter
2. Use the user-specified TimestampAssigner, which will override the event time from 1: https://github.com/apache/flink/blob/935188f06a8a94b3a2c991a8aa6e48b2bfaeee70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java#L107
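The two-step resolution described above can be sketched in plain Java (a hypothetical model, not the actual Flink classes; the names here are illustrative only): the source emits a timestamp with the record, and a user-supplied assigner, if present, overrides it.

```java
import java.util.function.ToLongFunction;

// Hypothetical sketch of the two-step event-time resolution: step 1 is the
// timestamp the source passes to SourceOutput#collect(record, timestamp);
// step 2 is a user-specified assigner that, when present, overrides step 1.
class EventTimeResolution {
    static <T> long resolveEventTime(
            T record,
            long sourceEmittedTimestamp,               // step 1: from the source
            ToLongFunction<T> userTimestampAssigner) { // step 2: null if unset
        if (userTimestampAssigner == null) {
            return sourceEmittedTimestamp;             // step 1 wins
        }
        return userTimestampAssigner.applyAsLong(record); // step 2 overrides
    }
}
```

This mirrors why the Kafka timestamp is only a default: once a user configures their own TimestampAssigner, the effective event time can come from the payload instead.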
@PatrickRen Thanks for your feedback!
Got it. It seems impossible to support currentFetchEventTimeLag in the Kafka connector for now, until we can get the event time in Kafka.
@Tan-JiaLiang Indeed. This is a shared issue across all source connectors, because SourceReaderBase doesn't provide an interface for implementations to deliver the FetchTime all the way to the place where the EventTime is extracted in SourceOutput.
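To make the missing piece concrete, here is a hypothetical sketch of the kind of wrapper such an interface might carry (none of these types exist in Flink; the names are invented for illustration): the fetch timestamp is captured in the fetcher thread and travels with the record, so the lag could be computed later, where the event time is finally known.

```java
// Hypothetical sketch only -- not an existing Flink type. A wrapper that
// carries the fetch timestamp (taken in the fetcher manager thread) along
// with the record into the task's main thread.
class FetchedWithTime<T> {
    final T record;
    final long fetchTimeMillis;

    FetchedWithTime(T record, long fetchTimeMillis) {
        this.record = record;
        this.fetchTimeMillis = fetchTimeMillis;
    }

    // currentFetchEventTimeLag as FLIP-33 defines it:
    // fetch time minus the record's (resolved) event time.
    long fetchEventTimeLag(long eventTime) {
        return fetchTimeMillis - eventTime;
    }
}
```

With such a carrier, the metric could be updated at the point where the final event time (including any user TimestampAssigner override) is applied, closing the thread gap described above.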
What is the purpose of the change
In the connector specifications of FLIP-33, the currentFetchEventTimeLag metric is the time in milliseconds from the record event timestamp to the timestamp at which Flink fetched the record. It can reflect the consumption latency before deserialization.
Brief change log
- Support the currentFetchEventTimeLag metric for KafkaSource
Verifying this change