diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminMetadata.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminMetadata.java index 7e5a1274f68..c4624579e58 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminMetadata.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminMetadata.java @@ -178,12 +178,6 @@ private PubSubPosition getPubSubPosition(PubSubPosition position, Long offset) { } else { return PubSubSymbolicPosition.EARLIEST; } - } else if (offset != null && offset > position.getNumericOffset()) { - LOGGER.warn( - "Offset {} is greater than position {}. Resetting position to offset.", - offset, - position.getNumericOffset()); - return ApacheKafkaOffsetPosition.of(offset); } else { return position; } @@ -191,20 +185,10 @@ private PubSubPosition getPubSubPosition(PubSubPosition position, Long offset) { public void setPubSubPosition(PubSubPosition pubSubPosition) { this.position = pubSubPosition; - if (pubSubPosition != null) { - this.offset = pubSubPosition.getNumericOffset(); - } else { - this.offset = UNDEFINED_VALUE; - } } public void setUpstreamPubSubPosition(PubSubPosition upstreamPubPosition) { this.upstreamPosition = upstreamPubPosition; - if (upstreamPubPosition != null) { - this.upstreamOffset = upstreamPubPosition.getNumericOffset(); - } else { - this.upstreamOffset = UNDEFINED_VALUE; - } } @Override diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/AdminConsumptionStats.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/AdminConsumptionStats.java index 06e2ce66e35..9e80eb667cb 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/AdminConsumptionStats.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/AdminConsumptionStats.java @@ -74,12 +74,6 @@ public AdminConsumptionStats(MetricsRepository metricsRepository, String name) { adminConsumeFailCountSensor = registerSensor("failed_admin_messages", new Count()); adminConsumeFailRetriableMessageCountSensor = registerSensor("failed_retriable_admin_messages", new Count()); adminTopicDIVErrorReportCountSensor = registerSensor("admin_message_div_error_report_count", new Count()); - registerSensor( - new AsyncGauge( - (ignored, ignored2) -> adminConsumptionFailedPosition == null - ? 0L - : adminConsumptionFailedPosition.getNumericOffset(), - "failed_admin_message_offset")); adminConsumptionCycleDurationMsSensor = registerSensor("admin_consumption_cycle_duration_ms", new Avg(), new Min(), new Max()); registerSensor( diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/TestAdminMetadata.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/TestAdminMetadata.java index 4ed7e361626..c870a4d181b 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/TestAdminMetadata.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/TestAdminMetadata.java @@ -50,6 +50,7 @@ public void testSerializeAndDeserializeAdminMetadata() throws IOException { // Verify JSON is human-readable String jsonString = new String(jsonBytes); + System.out.println("Serialized AdminMetadata JSON: " + jsonString); assertTrue(jsonString.contains("\"executionId\" : 123")); assertTrue(jsonString.contains("\"offset\" : 12345")); assertTrue(jsonString.contains("\"upstreamOffset\" : 67890"));