-
Notifications
You must be signed in to change notification settings - Fork 108
[server] Switch back to PubSubPosition based reads with offset as fallback #2242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,7 +1,6 @@ | ||||||||||||
| package com.linkedin.davinci.validation; | ||||||||||||
|
|
||||||||||||
| import static com.linkedin.davinci.validation.DataIntegrityValidator.DISABLED; | ||||||||||||
| import static com.linkedin.venice.pubsub.PubSubUtil.getPubSubPositionString; | ||||||||||||
|
|
||||||||||||
| import com.linkedin.venice.annotation.Threadsafe; | ||||||||||||
| import com.linkedin.venice.annotation.VisibleForTesting; | ||||||||||||
|
|
@@ -901,9 +900,8 @@ private String generateMessage( | |||||||||||
| if (consumerRecord.getValue().leaderMetadataFooter != null) { | ||||||||||||
| sb.append("; LeaderMetadata { upstream position: ") | ||||||||||||
| .append( | ||||||||||||
| getPubSubPositionString( | ||||||||||||
| pubSubPositionDeserializer, | ||||||||||||
| consumerRecord.getValue().leaderMetadataFooter.upstreamPubSubPosition)) | ||||||||||||
| pubSubPositionDeserializer | ||||||||||||
| .toPosition(consumerRecord.getValue().leaderMetadataFooter.upstreamPubSubPosition)) | ||||||||||||
|
Comment on lines
+903
to
+904
|
||||||||||||
| pubSubPositionDeserializer | |
| .toPosition(consumerRecord.getValue().leaderMetadataFooter.upstreamPubSubPosition)) | |
| consumerRecord.getValue().leaderMetadataFooter.upstreamPubSubPosition == null | |
| ? "-" | |
| : pubSubPositionDeserializer.toPosition(consumerRecord.getValue().leaderMetadataFooter.upstreamPubSubPosition)) |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,7 +1,6 @@ | ||||||
| package com.linkedin.venice; | ||||||
|
|
||||||
| import static com.linkedin.venice.chunking.ChunkKeyValueTransformer.KeyType.WITH_VALUE_CHUNK; | ||||||
| import static com.linkedin.venice.pubsub.PubSubUtil.getPubSubPositionString; | ||||||
|
|
||||||
| import com.github.luben.zstd.Zstd; | ||||||
| import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; | ||||||
|
|
@@ -506,9 +505,7 @@ private void logRecordMetadata(DefaultPubSubMessage record) { | |||||
| producerMetadata.messageTimestamp, | ||||||
| producerMetadata.logicalTimestamp, | ||||||
| leaderMetadata == null ? "-" : leaderMetadata.hostName, | ||||||
| leaderMetadata == null | ||||||
| ? "-" | ||||||
| : getPubSubPositionString(pubSubPositionDeserializer, leaderMetadata.upstreamPubSubPosition), | ||||||
| leaderMetadata == null ? "-" : pubSubPositionDeserializer.toPosition(leaderMetadata.upstreamPubSubPosition), | ||||||
|
||||||
| leaderMetadata == null ? "-" : pubSubPositionDeserializer.toPosition(leaderMetadata.upstreamPubSubPosition), | |
| (leaderMetadata == null || leaderMetadata.upstreamPubSubPosition == null) ? "-" : pubSubPositionDeserializer.toPosition(leaderMetadata.upstreamPubSubPosition), |
Copilot
AI
Oct 28, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential NullPointerException when leaderMetadata.upstreamPubSubPosition is null. The toPosition method throws an IllegalArgumentException when passed a null ByteBuffer. Add a null check for upstreamPubSubPosition or use a nested ternary to handle this case: leaderMetadata == null || leaderMetadata.upstreamPubSubPosition == null ? \"-\" : pubSubPositionDeserializer.toPosition(leaderMetadata.upstreamPubSubPosition)
| leaderMetadata == null ? "-" : pubSubPositionDeserializer.toPosition(leaderMetadata.upstreamPubSubPosition), | |
| leaderMetadata == null || leaderMetadata.upstreamPubSubPosition == null ? "-" : pubSubPositionDeserializer.toPosition(leaderMetadata.upstreamPubSubPosition), |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,6 @@ | |
| import com.linkedin.venice.pubsub.PubSubContext; | ||
| import com.linkedin.venice.pubsub.PubSubPositionDeserializer; | ||
| import com.linkedin.venice.pubsub.PubSubTopicRepository; | ||
| import com.linkedin.venice.pubsub.PubSubUtil; | ||
| import com.linkedin.venice.pubsub.api.PubSubPosition; | ||
| import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition; | ||
| import com.linkedin.venice.pubsub.api.PubSubTopic; | ||
|
|
@@ -128,7 +127,9 @@ public String getPreviousStatusesEntry(CharSequence key) { | |
| } | ||
|
|
||
| public PubSubPosition getCheckpointedLocalVtPosition() { | ||
| return PubSubUtil.fromKafkaOffset(this.partitionState.offset); | ||
| return deserializePositionWithOffsetFallback( | ||
| this.partitionState.lastProcessedVersionTopicPubSubPosition, | ||
| this.partitionState.offset); | ||
| } | ||
|
|
||
| public void checkpointLocalVtPosition(PubSubPosition vtPosition) { | ||
|
|
@@ -141,7 +142,9 @@ public void checkpointLocalVtPosition(PubSubPosition vtPosition) { | |
| } | ||
|
|
||
| public PubSubPosition getCheckpointedRemoteVtPosition() { | ||
| return PubSubUtil.fromKafkaOffset(this.partitionState.upstreamVersionTopicOffset); | ||
| return deserializePositionWithOffsetFallback( | ||
| this.partitionState.upstreamVersionTopicPubSubPosition, | ||
| this.partitionState.upstreamVersionTopicOffset); | ||
| } | ||
|
|
||
| public void checkpointRemoteVtPosition(PubSubPosition remoteVtPosition) { | ||
|
|
@@ -300,11 +303,12 @@ public PubSubTopic getLeaderTopic(PubSubTopicRepository pubSubTopicRepository) { | |
| */ | ||
| public PubSubPosition getCheckpointedRtPosition(String pubSubBrokerAddress) { | ||
| Long offset = partitionState.upstreamOffsetMap.get(pubSubBrokerAddress); | ||
| ByteBuffer wfBuffer = partitionState.upstreamRealTimeTopicPubSubPositionMap.get(pubSubBrokerAddress); | ||
| if (offset == null) { | ||
| // If the offset is not set, return EARLIEST symbolic position. | ||
| return PubSubSymbolicPosition.EARLIEST; | ||
| } | ||
| return PubSubUtil.fromKafkaOffset(offset); | ||
| return deserializePositionWithOffsetFallback(wfBuffer, offset); | ||
| } | ||
|
|
||
| public void checkpointRtPosition(String pubSubBrokerAddress, PubSubPosition leaderPosition) { | ||
|
|
@@ -333,8 +337,9 @@ public void cloneRtPositionCheckpoints(@Nonnull Map<String, PubSubPosition> chec | |
| checkpointUpstreamPositionsReceiver.clear(); | ||
| for (Map.Entry<String, Long> offsetEntry: partitionState.upstreamOffsetMap.entrySet()) { | ||
| String pubSubBrokerAddress = offsetEntry.getKey(); | ||
| ByteBuffer wfBuffer = partitionState.upstreamRealTimeTopicPubSubPositionMap.get(pubSubBrokerAddress); | ||
| checkpointUpstreamPositionsReceiver | ||
| .put(pubSubBrokerAddress, PubSubUtil.fromKafkaOffset(offsetEntry.getValue())); | ||
| .put(pubSubBrokerAddress, deserializePositionWithOffsetFallback(wfBuffer, offsetEntry.getValue())); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -523,7 +528,7 @@ PubSubPosition deserializePositionWithOffsetFallback(ByteBuffer wireFormatBytes, | |
| final PubSubPosition position = pubSubPositionDeserializer.toPosition(wireFormatBytes); | ||
|
|
||
| // Guard against regressions: honor the caller-provided minimum offset. | ||
| if (offset > 0 && position.getNumericOffset() < offset) { | ||
| if (position.getNumericOffset() < offset) { | ||
|
||
| LOGGER.info( | ||
| "Deserialized position: {} is behind the provided offset: {}. Using offset-based position.", | ||
| position.getNumericOffset(), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing the
offset > 0check means that this condition will now fire even whenoffsetis 0 or negative. This could be intentional for handling edge cases, but it changes the behavior for positions at offset 0. Consider documenting why the guard was removed or verifying that positions at offset 0 should be subject to this fallback logic.