[server][da-vinci] Add PubSub health monitor with partition-level pause/resume#2534
misyel wants to merge 11 commits into linkedin:main
Conversation
Pull request overview
This PR introduces a PubSub Health Monitoring system to gracefully handle transient broker outages by pausing affected partitions instead of killing ingestion tasks. When brokers recover, partitions are automatically resumed from their checkpointed positions. The feature is disabled by default (both server.pubsub.health.monitor.enabled and server.pubsub.partition.pause.enabled default to false).
Changes:
- Adds core health monitoring infrastructure with pluggable signal providers and listener notification
- Routes PubSub exceptions to partition-level instead of task-level to enable granular pause/resume
- Implements background probe thread that tests unhealthy brokers and triggers automatic recovery
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| VeniceServer.java | Initializes PubSubHealthMonitor and wires it to KafkaStoreIngestionService |
| PubSubHealthStatus.java | Enum defining HEALTHY/UNHEALTHY states |
| PubSubHealthSignalProvider.java | Interface for pluggable health signal detection strategies |
| PubSubHealthChangeListener.java | Callback interface for health state transitions |
| PubSubHealthCategory.java | Enum for independent health tracking (BROKER, METADATA_SERVICE) |
| ConfigKeys.java | New config keys for monitor enable, partition pause, and probe interval |
| PubSubHealthCategoryTest.java | Test for metrics dimension interface implementation |
| VeniceMetricsDimensions.java | Adds VENICE_PUBSUB_HEALTH_CATEGORY dimension |
| StorePartitionDataReceiverTest.java | Tests PubSub exception routing to partition-level |
| PubSubHealthMonitorTest.java | Comprehensive unit tests for health monitor lifecycle |
| ExceptionBasedHealthSignalProviderTest.java | Tests exception-based health signal provider |
| PubSubHealthOtelMetricEntity.java | OTel metric definitions (unhealthy count, probe success/failure, transitions) |
| PubSubHealthMonitorStats.java | OTel stats wrapper for health monitor metrics |
| StorePartitionDataReceiver.java | Routes PubSub exceptions to partition-level instead of task-level |
| StoreIngestionTask.java | Adds partition pause on PubSub exceptions and resume logic |
| PubSubHealthMonitor.java | Core health monitor with probe thread and listener notification |
| LeaderFollowerStoreIngestionTask.java | Override seekToCheckpointAndResume to delegate to existing resubscribe logic |
| KafkaStoreIngestionService.java | Implements PubSubHealthChangeListener to resume partitions on recovery |
| ExceptionBasedHealthSignalProvider.java | First signal provider implementation using exception-based detection |
| VeniceServerConfig.java | Config accessors for new health monitor settings |
Comments suppressed due to low confidence (1)
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java:5579
- The paused partitions set is cleared before attempting to resume any partitions. If resuming a partition fails with an exception other than InterruptedException (e.g., if resubscribeAsLeader/resubscribeAsFollower throws), those partitions are lost from the paused set and won't be automatically retried on subsequent health status changes.
Consider clearing individual partitions from the set only after they successfully resume, or re-adding failed partitions back to the set so they can be retried on the next recovery notification.
// Snapshot and clear before resubscribing
Set<Integer> partitionsToResume = new HashSet<>(pubSubHealthPausedPartitions);
pubSubHealthPausedPartitions.clear();
LOGGER
.info("Resuming {} partitions paused for PubSub health on broker {}", partitionsToResume.size(), pubSubAddress);
// For each partition, unsubscribe from its consuming topic(s) at the consumer level,
// drain the store buffer, and re-subscribe at the checkpointed position.
// This preserves the PCS and leader state.
for (int partitionId: partitionsToResume) {
PartitionConsumptionState pcs = partitionConsumptionStateMap.get(partitionId);
if (pcs == null || !pcs.isSubscribed()) {
continue;
}
try {
seekToCheckpointAndResume(pcs);
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while resuming partition {} for PubSub health", partitionId, e);
Thread.currentThread().interrupt();
return;
}
}
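The suggested fix can be sketched in isolation. A minimal, hedged illustration (the class, the resumeAll helper, and the IntPredicate stand-in are hypothetical; the real code operates on pubSubHealthPausedPartitions and seekToCheckpointAndResume): remove a partition from the paused set only after it resumes successfully, so partitions whose resume failed stay eligible for the next recovery notification.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntPredicate;

public class ResumeSketch {
  // Thread-safe paused set, mirroring pubSubHealthPausedPartitions in the PR.
  static final Set<Integer> pausedPartitions = ConcurrentHashMap.newKeySet();

  /**
   * Resume each paused partition; remove it from the set only on success so
   * that failed partitions are retried on the next HEALTHY notification.
   */
  static void resumeAll(IntPredicate resume) {
    for (Integer partitionId: Set.copyOf(pausedPartitions)) {
      if (resume.test(partitionId)) {
        pausedPartitions.remove(partitionId); // success: forget this partition
      }
      // failure: leave the partition in the set for a later retry
    }
  }

  public static void main(String[] args) {
    pausedPartitions.add(1);
    pausedPartitions.add(2);
    // Simulate: partition 1 resumes successfully, partition 2 fails.
    resumeAll(p -> p == 1);
    System.out.println(pausedPartitions); // partition 2 stays paused
  }
}
```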
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (4)
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorePartitionDataReceiver.java:133
- PubSub exceptions are always routed to setIngestionException here, regardless of whether server.pubsub.partition.pause.enabled / the health monitor are enabled. That changes behavior even when the feature flags are off (previously these exceptions would propagate via setLastConsumerException and error the whole task). If the intent is "no behavior change when configs are off", this routing should be gated by config or otherwise preserve the prior task-level failure semantics when pause is disabled.
if (ExceptionUtils.recursiveClassEquals(e, PubSubClientException.class, PubSubClientRetriableException.class)) {
// Downgrade from task-level kill to partition-level exception so that only the affected
// partition is paused (when PubSub health-based pause is enabled), instead of killing the
// entire ingestion task.
LOGGER.warn(
"Received PubSub exception {} for topic: {}. Propagating as partition-level exception.",
e.getClass().getSimpleName(),
topicPartition);
storeIngestionTask.setIngestionException(topicPartition.getPartitionNumber(), e);
} else {
LOGGER.error(
"Received {} while StoreIngestionTask is processing the polled consumer record for topic: {}. Will propagate via setLastConsumerException(e).",
e.getClass().getSimpleName(),
topicPartition);
storeIngestionTask.setLastConsumerException(e);
}
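To illustrate the gating this comment asks for, a small self-contained sketch (the route helper and its string return values are hypothetical; setIngestionException / setLastConsumerException are the real methods referenced above): the partition-level path is taken only when pause is enabled, so legacy task-level semantics are preserved with the flag off.

```java
public class ExceptionRoutingSketch {
  /**
   * Decide routing for a consumer exception: partition-level only when both
   * the exception is a PubSub exception and the pause feature is enabled,
   * otherwise keep the legacy task-level path. Returning the chosen route as
   * a string keeps the decision easy to unit-test.
   */
  static String route(boolean isPubSubException, boolean pauseEnabled) {
    if (isPubSubException && pauseEnabled) {
      return "partition-level"; // would call setIngestionException(partition, e)
    }
    return "task-level"; // would call setLastConsumerException(e)
  }

  public static void main(String[] args) {
    // With the flag off, PubSub exceptions keep the original task-level semantics.
    System.out.println(route(true, false));
    System.out.println(route(true, true));
  }
}
```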
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubHealthMonitor.java:268
- On probe success the monitor unconditionally sets the target status to HEALTHY and notifies listeners, without re-evaluating isUnhealthy(...) across all registered PubSubHealthSignalProviders. This can mark a target HEALTHY even if another provider still considers it unhealthy, which contradicts the stated "aggregate signals from all providers" behavior. Consider recomputing overall health from providers before transitioning to HEALTHY.
// Notify all providers that the probe succeeded
for (PubSubHealthSignalProvider provider: signalProviders) {
provider.onProbeSuccess(target.address, target.category);
}
// Transition to HEALTHY
Map<PubSubHealthCategory, PubSubHealthStatus> categoryStatuses = healthStatuses.get(target.address);
if (categoryStatuses != null) {
categoryStatuses.put(target.category, PubSubHealthStatus.HEALTHY);
}
if (stats != null) {
stats.recordProbeSuccess(target.category);
stats.recordStateTransition(target.category);
}
notifyListeners(target.address, target.category, PubSubHealthStatus.HEALTHY);
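The recompute-before-HEALTHY suggestion can be sketched with a minimal stand-in for the provider interface (SignalProvider and healthyAfterProbe are hypothetical names; the real interface is PubSubHealthSignalProvider):

```java
import java.util.List;

public class AggregateHealthSketch {
  /** Minimal stand-in for PubSubHealthSignalProvider's unhealthy check. */
  interface SignalProvider {
    boolean isUnhealthy(String address);
  }

  /**
   * After a probe success, report HEALTHY only when no registered provider
   * still considers the target unhealthy.
   */
  static boolean healthyAfterProbe(List<SignalProvider> providers, String address) {
    for (SignalProvider provider: providers) {
      if (provider.isUnhealthy(address)) {
        return false; // another signal still flags the target
      }
    }
    return true;
  }

  public static void main(String[] args) {
    SignalProvider cleared = addr -> false;
    SignalProvider stillBad = addr -> true;
    System.out.println(healthyAfterProbe(List.of(cleared, stillBad), "broker-1"));
    System.out.println(healthyAfterProbe(List.of(cleared), "broker-1"));
  }
}
```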
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubHealthMonitor.java:170
- The Javadoc for getUnhealthyCount says it returns the number of unhealthy targets "across all categories", but the method takes a PubSubHealthCategory and only counts for that category. Please fix the comment to match the behavior to avoid confusion about metric semantics.
/**
* @return the number of currently unhealthy targets across all categories
*/
public int getUnhealthyCount(PubSubHealthCategory category) {
int count = 0;
for (Map<PubSubHealthCategory, PubSubHealthStatus> categoryStatuses: healthStatuses.values()) {
if (categoryStatuses.getOrDefault(category, PubSubHealthStatus.HEALTHY) == PubSubHealthStatus.UNHEALTHY) {
count++;
}
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubHealthMonitor.java:91
probeIntervalSeconds is taken directly from config and passed to scheduleWithFixedDelay. If it's set to 0 or negative, the scheduler will throw and can prevent the service/server from starting cleanly. Consider validating/clamping the value (or disabling probing) when the configured interval is non-positive.
LOGGER.info("Starting PubSub health monitor with probe interval {}s", probeIntervalSeconds);
probeExecutor
.scheduleWithFixedDelay(this::runRecoveryProbe, probeIntervalSeconds, probeIntervalSeconds, TimeUnit.SECONDS);
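A one-line clamp along the lines suggested here, as a self-contained sketch (MIN_PROBE_INTERVAL_SECONDS and safeInterval are hypothetical names):

```java
public class ProbeIntervalSketch {
  static final long MIN_PROBE_INTERVAL_SECONDS = 1;

  /**
   * Coerce a non-positive configured interval to a safe minimum before
   * passing it to scheduleWithFixedDelay, which rejects non-positive periods.
   */
  static long safeInterval(long configuredSeconds) {
    return Math.max(MIN_PROBE_INTERVAL_SECONDS, configuredSeconds);
  }

  public static void main(String[] args) {
    System.out.println(safeInterval(0));  // clamped to 1
    System.out.println(safeInterval(30)); // left unchanged
  }
}
```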
Pull request overview
Copilot reviewed 23 out of 23 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (2)
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubHealthMonitor.java:91
scheduleWithFixedDelay requires a positive delay/period. If server.pubsub.health.probe.interval.seconds is configured as 0 or negative, startInner() will throw and may prevent the server from starting. Consider validating probeIntervalSeconds > 0 (or coercing to a safe minimum) before scheduling.
public boolean startInner() {
if (!enabled) {
LOGGER.info("PubSub health monitor is disabled");
return true;
}
LOGGER.info("Starting PubSub health monitor with probe interval {}s", probeIntervalSeconds);
probeExecutor
.scheduleWithFixedDelay(this::runRecoveryProbe, probeIntervalSeconds, probeIntervalSeconds, TimeUnit.SECONDS);
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java:1472
setPubSubHealthMonitor registers this as a listener, but there's no corresponding unregister in stopInner(), and repeated calls would register duplicates. Consider making registration idempotent and unregistering the listener during shutdown to avoid duplicate callbacks and lingering references if services are restarted in-process.
public void setPubSubHealthMonitor(PubSubHealthMonitor pubSubHealthMonitor) {
this.pubSubHealthMonitor = pubSubHealthMonitor;
pubSubHealthMonitor.registerListener(this);
}
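A hedged sketch of the idempotent-registration idea (the listener type is erased to Object for brevity; all names here are hypothetical): a Set makes re-registration a no-op, and an explicit unregister can be called from stopInner().

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ListenerRegistrySketch {
  // A Set makes registration idempotent: re-adding the same listener is a no-op.
  private final Set<Object> listeners = ConcurrentHashMap.newKeySet();

  boolean registerListener(Object listener) {
    return listeners.add(listener);
  }

  boolean unregisterListener(Object listener) {
    // Call during shutdown to avoid duplicate callbacks and lingering references.
    return listeners.remove(listener);
  }

  int listenerCount() {
    return listeners.size();
  }

  public static void main(String[] args) {
    ListenerRegistrySketch monitor = new ListenerRegistrySketch();
    Object service = new Object();
    monitor.registerListener(service);
    monitor.registerListener(service); // duplicate: ignored
    System.out.println(monitor.listenerCount());
    monitor.unregisterListener(service); // on shutdown
    System.out.println(monitor.listenerCount());
  }
}
```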
Force-pushed from e9d5b15 to 139b596
Pull request overview
Copilot reviewed 24 out of 24 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (2)
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubHealthMonitor.java:251
runRecoveryProbe logs an INFO line every probe cycle whenever there are unhealthy targets ("Running recovery probe for {} unhealthy targets"). During extended outages this can generate steady log spam on every server. Consider downgrading this to DEBUG, adding rate limiting, or only logging on transition/count changes to align with the stated "log on state transitions" behavior.
if (targets.isEmpty()) {
return;
}
LOGGER.info("Running recovery probe for {} unhealthy targets", targets.size());
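One way to log only when the unhealthy-target count changes, sketched independently of the monitor (shouldLog is a hypothetical helper):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ProbeLogSketch {
  // Last count that was logged; -1 means nothing logged yet.
  private final AtomicInteger lastLoggedCount = new AtomicInteger(-1);

  /** Return true only when the unhealthy count changed since the last log line. */
  boolean shouldLog(int unhealthyCount) {
    int previous = lastLoggedCount.getAndSet(unhealthyCount);
    return unhealthyCount != previous;
  }

  public static void main(String[] args) {
    ProbeLogSketch sketch = new ProbeLogSketch();
    System.out.println(sketch.shouldLog(3)); // first observation: log
    System.out.println(sketch.shouldLog(3)); // unchanged: stay quiet
    System.out.println(sketch.shouldLog(1)); // count changed: log again
  }
}
```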
clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java:874
pubSubHealthProbeIntervalSeconds is used directly as the scheduleWithFixedDelay interval. If it's configured as 0, probes will run in a tight loop; if negative, scheduling will throw and disable probing entirely. Consider validating/clamping this to a safe minimum (e.g., >= 1) when parsing config (or in PubSubHealthMonitor.startInner()), and failing fast with a clear error if invalid.
pubSubHealthMonitorEnabled = serverProperties.getBoolean(SERVER_PUBSUB_HEALTH_MONITOR_ENABLED, false);
pubSubPartitionPauseEnabled =
pubSubHealthMonitorEnabled && serverProperties.getBoolean(SERVER_PUBSUB_PARTITION_PAUSE_ENABLED, false);
pubSubHealthProbeIntervalSeconds = serverProperties.getInt(SERVER_PUBSUB_HEALTH_PROBE_INTERVAL_SECONDS, 30);
pubSubHealthProbeTopic = serverProperties.getString(SERVER_PUBSUB_HEALTH_PROBE_TOPIC, "");
sushantmane left a comment
Review Summary
Overall strong architecture — the pluggable signal provider design, async listener notification, and the two-flag gating (monitor + pause) are well thought out. However, there are several issues ranging from a critical design gap for the Active-Active use case to concurrency concerns and behavior changes that leak outside the feature flag.
See inline comments for specific, actionable items.
 */
protected void reportPubSubHealthException() {
  if (pubSubHealthMonitor != null) {
    pubSubHealthMonitor.reportPubSubException(localKafkaServer, PubSubHealthCategory.BROKER);
Critical — breaks the A/A use case. This always reports localKafkaServer, but in Active-Active ingestion, leaders consume from remote-region RT topics. When a remote region's Kafka goes down:
- Leader gets a PubSub exception from consuming the remote RT
- Exception is reported against localKafkaServer (which is healthy)
- Probe checks local Kafka → healthy → immediate HEALTHY transition
- resumePartitionsForPubSubHealth is called → leader re-subscribes to the remote RT → fails immediately
- Another PubSub exception → pause → probe → resume → fail → infinite rapid pause-resume loop
The pubSubAddress reported should be the actual consumption source. For leaders, this is the remote Kafka address for the topic that threw the exception. You'll need to thread the source broker URL from the exception site (or from the PCS/OffsetRecord, which knows the leader's consumption source) down to this method.
resumePartitionsForPubSubHealth also filters by localKafkaServer — this needs the same fix to match on the actual source address.
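The source-address threading can be illustrated with a toy lookup (resolveReportAddress and the partition-to-source map are hypothetical; in the PR the source would come from the PCS/OffsetRecord): report against the partition's actual consumption source, falling back to the local broker only when none is recorded.

```java
import java.util.Map;

public class SourceAddressSketch {
  /**
   * Resolve the broker address to report for a failed consume: the partition's
   * actual consumption source (e.g., a remote-region RT broker for an A/A
   * leader), falling back to the local broker when no source is recorded.
   */
  static String resolveReportAddress(
      Map<Integer, String> partitionToSource,
      int partitionId,
      String localKafkaServer) {
    return partitionToSource.getOrDefault(partitionId, localKafkaServer);
  }

  public static void main(String[] args) {
    // Leader of partition 7 consumes from a remote region's RT topic.
    Map<Integer, String> sources = Map.of(7, "remote-kafka:9092");
    System.out.println(resolveReportAddress(sources, 7, "local-kafka:9092"));
    System.out.println(resolveReportAddress(sources, 3, "local-kafka:9092"));
  }
}
```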
Pull request overview
Copilot reviewed 23 out of 23 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (5)
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPubSubHealthPartitionPauseResumeMultiRegion.java:339
- Several assertions call dc0Client/dc1Client.get(key).get() without a timeout inside waitForNonDeterministicAssertion loops. If a request stalls, Future#get() can block indefinitely and prevent the retry/timeout logic from working, making the test hang/flaky. Prefer a bounded get (e.g., get with timeout) or a non-blocking polling pattern so the wait loop can time out cleanly.
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, true, () -> {
for (int i = 0; i < 10; i++) {
Object value = dc1Client.get("data-" + i).get();
assertNotNull(value, "Record 'data-" + i + "' should be readable from dc-1");
assertEquals(value.toString(), "stream_" + i);
}
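A bounded-get sketch of the suggested pattern (boundedGet is a hypothetical helper; the 5-second timeout is illustrative): a stalled request throws TimeoutException instead of blocking the wait loop's retry/timeout logic forever.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BoundedGetSketch {
  /**
   * Read a client response with a bounded wait so a stalled request throws
   * TimeoutException instead of hanging the assertion/retry loop.
   */
  static String boundedGet(Future<String> response) throws Exception {
    return response.get(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> response = executor.submit(() -> "stream_1");
    System.out.println(boundedGet(response));
    executor.shutdown();
  }
}
```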
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PubSubHealthMonitor.java:241
- If probeTopic is not configured, runRecoveryProbe() just logs at DEBUG and returns. With partition pause enabled this misconfiguration can silently leave partitions paused forever. Consider emitting a WARN (ideally rate-limited / only when there are unhealthy targets) when probeTopic is null so operators get a clear signal that auto-resume is disabled.
private void runRecoveryProbe() {
PubSubTopic topic = probeTopic;
if (topic == null) {
LOGGER.debug("No probe topic configured, skipping recovery probe");
return;
}
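The suggested WARN-vs-DEBUG decision can be sketched as a pure function (logLevelForSkippedProbe is a hypothetical helper): warn only when the misconfiguration actually matters, i.e., there are unhealthy targets that can never auto-resume.

```java
public class ProbeTopicCheckSketch {
  /**
   * Decide the log level for a skipped recovery probe: WARN when unhealthy
   * targets exist but no probe topic is configured (auto-resume is silently
   * disabled), DEBUG otherwise.
   */
  static String logLevelForSkippedProbe(String probeTopic, int unhealthyTargets) {
    if (probeTopic == null && unhealthyTargets > 0) {
      return "WARN";
    }
    return "DEBUG";
  }

  public static void main(String[] args) {
    System.out.println(logLevelForSkippedProbe(null, 2));
    System.out.println(logLevelForSkippedProbe(null, 0));
  }
}
```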
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java:1480
- KafkaStoreIngestionService registers itself as a PubSubHealthMonitor listener here, but there’s no corresponding unregister during shutdown. Since VeniceServer stops KafkaStoreIngestionService before PubSubHealthMonitor (services list stops in reverse order), the monitor can still invoke callbacks while KSIS is stopping/closed. Consider unregistering this listener in KafkaStoreIngestionService.stopInner() (or ensuring the monitor is stopped before KSIS) to avoid shutdown races/leaks.
public void setPubSubHealthMonitor(PubSubHealthMonitor pubSubHealthMonitor) {
this.pubSubHealthMonitor = pubSubHealthMonitor;
pubSubHealthMonitor.registerListener(this);
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java:5654
- There is an orphaned Javadoc block (lines 5641-5653) that is not attached to any declaration (it’s immediately followed by another Javadoc for resumePartitionsForPubSubHealth). This can trigger Javadoc/checkstyle failures and makes the intent unclear. Either merge the two comment blocks into the method Javadoc or remove the unused block.
ReadOnlyStoreRepository getStoreRepository() {
return storeRepository;
}
String getLocalKafkaServer() {
return localKafkaServer;
}
ConcurrentMap<Integer, Long> getPartitionToPreviousResubscribeTimeMap() {
return partitionToPreviousResubscribeTimeMap;
}
PriorityBlockingQueue<Integer> getResubscribeRequestQueue() {
return resubscribeRequestQueue;
internal/venice-common/src/main/java/com/linkedin/venice/pubsub/PubSubHealthSignalProvider.java:6
- The Javadoc references {@link PubSubHealthMonitor}, but that type is not in this package/module, so the link won’t resolve during Javadoc generation. Use a fully-qualified link (e.g., com.linkedin.davinci.kafka.consumer.PubSubHealthMonitor) or remove the link to avoid doclint/Javadoc failures.
/**
* A pluggable provider of health signals for PubSub targets. The {@link PubSubHealthMonitor}
* aggregates signals from all registered providers to determine overall broker health.
*
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 5 comments.
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 3 comments.
unsubscribeFromTopic(versionTopic, pcs);
reportPubSubHealthException(pubSubAddress);
}

/**
pausePartitionForPubSubHealth() calls unsubscribeFromTopic(), which uses consumerUnSubscribeForStateTransition() and therefore waits up to SharedKafkaConsumer.STATE_TRANSITION_MAX_WAIT_MS (30 minutes) for a poll after unsubscribe. For a PubSub outage pause/resume path this can unnecessarily block the SIT thread and delay pausing/resuming across partitions. Consider using AggKafkaConsumerService.unsubscribeConsumerFor(..., DEFAULT_MAX_WAIT_MS) / a dedicated short-timeout unsubscribe for health pauses, while still keeping the long wait only for actual Helix state transitions.
Suggested change:

Before:
unsubscribeFromTopic(versionTopic, pcs);
reportPubSubHealthException(pubSubAddress);
}

/**

After:
unsubscribeFromTopicForPubSubHealth(versionTopic, pcs);
reportPubSubHealthException(pubSubAddress);
}

/**
 * Unsubscribe this partition's version topic for PubSub health pauses using a short timeout.
 *
 * This avoids the long {@code STATE_TRANSITION_MAX_WAIT_MS} used for Helix state transitions,
 * preventing the Store Ingestion Task thread from being blocked for up to 30 minutes when
 * pausing partitions due to PubSub outages.
 */
protected void unsubscribeFromTopicForPubSubHealth(PubSubTopic topic, PartitionConsumptionState pcs) {
  if (aggKafkaConsumerService == null) {
    return;
  }
  PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, pcs.getPartition());
  // Use a shorter default max wait to avoid long blocking on the SIT thread for health pauses.
  aggKafkaConsumerService.unsubscribeConsumerFor(
      topicPartition,
      com.linkedin.davinci.kafka.consumer.AggKafkaConsumerService.DEFAULT_MAX_WAIT_MS);
}

/**
// Determine which topic to unsubscribe from based on where the exception originated.
// If the exception came from the local broker (produce path) and the leader is consuming
// from a remote RT, we should unsubscribe from VT (the local produce target), not the
// remote RT. If the exception came from the consume path (remote RT), unsubscribe from
// the leader topic.
pubSubHealthPausedPartitions.put(partitionId, pubSubAddress);
if (isLeader) {
  PubSubTopic leaderTopic = pcs.getOffsetRecord().getLeaderTopic(getPubSubTopicRepository());
  boolean exceptionFromLocalProducePath = localKafkaServer.equals(pubSubAddress) && pcs.consumeRemotely();
  if (exceptionFromLocalProducePath) {
    // Exception on the local VT produce path — unsubscribe from VT, not the remote RT
    unsubscribeFromTopic(versionTopic, pcs);
  } else {
    // Exception on the consume path — unsubscribe from the leader topic (remote RT or local RT)
    unsubscribeFromTopic(leaderTopic, pcs);
  }
This override also calls unsubscribeFromTopic(...), which routes to consumerUnSubscribeForStateTransition() (30-minute max wait) via StoreIngestionTask. For PubSub health pauses, a long state-transition wait can block the ingestion thread during outages. Consider using a shorter unsubscribe path (DEFAULT_MAX_WAIT_MS or similar) for health pauses, reserving the long wait for Helix-triggered transitions only.
// When pause is disabled, preserve original behavior: task-level only
verify(mockSIT).setLastConsumerException(eq(pubSubException));
verify(mockSIT, never()).setIngestionException(anyInt(), any());
}
These assertions only verify the 2-arg overload of setIngestionException(int, Exception). StorePartitionDataReceiver now calls the 3-arg overload (partition, exception, pubSubSourceUrl), so the test could still pass even if partition-level exceptions are incorrectly emitted when pause is disabled. Add a never() verification for the 3-arg overload as well (or use Mockito to verify no interactions with any setIngestionException overload).
Problem Statement

When a PubSub broker experiences a transient outage, the server's ingestion pipeline either kills the entire StoreIngestionTask (affecting all partitions in that task) or sets partitions to Helix ERROR state. Both paths cause unnecessary data re-ingestion, increased recovery time, and operational overhead. There is no mechanism to gracefully degrade and automatically recover when the broker comes back.

Solution

This PR introduces a PubSub Health Monitoring system that detects broker outages and automatically pauses affected partitions instead of killing ingestion tasks. When the broker recovers, paused partitions are automatically resumed from their last checkpointed position, so no manual intervention is required.
Architecture

- PubSubHealthMonitor — Server-level singleton that tracks per-broker health status across two categories (BROKER, METADATA_SERVICE). Runs a background probe thread that periodically tests unhealthy brokers by sending metadata requests via TopicManager.
- PubSubHealthSignalProvider — Pluggable interface for health signal detection. First implementation: ExceptionBasedHealthSignalProvider (any PubSub exception = unhealthy, cleared on probe success). Extensible for future detection strategies.
- PubSubHealthChangeListener — Callback interface for health state transitions. KafkaStoreIngestionService implements this to drive partition resume on recovery.

End-to-end flow
Pause (on outage):
- StorePartitionDataReceiver routes PubSub exceptions to partition-level (setIngestionException) instead of task-level (setLastConsumerException)
- StoreIngestionTask.processIngestionException() detects the PubSub exception → adds the partition to the pubSubHealthPausedPartitions set instead of unsubscribing or setting Helix ERROR
- PubSubHealthMonitor → marks the broker UNHEALTHY → starts background probing

Resume (on recovery):
- KafkaStoreIngestionService is notified of the HEALTHY transition → calls resumePartitionsForPubSubHealth()
- Each paused partition is resumed via seekToCheckpointAndResume() (uses existing resubscribeAsLeader/resubscribeAsFollower in LFSI)

New config keys (all disabled by default)
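For reference, a hedged sketch of how the new keys might be set once the feature is ready to enable (key names and defaults are taken from this PR; the exact property-file wiring is an assumption):

```
# Enable the health monitor (tracking + probing)
server.pubsub.health.monitor.enabled=true
# Additionally allow partition-level pause/resume on broker outages
server.pubsub.partition.pause.enabled=true
# Probe unhealthy brokers every 30 seconds (the default)
server.pubsub.health.probe.interval.seconds=30
```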
| Config | Default |
|---|---|
| server.pubsub.health.monitor.enabled | false |
| server.pubsub.partition.pause.enabled | false |
| server.pubsub.health.probe.interval.seconds | 30 |

OTel metrics
- pubsub.health.unhealthy_count — async gauge of currently unhealthy targets per category
- pubsub.health.probe.success_count / pubsub.health.probe.failure_count — probe outcome counters
- pubsub.health.state_transition_count — health state transition counter
- pubsub.health.paused_partition_count — async gauge of the number of paused partitions per cluster

Code changes
- All new behavior is gated behind config flags that default to false (feature fully disabled).
- Uses RedundantExceptionFilter in existing paths; new log lines are gated by state transitions (only log on HEALTHY→UNHEALTHY or vice versa).

Concurrency-Specific Checks
- pubSubHealthPausedPartitions uses ConcurrentHashMap.newKeySet() — thread-safe set
- PubSubHealthMonitor internal state uses ConcurrentHashMap — thread-safe

How was this PR tested?
- New unit tests added.
- New integration tests added.
- Modified or extended existing tests.
- Verified backward compatibility — feature disabled by default, no behavior change when configs are off.

- PubSubHealthMonitorTest — full lifecycle: exception → UNHEALTHY → probe → HEALTHY → listener notification; multi-provider aggregation; concurrent access
- ExceptionBasedHealthSignalProviderTest — signal provider state machine
- StorePartitionDataReceiverTest — PubSub exceptions (including retriable) routed to partition-level; non-PubSub exceptions routed to task-level; InterruptedException rethrown
- PubSubHealthCategoryTest — metric dimension interface

Does this PR introduce any user-facing or breaking changes?