diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 6c0f3383991..56aa0ef889e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -1097,13 +1097,19 @@ protected void syncConsumedUpstreamRTOffsetMapIfNeeded( Map upstreamStartPositionByPubSubUrl) { // Update in-memory consumedUpstreamRTOffsetMap in case no RT record is consumed after the subscription final PubSubTopic leaderTopic = pcs.getOffsetRecord().getLeaderTopic(pubSubTopicRepository); + final PubSubTopicPartition leaderTopicPartition = pcs.getSourceTopicPartition(leaderTopic); if (leaderTopic != null && leaderTopic.isRealTime()) { upstreamStartPositionByPubSubUrl.forEach((kafkaURL, upstreamStartPosition) -> { PubSubPosition latestConsumedRtPosition = getLatestConsumedUpstreamPositionForHybridOffsetLagMeasurement(pcs, kafkaURL); // update latest consumed RT position if incoming upstream start position is greater // than the latest consumed RT position - if (upstreamStartPosition.getNumericOffset() > latestConsumedRtPosition.getNumericOffset()) { + if ((PubSubSymbolicPosition.EARLIEST.equals(latestConsumedRtPosition) + && !PubSubSymbolicPosition.EARLIEST.equals(upstreamStartPosition)) + || getTopicManager(kafkaURL).diffPosition( + Utils.createPubSubTopicPartitionFromLeaderTopicPartition(kafkaURL, leaderTopicPartition), + upstreamStartPosition, + latestConsumedRtPosition) > 0) { updateLatestConsumedRtPositions(pcs, kafkaURL, upstreamStartPosition); } }); @@ -1583,7 +1589,8 @@ protected static void checkAndHandleUpstreamOffsetRewind( final PubSubPosition newUpstreamPosition, final PubSubPosition previousUpstreamPosition, LeaderFollowerStoreIngestionTask ingestionTask) { - if (newUpstreamPosition.getNumericOffset() >= previousUpstreamPosition.getNumericOffset()) { + if (PubSubSymbolicPosition.EARLIEST.equals(previousUpstreamPosition) + || topicManager.diffPosition(pubSubTopicPartition, newUpstreamPosition, previousUpstreamPosition) >= 0) { return; // Rewind did not happen } if (!ingestionTask.isHybridMode()) { @@ -1988,7 +1995,8 @@ protected boolean shouldProcessRecord(DefaultPubSubMessage record) { } PubSubPosition lastProcessedVtPos = partitionConsumptionState.getLatestProcessedVtPosition(); - if (lastProcessedVtPos.getNumericOffset() >= record.getPosition().getNumericOffset()) { + if (!PubSubSymbolicPosition.EARLIEST.equals(lastProcessedVtPos) && topicManagerRepository.getLocalTopicManager() + .diffPosition(record.getTopicPartition(), lastProcessedVtPos, record.getPosition()) >= 0) { String message = partitionConsumptionState.getLeaderFollowerState() + " replica: " + partitionConsumptionState.getReplicaId() + " had already processed the record"; if (!REDUNDANT_LOGGING_FILTER.isRedundantException(message)) { @@ -3730,7 +3738,7 @@ void loadGlobalRtDiv(int partition, String brokerUrl) { // If the GlobalRtDivState is not present, it could be acceptable if this could be the first leader to be elected // Object not existing could be problematic if this isn't the first leader (detected via nonzero leaderPosition) PubSubPosition leaderPosition = pcs.getLeaderPosition(brokerUrl, false); - if (leaderPosition.getNumericOffset() > 0) { + if (!PubSubSymbolicPosition.EARLIEST.equals(leaderPosition)) { LOGGER.warn( "Unable to retrieve Global RT DIV from storage engine for topic-partition: {} brokerUrl: {} leaderPosition: {}", topicPartition, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index c7f25878d96..6d55ef4352f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -4727,10 +4727,11 @@ protected PubSubPosition extractUpstreamPosition(DefaultPubSubMessage consumerRe return PubSubSymbolicPosition.EARLIEST; } LeaderMetadata leaderMetadataFooter = consumerRecord.getValue().leaderMetadataFooter; - - // always return upstreamOffset instead of upstreamPubSubPosition - // till we fix all the issues in offset to pubsubPosition migration - return PubSubUtil.fromKafkaOffset(leaderMetadataFooter.upstreamOffset); + return deserializePositionWithOffsetFallback( + pubSubContext.getPubSubPositionDeserializer(), + consumerRecord.getTopicPartition(), + leaderMetadataFooter.upstreamPubSubPosition, + leaderMetadataFooter.upstreamOffset); } // extract the upstream cluster id from the given consumer record's leader metadata. @@ -5048,7 +5049,7 @@ PubSubPosition deserializePositionWithOffsetFallback( try { final PubSubPosition position = pubSubPositionDeserializer.toPosition(wireFormatBytes); // Guard against regressions: honor the caller-provided minimum offset. - if (offset > 0 && position.getNumericOffset() < offset) { + if (position.getNumericOffset() < offset) { String context = String.format(" for: %s/%s", topicPartition, versionTopic); if (!REDUNDANT_LOGGING_FILTER.isRedundantException(context)) { LOGGER.warn( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java index 1ac535a082b..803029eda93 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java @@ -1,7 +1,6 @@ package com.linkedin.davinci.validation; import static com.linkedin.davinci.validation.DataIntegrityValidator.DISABLED; -import static com.linkedin.venice.pubsub.PubSubUtil.getPubSubPositionString; import com.linkedin.venice.annotation.Threadsafe; import com.linkedin.venice.annotation.VisibleForTesting; @@ -901,9 +900,8 @@ private String generateMessage( if (consumerRecord.getValue().leaderMetadataFooter != null) { sb.append("; LeaderMetadata { upstream position: ") .append( - getPubSubPositionString( - pubSubPositionDeserializer, - consumerRecord.getValue().leaderMetadataFooter.upstreamPubSubPosition)) + pubSubPositionDeserializer + .toPosition(consumerRecord.getValue().leaderMetadataFooter.upstreamPubSubPosition)) .append("; upstream pub sub cluster ID: ") .append(consumerRecord.getValue().leaderMetadataFooter.upstreamKafkaClusterId) .append("; producer host name: ") diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 8406600d12a..e5924985602 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -2343,7 +2343,7 @@ public void testSubscribeCompletedPartition(AAConfig aaConfig) throws Exception ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(PubSubPosition.class); verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)) .completed(eq(topic), eq(PARTITION_FOO), positionCaptor.capture(), eq("STANDBY")); - assertEquals(positionCaptor.getValue().getNumericOffset(), p100.getNumericOffset()); + assertEquals(positionCaptor.getValue(), p100); }, aaConfig); config.setBeforeStartingConsumption( () -> doReturn(getOffsetRecord(p100, true, pubSubContext)).when(mockStorageMetadataService) @@ -2364,7 +2364,7 @@ public void testSubscribeCompletedPartitionUnsubscribe(AAConfig aaConfig) throws ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(PubSubPosition.class); verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)) .completed(eq(topic), eq(PARTITION_FOO), positionCaptor.capture(), eq("STANDBY")); - assertEquals(positionCaptor.getValue().getNumericOffset(), p100.getNumericOffset()); + assertEquals(positionCaptor.getValue(), p100); verify(aggKafkaConsumerService, timeout(TEST_TIMEOUT_MS)) .batchUnsubscribeConsumerFor(pubSubTopic, Collections.singleton(fooTopicPartition)); verify(aggKafkaConsumerService, never()).unsubscribeConsumerFor(pubSubTopic, barTopicPartition); @@ -2397,7 +2397,7 @@ public void testCompleteCalledWhenUnsubscribeAfterBatchPushDisabled(AAConfig aaC ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(PubSubPosition.class); verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)) .completed(eq(topic), eq(PARTITION_FOO), positionCaptor.capture(), eq("STANDBY")); - assertEquals(positionCaptor.getValue().getNumericOffset(), p10.getNumericOffset()); + assertEquals(positionCaptor.getValue(), p10); }, aaConfig); config.setBeforeStartingConsumption(() -> { Store mockStore = mock(Store.class); @@ -2593,11 +2593,12 @@ public void testDataValidationCheckPointing(SortedInput sortedInput, AAConfig aa long offset = entry.getValue().getInternalOffset(); LOGGER.info("Verifying completed was called for partition {} and offset {} or greater.", partition, offset); - ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(PubSubPosition.class); + ArgumentCaptor positionCaptor = + ArgumentCaptor.forClass(InMemoryPubSubPosition.class); verify(mockLogNotifier, timeout(LONG_TEST_TIMEOUT).atLeastOnce()) .completed(eq(topic), eq(partition), positionCaptor.capture(), eq("STANDBY")); - PubSubPosition completedPosition = positionCaptor.getValue(); - assertTrue(completedPosition.getNumericOffset() >= offset); + InMemoryPubSubPosition completedPosition = positionCaptor.getValue(); + assertTrue(completedPosition.getInternalOffset() >= offset); }); // After this, all asynchronous processing should be finished, so there's no need for time outs anymore. @@ -4704,14 +4705,14 @@ public void testOffsetSyncBeforeGracefulShutDown(AAConfig aaConfig) throws Excep } else { // If the pcs is non-null, then we perform additional checks to ensure that it was not synced Assert.assertEquals( - pcs.getLatestProcessedVtPosition().getNumericOffset(), - p0.getNumericOffset(), + pcs.getLatestProcessedVtPosition(), + p0, "pcs.getLatestProcessedLocalVersionTopicOffset() for PARTITION_FOO is expected to be zero!"); OffsetRecord offsetRecord = pcs.getOffsetRecord(); assertNotNull(offsetRecord); Assert.assertEquals( - offsetRecord.getCheckpointedLocalVtPosition().getNumericOffset(), - p0.getNumericOffset(), + offsetRecord.getCheckpointedLocalVtPosition(), + p0, "offsetRecord.getCheckpointedLocalVtPosition() for PARTITION_FOO is expected to be zero!"); } @@ -4723,14 +4724,14 @@ public void testOffsetSyncBeforeGracefulShutDown(AAConfig aaConfig) throws Excep assertNotNull(offsetRecord); Assert.assertEquals(pcs.getLatestProcessedVtPosition(), p2); // PCS updated // offsetRecord hasn't been updated yet - Assert.assertEquals(offsetRecord.getCheckpointedLocalVtPosition().getNumericOffset(), p0.getNumericOffset()); + Assert.assertEquals(offsetRecord.getCheckpointedLocalVtPosition(), p0); storeIngestionTaskUnderTest.close(); // Verify the OffsetRecord is synced up with pcs and get persisted only once during shutdown verify(mockStorageMetadataService, timeout(TEST_TIMEOUT_MS).times(1)).put(eq(topic), eq(PARTITION_FOO), any()); Assert.assertEquals( - offsetRecord.getCheckpointedLocalVtPosition().getNumericOffset(), - p2.getNumericOffset(), + offsetRecord.getCheckpointedLocalVtPosition(), + p2, "offsetRecord.getCheckpointedLocalVtPosition() for PARTITION_FOO is expected to be 2!"); // Verify that the underlying storage engine sync function is invoked. diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/KafkaTopicDumper.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/KafkaTopicDumper.java index ceaa16724be..d6a69c88ac4 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/KafkaTopicDumper.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/KafkaTopicDumper.java @@ -1,7 +1,6 @@ package com.linkedin.venice; import static com.linkedin.venice.chunking.ChunkKeyValueTransformer.KeyType.WITH_VALUE_CHUNK; -import static com.linkedin.venice.pubsub.PubSubUtil.getPubSubPositionString; import com.github.luben.zstd.Zstd; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; @@ -506,9 +505,7 @@ private void logRecordMetadata(DefaultPubSubMessage record) { producerMetadata.messageTimestamp, producerMetadata.logicalTimestamp, leaderMetadata == null ? "-" : leaderMetadata.hostName, - leaderMetadata == null - ? "-" - : getPubSubPositionString(pubSubPositionDeserializer, leaderMetadata.upstreamPubSubPosition), + leaderMetadata == null ? "-" : pubSubPositionDeserializer.toPosition(leaderMetadata.upstreamPubSubPosition), leaderMetadata == null ? "-" : leaderMetadata.upstreamKafkaClusterId, chunkMetadata); } catch (Exception e) { @@ -659,9 +656,7 @@ static String constructTopicSwitchLog( producerMetadata.messageTimestamp, producerMetadata.logicalTimestamp, leaderMetadata == null ? "-" : leaderMetadata.hostName, - leaderMetadata == null - ? "-" - : getPubSubPositionString(pubSubPositionDeserializer, leaderMetadata.upstreamPubSubPosition), + leaderMetadata == null ? "-" : pubSubPositionDeserializer.toPosition(leaderMetadata.upstreamPubSubPosition), leaderMetadata == null ? "-" : leaderMetadata.upstreamKafkaClusterId); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java index 384a42adeae..a056f74ebfb 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java @@ -11,7 +11,6 @@ import com.linkedin.venice.pubsub.PubSubContext; import com.linkedin.venice.pubsub.PubSubPositionDeserializer; import com.linkedin.venice.pubsub.PubSubTopicRepository; -import com.linkedin.venice.pubsub.PubSubUtil; import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; @@ -128,7 +127,9 @@ public String getPreviousStatusesEntry(CharSequence key) { } public PubSubPosition getCheckpointedLocalVtPosition() { - return PubSubUtil.fromKafkaOffset(this.partitionState.offset); + return deserializePositionWithOffsetFallback( + this.partitionState.lastProcessedVersionTopicPubSubPosition, + this.partitionState.offset); } public void checkpointLocalVtPosition(PubSubPosition vtPosition) { @@ -141,7 +142,9 @@ public void checkpointLocalVtPosition(PubSubPosition vtPosition) { } public PubSubPosition getCheckpointedRemoteVtPosition() { - return PubSubUtil.fromKafkaOffset(this.partitionState.upstreamVersionTopicOffset); + return deserializePositionWithOffsetFallback( + this.partitionState.upstreamVersionTopicPubSubPosition, + this.partitionState.upstreamVersionTopicOffset); } public void checkpointRemoteVtPosition(PubSubPosition remoteVtPosition) { @@ -300,11 +303,12 @@ public PubSubTopic getLeaderTopic(PubSubTopicRepository pubSubTopicRepository) { */ public PubSubPosition getCheckpointedRtPosition(String pubSubBrokerAddress) { Long offset = partitionState.upstreamOffsetMap.get(pubSubBrokerAddress); + ByteBuffer wfBuffer = partitionState.upstreamRealTimeTopicPubSubPositionMap.get(pubSubBrokerAddress); if (offset == null) { // If the offset is not set, return EARLIEST symbolic position. return PubSubSymbolicPosition.EARLIEST; } - return PubSubUtil.fromKafkaOffset(offset); + return deserializePositionWithOffsetFallback(wfBuffer, offset); } public void checkpointRtPosition(String pubSubBrokerAddress, PubSubPosition leaderPosition) { @@ -333,8 +337,9 @@ public void cloneRtPositionCheckpoints(@Nonnull Map chec checkpointUpstreamPositionsReceiver.clear(); for (Map.Entry offsetEntry: partitionState.upstreamOffsetMap.entrySet()) { String pubSubBrokerAddress = offsetEntry.getKey(); + ByteBuffer wfBuffer = partitionState.upstreamRealTimeTopicPubSubPositionMap.get(pubSubBrokerAddress); checkpointUpstreamPositionsReceiver - .put(pubSubBrokerAddress, PubSubUtil.fromKafkaOffset(offsetEntry.getValue())); + .put(pubSubBrokerAddress, deserializePositionWithOffsetFallback(wfBuffer, offsetEntry.getValue())); } } } @@ -523,7 +528,7 @@ PubSubPosition deserializePositionWithOffsetFallback(ByteBuffer wireFormatBytes, final PubSubPosition position = pubSubPositionDeserializer.toPosition(wireFormatBytes); // Guard against regressions: honor the caller-provided minimum offset. - if (offset > 0 && position.getNumericOffset() < offset) { + if (position.getNumericOffset() < offset) { LOGGER.info( "Deserialized position: {} is behind the provided offset: {}. Using offset-based position.", position.getNumericOffset(), diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/PubSubUtil.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/PubSubUtil.java index f40e4612e35..05cd4f1f899 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/PubSubUtil.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/PubSubUtil.java @@ -307,12 +307,4 @@ public static PubSubPosition parsePositionWireFormat( throw new IllegalArgumentException("Invalid base64 encoded bytes in position wire format string", e); } } - - public static String getPubSubPositionString( - PubSubPositionDeserializer pubSubPositionDeserializer, - ByteBuffer pubSubPosition) { - return (pubSubPosition == null || !pubSubPosition.hasRemaining()) - ? "" - : pubSubPositionDeserializer.toPosition(pubSubPosition).toString(); - } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/PubSubUtilTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/PubSubUtilTest.java index 8a94dfafc4a..258d752a841 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/PubSubUtilTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/PubSubUtilTest.java @@ -563,61 +563,4 @@ public void testParsePositionWireFormatInvalidInputs() { () -> PubSubUtil.parsePositionWireFormat("1:invalid_base64!", deserializer)); expectThrows(IllegalArgumentException.class, () -> PubSubUtil.parsePositionWireFormat("1:", deserializer)); } - - @Test - public void testGetPubSubPositionString() { - PubSubPositionDeserializer deserializer = PubSubPositionDeserializer.DEFAULT_DESERIALIZER; - - // Case 1: Null ByteBuffer should return "" - String result = PubSubUtil.getPubSubPositionString(deserializer, null); - assertEquals(result, "", "Null ByteBuffer should return "); - - // Case 2: Empty ByteBuffer (no remaining bytes) should return "" - ByteBuffer emptyBuffer = ByteBuffer.allocate(0); - result = PubSubUtil.getPubSubPositionString(deserializer, emptyBuffer); - assertEquals(result, "", "Empty ByteBuffer should return "); - - // Case 3: ByteBuffer with position fully consumed (no remaining) should return "" - ApacheKafkaOffsetPosition position = ApacheKafkaOffsetPosition.of(100L); - ByteBuffer consumedBuffer = position.toWireFormatBuffer(); - // Consume all bytes by moving position to limit - consumedBuffer.position(consumedBuffer.limit()); - result = PubSubUtil.getPubSubPositionString(deserializer, consumedBuffer); - assertEquals(result, "", "ByteBuffer with no remaining bytes should return "); - - // Case 4: Valid ByteBuffer with ApacheKafkaOffsetPosition data should deserialize and return toString() - ApacheKafkaOffsetPosition validPosition = ApacheKafkaOffsetPosition.of(12345L); - ByteBuffer validBuffer = validPosition.toWireFormatBuffer(); - result = PubSubUtil.getPubSubPositionString(deserializer, validBuffer); - assertEquals( - result, - validPosition.toString(), - "Valid ByteBuffer should deserialize and return position's toString()"); - - // Case 5: Symbolic positions - EARLIEST - ByteBuffer earliestBuffer = PubSubSymbolicPosition.EARLIEST.toWireFormatBuffer(); - result = PubSubUtil.getPubSubPositionString(deserializer, earliestBuffer); - assertEquals( - result, - PubSubSymbolicPosition.EARLIEST.toString(), - "Should correctly deserialize EARLIEST symbolic position"); - - // Case 6: Symbolic positions - LATEST - ByteBuffer latestBuffer = PubSubSymbolicPosition.LATEST.toWireFormatBuffer(); - result = PubSubUtil.getPubSubPositionString(deserializer, latestBuffer); - assertEquals( - result, - PubSubSymbolicPosition.LATEST.toString(), - "Should correctly deserialize LATEST symbolic position"); - - // Case 7: ByteBuffer with partial position data followed by limit - ApacheKafkaOffsetPosition partialPosition = ApacheKafkaOffsetPosition.of(777L); - byte[] wireFormatBytes = partialPosition.toWireFormatBytes(); - // Create a buffer with extra capacity but only set limit to actual data - ByteBuffer bufferWithLimit = ByteBuffer.allocate(wireFormatBytes.length + 10); - bufferWithLimit.put(wireFormatBytes); - bufferWithLimit.flip(); // Set limit to current position and reset position to 0 - result = PubSubUtil.getPubSubPositionString(deserializer, bufferWithLimit); - assertEquals(result, partialPosition.toString(), "Should correctly handle ByteBuffer with limit set"); - } }