Conversation
📝 Walkthrough

Replace node-based XDS resource management with a reactive SnapshotStream model, widen the SnapshotWatcher error parameter to Throwable, add stream primitives and a load-balancer factory/pool, refactor cluster/listener/bootstrap flows, delete legacy ResourceNode classes, and update many client APIs and tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant RefCounted as RefCountedStream
    participant Adapter as ResourceNodeAdapter
    participant Config as ConfigSource
    participant Factory as LoadBalancerFactory
    Client->>RefCounted: subscribe(watcher)
    RefCounted->>Adapter: onStart(watcher)
    Adapter->>Config: resolve(resource/config)
    Config-->>Adapter: resource or error
    Adapter->>RefCounted: emit(snapshot or error)
    RefCounted-->>Client: onUpdate(snapshot, null) or onUpdate(null, throwable)
    RefCounted->>Factory: register(snapshot -> XdsLoadBalancer)
    Factory-->>RefCounted: XdsLoadBalancer updates
    RefCounted-->>Client: updated snapshot with load balancer
```

```mermaid
sequenceDiagram
    participant Bootstrap as Bootstrap
    participant ClusterStream as ClusterStream
    participant EndpointStream as EndpointStream
    participant LoadBalancerStream as LoadBalancerStream
    participant Client as Client
    Bootstrap->>ClusterStream: start()
    ClusterStream->>EndpointStream: start() (derive EndpointSnapshot)
    EndpointStream-->>ClusterStream: EndpointSnapshot
    ClusterStream->>LoadBalancerStream: create/register(load balancer)
    LoadBalancerStream-->>ClusterStream: XdsLoadBalancer
    ClusterStream-->>Client: ClusterSnapshot (resource + endpoint + lb)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsEndpointGroup.java (1)

118-147: Make the `endpoints` field volatile to ensure thread visibility after lock removal.

The `endpoints` field is updated without synchronization in `onUpdate()` (line 145) and `accept()` (line 228), but read by other threads via `endpoints()` (line 169) and `latestValue()` (line 159). The `stateLock` that previously synchronized these accesses was removed; without `volatile`, readers can observe stale endpoint lists. Make `endpoints` volatile to preserve happens-before visibility guarantees.

🔧 Suggested fix

```diff
-    private List<Endpoint> endpoints = UNINITIALIZED_ENDPOINTS;
+    private volatile List<Endpoint> endpoints = UNINITIALIZED_ENDPOINTS;
```

xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsEndpointUtil.java (1)

81-107: Replace `assert` with a defensive null check for clarity.

While the code flow ensures `endpointSnapshot` is never null at this call site (it's only constructed after an `Optional.isPresent()` check in `ClusterStream.java`), using `assert` for nullability checks in production code is unreliable: assertions are disabled by default at runtime. Replace with `Objects.requireNonNull()` for explicit, always-active validation.

🛡️ Suggested defensive guard

```diff
 static EndpointGroup convertEndpointGroup(ClusterXdsResource clusterXdsResource,
                                           EndpointSnapshot endpointSnapshot) {
+    java.util.Objects.requireNonNull(endpointSnapshot, "endpointSnapshot");
     final Cluster cluster = clusterXdsResource.resource();
     final EndpointGroup endpointGroup;
@@
 private static EndpointGroup staticEndpointGroup(EndpointSnapshot endpointSnapshot) {
-    assert endpointSnapshot != null;
     final List<Endpoint> endpoints = convertLoadAssignment(endpointSnapshot.xdsResource().resource());
     return EndpointGroup.of(endpoints);
 }
```

Also applies to: 117-125
🤖 Fix all issues with AI agents
In
`@it/xds-client/src/test/java/com/linecorp/armeria/xds/it/LoadBalancerReloadTest.java`:
- Around line 22-27: The test uses ArrayDeque for watcher updates which is not
thread-safe; replace java.util.ArrayDeque/Deque usage with a thread-safe
implementation such as java.util.concurrent.ConcurrentLinkedDeque and
instantiate new ConcurrentLinkedDeque<>() wherever new ArrayDeque<>() is used
(e.g., the watcher update collection in LoadBalancerReloadTest and the other
occurrence referenced around lines 227-234) so the watcher callback can safely
modify/read the deque from different threads.
In `@xds/src/main/java/com/linecorp/armeria/xds/BootstrapClusters.java`:
- Around line 67-78: The onUpdate method currently returns silently when
snapshot==null and t!=null, leaving pending CompletableFutures in initialFutures
hanging; update onUpdate (in BootstrapClusters) to detect when t is non-null and
complete the relevant pending future(s) exceptionally (e.g.,
completeExceptionally with t) instead of returning, for example by completing
the specific future for name when available or, if the resource name is unknown,
iterating initialFutures and completeExceptionally each entry so
snapshotFuture(clusterName) callers (e.g., GrpcServicesPreprocessor) will
receive the error; ensure you reference onUpdate, initialFutures,
ClusterSnapshot, and snapshotFuture in your change and include logging of t for
diagnostics.
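A hedged sketch of that error path follows: when `onUpdate` sees a non-null `Throwable`, every pending future is completed exceptionally so `snapshotFuture()` callers fail fast instead of hanging. The `String`-keyed futures here are simplified stand-ins for the real `BootstrapClusters`/`ClusterSnapshot` types.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: snapshots are modeled as their cluster name (a String).
final class BootstrapClustersSketch {
    private final Map<String, CompletableFuture<String>> initialFutures = new ConcurrentHashMap<>();

    CompletableFuture<String> snapshotFuture(String clusterName) {
        return initialFutures.computeIfAbsent(clusterName, k -> new CompletableFuture<>());
    }

    void onUpdate(String snapshot, Throwable t) {
        if (t != null) {
            // Without this, callers of snapshotFuture() would hang forever on errors.
            initialFutures.values().forEach(f -> f.completeExceptionally(t));
            return;
        }
        if (snapshot != null) {
            snapshotFuture(snapshot).complete(snapshot);
        }
    }
}
```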
In
`@xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/DefaultXdsLoadBalancerFactory.java`:
- Around line 59-94: A listener callback captured before re-registering an
EndpointGroup can run later and overwrite delegate with stale data; modify the
creation of updateEndpointsCallback (the lambda assigned to
updateEndpointsCallback where you call eventLoop.execute(...
updateEndpoints(...))) to capture the EndpointGroup instance at the time of
registration and, inside the scheduled task, compare that captured reference
against the current endpointGroup field and only call
updateEndpoints(clusterXdsResource, endpointSnapshot, endpoints0, watcher,
localLoadBalancer) if they match; this gates stale callbacks and keeps
updateEndpoints, PriorityStateManager, DefaultLoadBalancer/SubsetLoadBalancer,
delegate assignment, and watcher.onUpdate from being executed with an outdated
EndpointGroup.
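The gating idea can be sketched as follows; `EndpointGroup` is modeled as a plain `Object` and `updateEndpoints` as a stub, so this only illustrates the capture-and-compare pattern, not the real factory.

```java
import java.util.concurrent.Executor;

// Sketch of gating stale EndpointGroup callbacks by identity comparison.
final class StaleCallbackGuardSketch {
    private Object endpointGroup; // the currently registered group

    void register(Object newGroup, Executor eventLoop) {
        endpointGroup = newGroup;
        final Object captured = newGroup; // capture at registration time
        final Runnable updateEndpointsCallback = () -> eventLoop.execute(() -> {
            if (captured != endpointGroup) {
                return; // a newer EndpointGroup was registered; drop the stale update
            }
            updateEndpoints(captured);
        });
        updateEndpointsCallback.run();
    }

    private void updateEndpoints(Object group) {
        System.out.println("updating from " + group);
    }

    public static void main(String[] args) {
        new StaleCallbackGuardSketch().register(new Object(), Runnable::run);
    }
}
```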
In
`@xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsLoadBalancer.java`:
- Around line 24-34: The new public API (interface XdsLoadBalancer and its
method selectNow(ClientRequestContext)) must be annotated with `@UnstableApi`;
update the declaration by adding the `@UnstableApi` annotation either on the
selectNow method or on the XdsLoadBalancer interface (if the whole interface is
intended as unstable) and import the annotation so the build passes, ensuring
the signature and javadoc remain unchanged.
In
`@xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsLoadBalancerFactory.java`:
- Around line 57-63: Update the method Javadoc for
XdsLoadBalancerFactory.register so it no longer mentions ClusterSnapshot and
instead describes the current parameters: ClusterXdsResource clusterXdsResource,
EndpointSnapshot endpointSnapshot, SnapshotWatcher<XdsLoadBalancer> watcher, and
`@Nullable` XdsLoadBalancer localLoadBalancer; briefly explain that the method
updates the XdsLoadBalancer state using the given ClusterXdsResource and
EndpointSnapshot and that there are no thread-safety guarantees.
In `@xds/src/main/java/com/linecorp/armeria/xds/EndpointXdsResource.java`:
- Around line 37-40: EndpointXdsResource's new single-argument constructor
bypasses the validation performed by the existing three-argument constructor and
may accept invalid ClusterLoadAssignment objects; update the
EndpointXdsResource(ClusterLoadAssignment clusterLoadAssignment) constructor to
perform the same validation as the three-argument constructor by either
delegating to it (e.g., call the three-arg constructor from the single-arg one)
or invoking the same validation routine used there so the ClusterLoadAssignment
is validated before assignment.
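A minimal sketch of the delegation pattern, with placeholder types standing in for `ClusterLoadAssignment` and the real validation routine:

```java
// Sketch only: Object stands in for ClusterLoadAssignment, and the null check
// stands in for XdsValidatorIndexRegistry.assertValid(...).
final class EndpointXdsResourceSketch {
    private final Object clusterLoadAssignment;

    EndpointXdsResourceSketch(Object clusterLoadAssignment) {
        this(clusterLoadAssignment, "", 0); // delegate so validation is not bypassed
    }

    EndpointXdsResourceSketch(Object clusterLoadAssignment, String version, int index) {
        if (clusterLoadAssignment == null) {
            throw new IllegalArgumentException("clusterLoadAssignment");
        }
        // The real three-arg constructor would run its validation routine here.
        this.clusterLoadAssignment = clusterLoadAssignment;
    }
}
```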
In `@xds/src/main/java/com/linecorp/armeria/xds/RefCountedStream.java`:
- Around line 34-61: In RefCountedStream.subscribe, prevent duplicate
subscriptions by using the boolean result of watchers.add(watcher) instead of
assuming add always increases size: after calling watcher.onUpdate(latestValue,
null) (if present), call watchers.add(watcher) and if it returns false
(duplicate) return Subscription.noop() immediately so you don't call onStart
again; keep the existing logic that calls onStart(this::emit) only when the add
succeeded and the set transitioned to the first watcher, and preserve the
existing error handling that removes the watcher and notifies it on failure.
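The sketch below shows the `Set#add` return-value guard in isolation; the watcher and start/stop hooks are simplified stand-ins for the real `RefCountedStream` API.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Sketch of the duplicate-subscription guard: rely on Set#add's boolean result
// instead of comparing sizes.
final class SubscribeSketch {
    private final Set<Runnable> watchers = new LinkedHashSet<>();

    boolean subscribe(Runnable watcher) {
        if (!watchers.add(watcher)) {
            return false; // duplicate watcher: do not invoke onStart again
        }
        if (watchers.size() == 1) {
            onStart(); // only the first real subscriber starts the upstream
        }
        return true;
    }

    private void onStart() {
        System.out.println("started upstream");
    }

    public static void main(String[] args) {
        final SubscribeSketch s = new SubscribeSketch();
        final Runnable w = () -> {};
        System.out.println(s.subscribe(w)); // prints "started upstream", then true
        System.out.println(s.subscribe(w)); // false; onStart is not called again
    }
}
```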
In `@xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java`:
- Around line 74-88: If context.subscribe(this) throws in onStart,
resourceNodeMeterBinder remains open; modify onStart to close or call
resourceNodeMeterBinder.close() before rethrowing the exception (preserving
XdsResourceException behavior): catch Throwable t, call
resourceNodeMeterBinder.close(), then rethrow t (if instanceof
XdsResourceException rethrow as-is, else wrap in new XdsResourceException(type,
name, t)).
In `@xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java`:
- Around line 137-152: The mapped SnapshotWatcher in RouteStream.onStart
currently calls watcher.onUpdate(new RouteEntry(route, snapshot, index), t)
which can pass both a non-null snapshot and an error, violating the
SnapshotWatcher.onUpdate contract; change the mapped implementation to call
watcher.onUpdate(new RouteEntry(route, snapshot, index), null) when snapshot is
non-null so that only snapshot or error is provided (adjust the lambda in
onStart that builds mapped which is registered via
context.clusterManager().register(clusterName, context, mapped)).
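The either-or contract can be sketched like this; `RouteEntry` and the watcher shape are simplified stand-ins for the real classes:

```java
import java.util.function.BiConsumer;

// Sketch of the onUpdate contract fix: deliver either a snapshot or an error,
// never both at once.
final class MappedWatcherSketch {
    static final class RouteEntry {
        final String cluster;
        RouteEntry(String cluster) { this.cluster = cluster; }
        @Override public String toString() { return "RouteEntry(" + cluster + ")"; }
    }

    static BiConsumer<String, Throwable> mapped(BiConsumer<RouteEntry, Throwable> watcher) {
        return (snapshot, t) -> {
            if (snapshot != null) {
                watcher.accept(new RouteEntry(snapshot), null); // success: no error alongside
            } else {
                watcher.accept(null, t);                        // failure: no snapshot alongside
            }
        };
    }

    public static void main(String[] args) {
        final BiConsumer<RouteEntry, Throwable> w = (e, t) -> System.out.println(e + " / " + t);
        mapped(w).accept("my-cluster", null);
        mapped(w).accept(null, new RuntimeException("rds failed"));
    }
}
```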
In `@xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java`:
- Around line 64-71: registerLocalWatcher currently returns a one-shot empty
SnapshotStream when nodes.get(localClusterName) is null, so watchers never see
the real ClusterStream when register(Cluster cluster, ...) later adds the local
cluster; change the logic to either (A) keep a list of pending
SnapshotWatcher<Optional<ClusterSnapshot>> in XdsClusterManager and when
register(...) creates the ClusterStream for localClusterName re-subscribe/attach
all pending watchers to that ClusterStream (moving them from pending), or (B)
replace SnapshotStream.just(Optional.empty()) with a replayable/multicast stream
tied to a pending ClusterStream that will forward future updates once
nodes.put(localClusterName, clusterStream) happens; update registerLocalWatcher,
register(Cluster...), and the ClusterStream creation points to implement the
chosen approach so early watchers receive updates after the local cluster is
registered.
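Option (A) could be sketched roughly as below; `Stream` and `Consumer<String>` are hypothetical stand-ins for `ClusterStream` and `SnapshotWatcher<Optional<ClusterSnapshot>>`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of option (A): buffer early local-cluster watchers and attach them
// once the real stream is registered.
final class PendingLocalWatchersSketch {
    interface Stream { void subscribe(Consumer<String> watcher); }

    private final List<Consumer<String>> pending = new ArrayList<>();
    private Stream localStream; // null until register(Cluster, ...) runs

    void registerLocalWatcher(Consumer<String> watcher) {
        if (localStream != null) {
            localStream.subscribe(watcher);
        } else {
            pending.add(watcher); // instead of handing out a one-shot empty stream
        }
    }

    void registerLocalCluster(Stream stream) {
        localStream = stream;
        pending.forEach(stream::subscribe); // early watchers now receive updates
        pending.clear();
    }
}
```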
In
`@xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/RampingUpTest.java`:
- Around line 180-185: selectEndpoints currently adds the result of
loadBalancer.selectNow(ctx()) directly into the HashSet, but selectNow is
`@Nullable` and may return null causing NPEs later (e.g., in filterEndpoint).
Change selectEndpoints to guard against null by checking the return of
XdsLoadBalancer.selectNow(ctx()) before adding to selectedEndpoints (call it
once per slot, verify non-null then add), ensuring you still perform the
expected number of selections; reference selectEndpoints and
XdsLoadBalancer.selectNow in your fix.
In `@xds/src/test/java/com/linecorp/armeria/xds/TestResourceWatcher.java`:
- Around line 40-49: The onUpdate method currently casts the Throwable t to
XdsResourceException without checking its type, which can throw
ClassCastException; update onUpdate to first check if t is an instance of
XdsResourceException (and separately MissingXdsResourceException) before casting
and handling resource-specific logic, and add a fallback branch for other
Throwables that logs and records a generic error (e.g., use
Status.fromThrowable(t) or t.toString()) so non-XdsResourceException errors are
handled safely.
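The fallback branch can be sketched as below; the nested `XdsResourceException` is a local stand-in for the real class, and the string result takes the place of the status recording:

```java
// Sketch of type-safe error handling in the test watcher.
final class WatcherErrorSketch {
    static final class XdsResourceException extends RuntimeException {
        XdsResourceException(String msg) { super(msg); }
    }

    static String describe(Throwable t) {
        if (t instanceof XdsResourceException) {
            return "xds error: " + t.getMessage();   // resource-specific handling
        }
        return "generic error: " + t;                // fallback avoids ClassCastException
    }

    public static void main(String[] args) {
        System.out.println(describe(new XdsResourceException("listener missing")));
        System.out.println(describe(new IllegalStateException("unexpected")));
    }
}
```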
🧹 Nitpick comments (6)
xds/src/main/java/com/linecorp/armeria/xds/VirtualHostSnapshot.java (1)
61-67: Defensively copy `routeEntries` to keep the snapshot immutable.

This constructor accepts a caller-provided list and stores it directly, which allows external mutation and can break `equals`/`hashCode` stability and snapshot consistency. Consider copying to an immutable list (and null-checks).

♻️ Proposed change

```diff
-    VirtualHostSnapshot(VirtualHostXdsResource virtualHostXdsResource,
-                        List<RouteEntry> routeEntries, int index) {
-        this.virtualHostXdsResource = virtualHostXdsResource;
-        final VirtualHost virtualHost = virtualHostXdsResource.resource();
-        filterConfigs = FilterUtil.toParsedFilterConfigs(virtualHost.getTypedPerFilterConfigMap());
-        this.routeEntries = routeEntries;
-        this.index = index;
-    }
+    VirtualHostSnapshot(VirtualHostXdsResource virtualHostXdsResource,
+                        List<RouteEntry> routeEntries, int index) {
+        this(Objects.requireNonNull(virtualHostXdsResource, "virtualHostXdsResource"),
+             ImmutableList.copyOf(Objects.requireNonNull(routeEntries, "routeEntries")),
+             FilterUtil.toParsedFilterConfigs(
+                     virtualHostXdsResource.resource().getTypedPerFilterConfigMap()),
+             index);
+    }
```

xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/LocalityRoutingStateFactory.java (1)
46-49: Rename the parameter to match the `XdsLoadBalancer` type.

Helps avoid confusion when reading the method.

♻️ Suggested tweak

```diff
-    LocalityRoutingState create(PrioritySet upstreamPrioritySet, XdsLoadBalancer localPrioritySet) {
+    LocalityRoutingState create(PrioritySet upstreamPrioritySet, XdsLoadBalancer localLoadBalancer) {
         // Only priority 0 is supported
         final HostSet upstreamHostSet = upstreamPrioritySet.hostSets().get(0);
-        final HostSet localHostSet = localPrioritySet.hostSets().get(0);
+        final HostSet localHostSet = localLoadBalancer.hostSets().get(0);
```

xds/src/main/java/com/linecorp/armeria/xds/LoadBalancerFactoryPool.java (1)
54-79: Add event-loop confinement assertions to prevent race conditions on concurrent HashMap access.

`factories` and `delayedCloses` are plain `HashMap`s accessed from both `register()`/`unregister()` (called in the ClusterStream constructor outside the event loop) and the `DelayedClose` scheduled task (executing on the event loop). Without serialization guarantees, `ConcurrentModificationException` or data corruption is possible.

Suggested guard for event-loop confinement

```diff
+import static com.google.common.base.Preconditions.checkState;
+
 final class LoadBalancerFactoryPool implements SafeCloseable {

+    private void assertInEventLoop() {
+        checkState(eventLoop.inEventLoop(),
+                   "LoadBalancerFactoryPool must be used from the event loop");
+    }
+
     XdsLoadBalancerFactory register(String name) {
+        assertInEventLoop();
         maybeRemoveDelayedClose(name);

     void unregister(String name) {
+        assertInEventLoop();
         maybeRemoveDelayedClose(name);

     public void close() {
+        assertInEventLoop();
         for (XdsLoadBalancerFactory loadBalancer : factories.values()) {
```

xds/src/main/java/com/linecorp/armeria/xds/RouteXdsResource.java (1)
37-41: Delegate to the existing constructor to avoid duplicated validation.

Keeps defaults and validation logic in one place, reducing drift risk.

♻️ Proposed refactor

```diff
-    RouteXdsResource(RouteConfiguration routeConfiguration) {
-        super("", 0);
-        XdsValidatorIndexRegistry.assertValid(routeConfiguration);
-        this.routeConfiguration = routeConfiguration;
-    }
+    RouteXdsResource(RouteConfiguration routeConfiguration) {
+        this(routeConfiguration, "", 0);
+    }
```

it/xds-client/src/test/java/com/linecorp/armeria/xds/it/BootstrapTest.java (1)
121-128: Capture errors in the watcher to avoid silent timeouts.

If `onUpdate` delivers only a Throwable, the test will just wait; storing and asserting the error makes failures explicit.

🧪 Suggested tweak

```diff
     final AtomicReference<Object> objRef = new AtomicReference<>();
+    final AtomicReference<Throwable> errRef = new AtomicReference<>();
     final SnapshotWatcher<Object> watcher = new SnapshotWatcher<>() {
         @Override
         public void onUpdate(@Nullable Object snapshot, @Nullable Throwable t) {
+            if (t != null) {
+                errRef.set(t);
+                return;
+            }
             if (snapshot != null) {
                 objRef.set(snapshot);
             }
         }
     };
@@
     xdsBootstrap.clusterRoot("my-cluster");
     await().untilAsserted(() -> {
+        assertThat(errRef.get()).isNull();
         assertThat(objRef.get()).isNotNull();
         final ClusterSnapshot clusterSnapshot = (ClusterSnapshot) objRef.get();
         assertThat(clusterSnapshot.endpointSnapshot().xdsResource().resource()).isEqualTo(endpoint);
     });
```

Also applies to: 134-137
xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java (1)
80-83: Consider using the `ROUTE` type for RDS config source errors.

The error is thrown when resolving the RDS config source for a route, but uses `LISTENER` as the `XdsType`. This might make debugging harder since the actual missing configuration relates to route discovery. Consider whether `XdsType.ROUTE` with the `routeName` would be more accurate.

♻️ Suggested change

```diff
 if (configSource == null) {
-    return SnapshotStream.error(new XdsResourceException(LISTENER, resourceName,
-                                                         "config source not found"));
+    return SnapshotStream.error(new XdsResourceException(ROUTE, routeName,
+                                                         "config source not found"));
 }
```
Resolved review threads (collapsed):
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/LoadBalancerReloadTest.java
- xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/DefaultXdsLoadBalancerFactory.java (outdated)
- xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsLoadBalancer.java
- xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsLoadBalancerFactory.java
- xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/RampingUpTest.java
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@             Coverage Diff              @@
##               main    #6610      +/-   ##
============================================
- Coverage     74.46%   74.42%    -0.04%
- Complexity    22234    23903     +1669
============================================
  Files          1963     2146      +183
  Lines         82437    89121     +6684
  Branches      10764    11670      +906
============================================
+ Hits          61385    66327     +4942
- Misses        15918    17223     +1305
- Partials       5134     5571      +437
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In
`@xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsLoadBalancerFactory.java`:
- Around line 35-41: The Javadoc incorrectly calls this interface a "variant of
XdsLoadBalancer" — update the comment for XdsLoadBalancerFactory to state that
it is a factory that produces XdsLoadBalancer instances which support updates
(use wording like "factory for creating XdsLoadBalancer instances that allow
updates to ClusterLoadAssignment"), and retain the references to
XdsLoadBalancer, ClusterLoadAssignment and XdsBootstrap to guide users to
preferred retrieval; ensure the description no longer implies the interface
itself is a load balancer variant.
In `@xds/src/main/java/com/linecorp/armeria/xds/RefCountedStream.java`:
- Around line 34-36: Add Javadoc to the public subscribe method in
RefCountedStream: annotate the subscribe(SnapshotWatcher<? super T> watcher)
method with a Javadoc block using {`@inheritDoc`} so it inherits the
interface/superclass documentation; ensure the Javadoc is placed immediately
above the method declaration and follows the project's Javadoc style
conventions.
- Around line 26-80: The shared mutable fields watchers, latestValue, and
upstreamSub are accessed from subscribe(), the unsubscribe lambda, and emit()
without synchronization; fix by either (A) making access thread-safe: replace
watchers with a thread-safe collection (e.g., CopyOnWriteArraySet), mark
latestValue and upstreamSub volatile, and ensure the unsubscribe closure and
subscribe() update upstreamSub atomically (or wrap subscribe/unsubscribe/emit in
synchronized blocks), or (B) enforce event-loop confinement: document the
single-thread requirement on RefCountedStream, add an explicit runtime assertion
in subscribe(), emit(), and unsubscribe that they run on the expected event loop
(matching how ControlPlaneClientManager asserts), and remove concurrent-safety
assumptions; update onStart()/onStop() contracts accordingly and adjust tests.
🧹 Nitpick comments (2)
xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java (1)
62-66: Consider preserving `XdsResourceException` to avoid double-wrapping.

Unlike `onStart` (which checks `instanceof XdsResourceException` before wrapping), this method unconditionally wraps all `Throwable`s. If `t` is already an `XdsResourceException`, it becomes the cause of another `XdsResourceException`, potentially losing semantic clarity in stack traces.

♻️ Proposed fix for consistency with onStart

```diff
 @Override
 public void onError(XdsType type, String resourceName, Throwable t) {
     resourceNodeMeterBinder.onError(type, resourceName, t);
-    emit(null, new XdsResourceException(type, resourceName, t));
+    final XdsResourceException exception;
+    if (t instanceof XdsResourceException) {
+        exception = (XdsResourceException) t;
+    } else {
+        exception = new XdsResourceException(type, resourceName, t);
+    }
+    emit(null, exception);
 }
```

it/xds-client/src/test/java/com/linecorp/armeria/xds/it/HealthCheckedTest.java (1)
73-80: Error parameter `t` is silently ignored.

The `SnapshotWatcher` callback receives a `Throwable` parameter `t` that is never checked. If the XDS bootstrap encounters an error, the test will time out with no diagnostic information rather than failing fast with the actual error.

Additionally, the method chain to access `clusterSnapshot` assumes all intermediate calls return non-null, non-empty results. If any intermediate result is null or empty, this will throw an unclear `NullPointerException` or `IndexOutOfBoundsException`.

Proposed fix to handle errors and add defensive checks

```diff
 private static XdsLoadBalancer pollLoadBalancer(ListenerRoot root, String clusterName) {
     final AtomicReference<XdsLoadBalancer> lbRef = new AtomicReference<>();
+    final AtomicReference<Throwable> errorRef = new AtomicReference<>();
     final SnapshotWatcher<ListenerSnapshot> watcher = (newSnapshot, t) -> {
+        if (t != null) {
+            errorRef.set(t);
+            return;
+        }
+        if (newSnapshot == null || newSnapshot.routeSnapshot() == null) {
+            return;
+        }
+        final var virtualHostSnapshots = newSnapshot.routeSnapshot().virtualHostSnapshots();
+        if (virtualHostSnapshots.isEmpty()) {
+            return;
+        }
+        final var routeEntries = virtualHostSnapshots.get(0).routeEntries();
+        if (routeEntries.isEmpty()) {
+            return;
+        }
         final ClusterSnapshot clusterSnapshot =
-                newSnapshot.routeSnapshot().virtualHostSnapshots().get(0).routeEntries().get(0)
-                           .clusterSnapshot();
+                routeEntries.get(0).clusterSnapshot();
         if (clusterSnapshot != null && clusterName.equals(clusterSnapshot.xdsResource().name())) {
             lbRef.set(clusterSnapshot.loadBalancer());
         }
     };
     root.addSnapshotWatcher(watcher);
-    await().untilAsserted(() -> assertThat(lbRef.get()).isNotNull());
+    await().untilAsserted(() -> {
+        assertThat(errorRef.get()).isNull();
+        assertThat(lbRef.get()).isNotNull();
+    });
     return lbRef.get();
 }
```
Resolved review threads (collapsed):
- xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsLoadBalancerFactory.java
- xds/src/main/java/com/linecorp/armeria/xds/RefCountedStream.java (outdated)
ikhoon
left a comment
Nice abstraction. 👍👍
I left some comments, but it doesn’t seem to be a blocker for merging.
```java
assert configSource != null;
return new ResourceNodeAdapter<EndpointXdsResource>(configSource, context,
                                                    resourceName, XdsType.ENDPOINT)
        .map(EndpointSnapshot::new)
```
Resolved review threads (collapsed, outdated):
- xds/src/main/java/com/linecorp/armeria/xds/RefCountedStream.java (2 threads)
- xds/src/main/java/com/linecorp/armeria/xds/CombineNLatestStream.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java`:
- Around line 141-146: LoadBalancerStream currently registers a watcher via
loadBalancerFactory.register(...) in LoadBalancerStream.onStart(...) but never
unregisters, causing stale references; either change
XdsLoadBalancerFactory.register(...) to return a Subscription (or AutoCloseable)
that performs unregister on close and have onStart return that Subscription, or
add an override of LoadBalancerStream.onStop(...) that calls a new unregister
method on XdsLoadBalancerFactory (e.g.,
loadBalancerFactory.unregister(clusterXdsResource, watcher) or similar) to
remove the watcher when RefCountedStream stops; update the signatures
accordingly so the watcher is removed when the subscription closes.
In `@xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java`:
- Around line 79-86: The register(Cluster, ...) loop schedules subscription
creation with eventLoop.execute(), so if close() runs before those tasks execute
the created Subscription won't be added to subscriptions and won't be closed;
change to use eventLoop.submit() (or otherwise keep the returned Future) when
creating subscriptions inside register and track/wait for those Futures (or
synchronize so creation runs on the close path) so close() can await completion
before iterating subscriptions; specifically update the safeRunnable usage in
the loop that calls eventLoop.execute() to submit the task and record its Future
(or add a synchronized pending-tasks collection) and modify close() to wait for
those pending Futures to complete before closing entries in subscriptions.
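A sketch of the submit-and-await idea, using a plain `ExecutorService` as a stand-in for the Netty event loop:

```java
import java.util.List;
import java.util.concurrent.*;

// Sketch of tracking scheduled registrations so close() can await them.
final class PendingTaskSketch implements AutoCloseable {
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final List<Future<?>> pending = new CopyOnWriteArrayList<>();

    void register(Runnable createSubscription) {
        pending.add(eventLoop.submit(createSubscription)); // submit() instead of execute()
    }

    @Override
    public void close() throws Exception {
        for (Future<?> f : pending) {
            f.get(); // ensure every subscription was created (and tracked) first
        }
        eventLoop.shutdown();
    }

    public static void main(String[] args) throws Exception {
        try (PendingTaskSketch manager = new PendingTaskSketch()) {
            manager.register(() -> System.out.println("subscription created"));
        } // close() waits for the pending task before shutting down
    }
}
```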
🧹 Nitpick comments (8)
xds/src/main/java/com/linecorp/armeria/xds/RefCountedStream.java (1)
36-42: Potential issue: Watcher receives an update before being added to the set.

If `watcher.onUpdate(latestValue, null)` at line 37 throws an exception, the watcher is never added to the set (line 40), but we've already attempted to notify it. More critically, if the watcher's `onUpdate` callback unsubscribes itself (expecting to be subscribed), it will find itself not in the set, leading to silent no-ops.

Consider moving the `latestValue` delivery after the watcher is successfully added:

Suggested fix

```diff
 @Override
 public final Subscription subscribe(SnapshotWatcher<? super T> watcher) {
-    if (latestValue != null) {
-        watcher.onUpdate(latestValue, null);
-    }
-
     if (!watchers.add(watcher)) {
         return Subscription.noop();
     }
     if (watchers.size() == 1) {
         try {
             upstreamSub = onStart(this::emit);
         } catch (Throwable t) {
             watchers.remove(watcher);
             watcher.onUpdate(null, t);
             return Subscription.noop();
         }
     }
+    if (latestValue != null) {
+        watcher.onUpdate(latestValue, null);
+    }
+
     return () -> {
```

xds/src/test/java/com/linecorp/armeria/xds/RefCountedStreamTest.java (1)
269-278: Consider verifying `onStop` is not called multiple times on repeated unsubscribes.

The test verifies `hasWatchers()` returns false after multiple closes, but doesn't assert that `onStop` isn't invoked more than once. Given that the implementation closes `upstreamSub` and calls `onStop()` when `watchers.isEmpty()`, repeated closes could potentially double-invoke if the implementation changes.

Suggested enhancement

```diff
 @Test
 void multipleUnsubscribesSafe() {
-    final TestRefCountedStream stream = new TestRefCountedStream(Subscription::noop);
+    final AtomicInteger onStopCount = new AtomicInteger(0);
+    final TestRefCountedStream stream = new TestRefCountedStream(Subscription::noop,
+                                                                 onStopCount::incrementAndGet);
     final Subscription subscription = stream.subscribe((snapshot, error) -> {});
     subscription.close();
     subscription.close();
     assertThat(stream.hasWatchers()).isFalse();
+    assertThat(onStopCount.get()).isOne();
 }
```

xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java (2)
43-51: Eager registration in the constructor may leak if the stream is never subscribed.

Both constructors call `loadBalancerFactoryPool.register(resourceName)` immediately. If the `ClusterStream` is created but never subscribed to (and thus `onStop()` is never called), the registration persists. Consider deferring registration to `onStart()`.

Also applies to: 53-60
107-121: LocalClusterStream emits empty then subscribes; late subscribers may miss updates.

`LocalClusterStream.onStart()` immediately emits `Optional.empty()` to the watcher, then subscribes to `xdsClusterManager`. If the cluster manager already has a local cluster, the subscription callback will update the watcher. However, if a second watcher subscribes after the first received the real cluster update, the `latestValue` in `RefCountedStream` will still be `Optional.empty()` (since that was the first emission), and the second watcher won't receive the actual cluster until a new update occurs.

This aligns with the comment "late local clusters shouldn't block upstream cluster loading", but the behavior may be surprising. Consider documenting this explicitly or updating `latestValue` when the cluster manager callback fires.

xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java (1)
98-103: Potential race: node removal during concurrent subscription.

The cleanup lambda removes the node from `nodes` if `!node.hasWatchers()`. If another thread/task is concurrently calling `register(String, ...)`, there's a window where the node is removed but a new subscription attempt uses `computeIfAbsent`, which could create a new node. Since all operations should be on the event loop, this may be safe, but the lack of explicit synchronization or assertions makes it fragile.

xds/src/test/java/com/linecorp/armeria/xds/CombineNLatestStreamTest.java (1)
68-78: Consider extracting TestStream to a shared test utility.

`TestStream<T>` is duplicated between this file and `StreamSwitchMapTest`. Consider extracting it to a shared test utility class to reduce duplication.

xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java (2)
56-58: Parameter name typo: `stream` should be `streams`.

The parameter accepts a `List<S>` (plural) but is named `stream` (singular). This is a minor readability issue.

Suggested fix

```diff
-    static <S extends SnapshotStream<I>, I> SnapshotStream<List<I>> combineNLatest(List<S> stream) {
-        return new CombineNLatestStream<>(ImmutableList.copyOf(stream));
+    static <S extends SnapshotStream<I>, I> SnapshotStream<List<I>> combineNLatest(List<S> streams) {
+        return new CombineNLatestStream<>(ImmutableList.copyOf(streams));
     }
```
64-67: `empty()` returns `SnapshotStream<Optional<T>>`, which may be surprising.

The `empty()` factory returns a stream of `Optional<T>` rather than just `T`. This design differs from typical reactive stream `empty()` semantics (which usually complete without emitting). Consider either renaming to `emptyOptional()` or adding Javadoc to clarify the behavior.
```java
@Override
protected Subscription onStart(SnapshotWatcher<XdsLoadBalancer> watcher) {
    loadBalancerFactory.register(clusterXdsResource, endpointSnapshot, watcher,
                                 localLoadBalancer);
    return Subscription.noop();
}
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Find XdsLoadBalancerFactory class and its register method
fd -t f '*LoadBalancerFactory*.java' xds/src/main/java
# Get the full implementation of register method
ast-grep --pattern 'void register($_, $_, $_, $_) {
  $$$
}'
```

Repository: line/armeria
Length of output: 361

🏁 Script executed:

```bash
#!/bin/bash
# Search for unregister, deregister, remove patterns in XDS package
rg -n "unregister|deregister" xds/src/main/java --type java -B2 -A5
```

Repository: line/armeria
Length of output: 6078

🏁 Script executed:

```bash
#!/bin/bash
# Check if LoadBalancerStream has onStop override
rg -n "onStop|onClose" xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java -B2 -A8
```

Repository: line/armeria
Length of output: 575

🏁 Script executed:

```bash
#!/bin/bash
# Check what LoadBalancerFactoryPool does with registrations
fd -t f '*LoadBalancerFactoryPool*.java' xds/src/main/java
```

Repository: line/armeria
Length of output: 365

🏁 Script executed:

```bash
#!/bin/bash
# Find LoadBalancerStream class definition in ClusterStream.java
rg -n "class LoadBalancerStream" xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java -A20
```

Repository: line/armeria
Length of output: 1232

🏁 Script executed:

```bash
#!/bin/bash
# Get the full file to understand class structure
wc -l xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java
```

Repository: line/armeria
Length of output: 121

🏁 Script executed:

```bash
#!/bin/bash
# Check what RefCountedStream is and how it handles onStop
fd -t f -g '*RefCountedStream*.java' xds/src/main/java
```

Repository: line/armeria
Length of output: 120

🏁 Script executed:

```bash
#!/bin/bash
# Examine RefCountedStream to understand subscription lifecycle
cat xds/src/main/java/com/linecorp/armeria/xds/RefCountedStream.java
```

Repository: line/armeria
Length of output: 2622

🏁 Script executed:

```bash
#!/bin/bash
# Check if LoadBalancerStream has onStop method
rg -n "onStop" xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java
```

Repository: line/armeria
Length of output: 88

🏁 Script executed:

```bash
#!/bin/bash
# Look at the full LoadBalancerStream class
sed -n '123,150p' xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java
```

Repository: line/armeria
Length of output: 1368

🏁 Script executed:

```bash
#!/bin/bash
# Check XdsLoadBalancerFactory.register signature and implementation
fd -t f -g '*.java' xds/src/main/java | xargs grep -l "class XdsLoadBalancerFactory"
```

Repository: line/armeria
Length of output: 40

🏁 Script executed:

```bash
#!/bin/bash
# Find and examine XdsLoadBalancerFactory.register method
rg -n "void register|Subscription register" xds/src/main/java --type java -A10 | head -50
```

Repository: line/armeria
Length of output: 5995

🏁 Script executed:

```bash
#!/bin/bash
# Get the full XdsLoadBalancerFactory interface
cat xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsLoadBalancerFactory.java
```

Repository: line/armeria
Length of output: 3117

🏁 Script executed:

```bash
#!/bin/bash
# Check DefaultXdsLoadBalancerFactory.register implementation
rg -n "public void register" xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/DefaultXdsLoadBalancerFactory.java -A30
```

Repository: line/armeria
Length of output: 1927
LoadBalancerStream must unregister from loadBalancerFactory when subscription closes.
LoadBalancerStream.onStart() registers the watcher with loadBalancerFactory.register() but returns Subscription.noop(). Since XdsLoadBalancerFactory.register() returns void and has no unregister mechanism, the watcher remains registered indefinitely. When the last subscriber to LoadBalancerStream unsubscribes, RefCountedStream calls onStop(), but LoadBalancerStream does not override it, leaving the factory with a stale watcher reference.
Override onStop() in LoadBalancerStream to unregister from the factory, or refactor XdsLoadBalancerFactory.register() to return a Subscription that handles cleanup.
🤖 Prompt for AI Agents
In `@xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java` around lines
141 - 146, LoadBalancerStream currently registers a watcher via
loadBalancerFactory.register(...) in LoadBalancerStream.onStart(...) but never
unregisters, causing stale references; either change
XdsLoadBalancerFactory.register(...) to return a Subscription (or AutoCloseable)
that performs unregister on close and have onStart return that Subscription, or
add an override of LoadBalancerStream.onStop(...) that calls a new unregister
method on XdsLoadBalancerFactory (e.g.,
loadBalancerFactory.unregister(clusterXdsResource, watcher) or similar) to
remove the watcher when RefCountedStream stops; update the signatures
accordingly so the watcher is removed when the subscription closes.
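The first option the prompt describes could look roughly like this sketch, where the factory hands back an `AutoCloseable` that removes the watcher; the types are simplified stand-ins:

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Sketch: register(...) returns a closeable registration instead of void.
final class FactoryRegistrationSketch {
    interface Watcher { void onUpdate(String lb); }

    private final Set<Watcher> watchers = new CopyOnWriteArraySet<>();

    AutoCloseable register(Watcher watcher) {
        watchers.add(watcher);
        return () -> watchers.remove(watcher); // onStart could return this as its Subscription
    }

    public static void main(String[] args) throws Exception {
        final FactoryRegistrationSketch factory = new FactoryRegistrationSketch();
        final AutoCloseable sub = factory.register(lb -> System.out.println("got " + lb));
        sub.close(); // the RefCountedStream onStop path would close the subscription
        System.out.println(factory.watchers.isEmpty()); // true: no stale watcher remains
    }
}
```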
```java
for (SnapshotWatcher<? super ClusterSnapshot> watcher : watchers) {
    eventLoop.execute(safeRunnable(() -> {
        final Subscription subscription = node.subscribe(watcher);
        subscriptions.add(subscription);
    }, t -> watcher.onUpdate(null,
                             XdsResourceException.maybeWrap(XdsType.CLUSTER,
                                                            cluster.getName(), t))));
}
```
Subscriptions added via eventLoop.execute may not be tracked before close().
register(Cluster, ...) schedules subscription creation via eventLoop.execute(). If close() is called before these tasks complete, the subscriptions won't be in the subscriptions list and won't be closed. Consider using eventLoop.submit() with synchronization or ensuring all pending tasks complete before closing.
🤖 Prompt for AI Agents
In `@xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java` around
lines 79 - 86, The register(Cluster, ...) loop schedules subscription creation
with eventLoop.execute(), so if close() runs before those tasks execute the
created Subscription won't be added to subscriptions and won't be closed; change
to use eventLoop.submit() (or otherwise keep the returned Future) when creating
subscriptions inside register and track/wait for those Futures (or synchronize
so creation runs on the close path) so close() can await completion before
iterating subscriptions; specifically update the safeRunnable usage in the loop
that calls eventLoop.execute() to submit the task and record its Future (or add
a synchronized pending-tasks collection) and modify close() to wait for those
pending Futures to complete before closing entries in subscriptions.
```java
            ));
        });

        Thread.sleep(3_000);
```
drive by comment 😴
```java
public void register(ClusterXdsResource clusterXdsResource, EndpointSnapshot endpointSnapshot,
                     SnapshotWatcher<XdsLoadBalancer> watcher,
                     @Nullable XdsLoadBalancer localLoadBalancer) {
    endpointGroup.closeAsync();
```
Question: Is this method always called by the same executor? I'm asking whether we need to make the field volatile or not.
```java
XdsLoadBalancerFactory register(String name) {
    maybeRemoveDelayedClose(name);
    final XdsLoadBalancerFactory cached = factories.get(name);
```
I want to make sure that this method is called by the eventLoop, right?
What do you think about adding `assert eventLoop.inEventLoop();` in the relevant sections? It would give reviewers more confidence that this code is executing on the correct thread.
```java
void unregister(String name) {
    maybeRemoveDelayedClose(name);
    delayedCloses.put(name, new DelayedClose(name));
```
Would you mind adding a comment explaining why delayed closes are needed?
```java
if (t instanceof XdsResourceException) {
    throw t;
} else {
    throw new XdsResourceException(type, name, t);
```
Shouldn't we add a message, something like "failed to subscribe..."?
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@xds/src/main/java/com/linecorp/armeria/xds/LoadBalancerFactoryPool.java`:
- Around line 84-92: The close() method in LoadBalancerFactoryPool must be run
on the event loop like register()/unregister() to avoid concurrent access to
factories and delayedCloses; add an assertion at the start of close() (e.g.,
assert eventLoop.inEventLoop()) to enforce thread confinement, then proceed to
iterate factories and cancel delayedClose.closeFuture as before so all map
mutation/cancellation happens on the event loop thread.
```java
@Override
public void close() {
    for (XdsLoadBalancerFactory loadBalancer : factories.values()) {
        loadBalancer.close();
    }
    for (DelayedClose delayedClose : delayedCloses.values()) {
        delayedClose.closeFuture.cancel(true);
    }
}
```
🧩 Analysis chain
🏁 Script executed:

```bash
find . -name "LoadBalancerFactoryPool.java" -type f
```

Repository: line/armeria
Length of output: 129

🏁 Script executed:

```bash
cat -n xds/src/main/java/com/linecorp/armeria/xds/LoadBalancerFactoryPool.java
```

Repository: line/armeria
Length of output: 5026

🏁 Script executed:

```bash
# Check if eventLoop field exists and how it's used
rg -A 5 -B 5 "register|unregister" xds/src/main/java/com/linecorp/armeria/xds/LoadBalancerFactoryPool.java | head -100
```

Repository: line/armeria
Length of output: 745

🏁 Script executed:

```bash
rg -A 10 "interface SafeCloseable" --type java
```

Repository: line/armeria
Length of output: 428

🏁 Script executed:

```bash
# Search for usages of LoadBalancerFactoryPool to understand how close() might be called
rg "LoadBalancerFactoryPool" xds/src/main/java/ --type java -A 3 -B 3 | head -100
```

Repository: line/armeria
Length of output: 5930
Add an `eventLoop.inEventLoop()` assertion to the `close()` method to prevent concurrent map access.

The `register()` and `unregister()` methods enforce thread confinement with `assert eventLoop.inEventLoop()`, but `close()` lacks this guard. Since `DelayedClose` tasks run on the event loop and modify the `factories` and `delayedCloses` maps, calling `close()` from a different thread creates a race condition with concurrent `HashMap` access, violating the documented contract that "all methods are expected to be run from the same EventExecutor."
Proposed fix

```diff
 @Override
 public void close() {
+    assert eventLoop.inEventLoop();
     for (XdsLoadBalancerFactory loadBalancer : factories.values()) {
         loadBalancer.close();
     }
     for (DelayedClose delayedClose : delayedCloses.values()) {
         delayedClose.closeFuture.cancel(true);
     }
 }
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```java
@Override
public void close() {
    assert eventLoop.inEventLoop();
    for (XdsLoadBalancerFactory loadBalancer : factories.values()) {
        loadBalancer.close();
    }
    for (DelayedClose delayedClose : delayedCloses.values()) {
        delayedClose.closeFuture.cancel(true);
    }
}
```
🤖 Prompt for AI Agents
In `@xds/src/main/java/com/linecorp/armeria/xds/LoadBalancerFactoryPool.java`
around lines 84 - 92, The close() method in LoadBalancerFactoryPool must be run
on the event loop like register()/unregister() to avoid concurrent access to
factories and delayedCloses; add an assertion at the start of close() (e.g.,
assert eventLoop.inEventLoop()) to enforce thread confinement, then proceed to
iterate factories and cancel delayedClose.closeFuture as before so all map
mutation/cancellation happens on the event loop thread.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java`:
- Around line 72-86: register(...) accesses and mutates non-thread-safe fields
nodes and closed and returns a Subscription whose close() also mutates shared
state; ensure all of these interactions are executed on the event loop: in
register(String name, SubscriptionContext context,
SnapshotWatcher<ListenerSnapshot> watcher) assert or check event-loop
confinement (e.g., via context.eventLoop().inEventLoop()) and if not on the
event loop, reschedule the body that reads/writes closed and nodes onto the
event loop, and similarly have the returned Subscription.close() reschedule its
removal/hasWatchers() check and nodes.remove(name) onto the same event loop
rather than performing mutations directly; apply this to the ListenerStream
subscribe/hasWatchers path so all accesses to nodes and closed are serialized on
the event loop.
In `@xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java`:
- Around line 72-88: The register method currently creates a new ClusterStream
for each watcher inside the per-watcher eventLoop task which causes
checkArgument to fail after the first insertion; change the logic to ensure a
single ClusterStream is created (or reused) for the cluster name and then
subscribe all watchers to that single stream: in the code referencing
ClusterStream, nodes, eventLoop.execute, safeRunnable, ClusterXdsResource,
loadBalancerFactoryPool, subscribe, and subscriptions, move the creation/lookup
of the ClusterStream into one eventLoop task (or use a compute-if-absent style
under the event loop) so you call nodes.put once (or reuse existing node) and
then call node.subscribe(watcher) for each watcher, adding each returned
Subscription to subscriptions and preserving the existing error handling that
calls watcher.onUpdate on exceptions.
```diff
 Subscription register(String name, SubscriptionContext context, SnapshotWatcher<ListenerSnapshot> watcher) {
     if (closed) {
-        return;
+        return Subscription.noop();
     }
-    final ListenerResourceNode node = nodes.computeIfAbsent(name, ignored -> {
+    final ListenerStream node = nodes.computeIfAbsent(name, ignored -> {
         // on-demand lds if not already registered
-        final ConfigSource configSource = context.configSourceMapper().ldsConfigSource(name);
-        final ListenerResourceNode dynamicNode =
-                new ListenerResourceNode(configSource, name, context,
-                                         watcher, ResourceNodeType.DYNAMIC);
-        context.subscribe(dynamicNode);
-        return dynamicNode;
+        return new ListenerStream(name, context);
     });
-    node.addWatcher(watcher);
-}
-
-void unregister(String name, SnapshotWatcher<ListenerSnapshot> watcher) {
-    if (closed) {
-        return;
-    }
-    final ListenerResourceNode node = nodes.get(name);
-    if (node == null) {
-        return;
-    }
-    node.removeWatcher(watcher);
-    if (!node.hasWatchers()) {
-        node.close();
-        nodes.remove(name);
-    }
+    final Subscription subscription = node.subscribe(watcher);
+    return () -> {
+        subscription.close();
+        if (!node.hasWatchers()) {
+            nodes.remove(name);
+        }
+    };
```
Ensure register() and returned Subscription are confined to the event loop.
`nodes`/`closed` are not thread-safe, and the returned `Subscription.close()` mutates shared state. If this path is ever invoked off the event loop, it can race with other operations and see a stale `closed`. Please either enforce event-loop confinement or reschedule mutations onto the event loop.
🔧 Suggested fix (enforce event-loop access and reschedule close)

```diff
 Subscription register(String name, SubscriptionContext context, SnapshotWatcher<ListenerSnapshot> watcher) {
+    checkArgument(eventLoop.inEventLoop(),
+                  "ListenerManager.register must be called from the event loop");
     if (closed) {
         return Subscription.noop();
     }
     final ListenerStream node = nodes.computeIfAbsent(name, ignored -> {
         // on-demand lds if not already registered
         return new ListenerStream(name, context);
     });
     final Subscription subscription = node.subscribe(watcher);
     return () -> {
-        subscription.close();
-        if (!node.hasWatchers()) {
-            nodes.remove(name);
-        }
+        if (eventLoop.inEventLoop()) {
+            subscription.close();
+            if (!node.hasWatchers()) {
+                nodes.remove(name);
+            }
+        } else {
+            eventLoop.execute(() -> {
+                subscription.close();
+                if (!node.hasWatchers()) {
+                    nodes.remove(name);
+                }
+            });
+        }
     };
 }
```
In `@xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java` around lines
72 - 86, register(...) accesses and mutates non-thread-safe fields nodes and
closed and returns a Subscription whose close() also mutates shared state;
ensure all of these interactions are executed on the event loop: in
register(String name, SubscriptionContext context,
SnapshotWatcher<ListenerSnapshot> watcher) assert or check event-loop
confinement (e.g., via context.eventLoop().inEventLoop()) and if not on the
event loop, reschedule the body that reads/writes closed and nodes onto the
event loop, and similarly have the returned Subscription.close() reschedule its
removal/hasWatchers() check and nodes.remove(name) onto the same event loop
rather than performing mutations directly; apply this to the ListenerStream
subscribe/hasWatchers path so all accesses to nodes and closed are serialized on
the event loop.
```diff
 void register(Cluster cluster, SubscriptionContext context,
-              Iterable<SnapshotWatcher<? super ClusterSnapshot>> watchers) {
-    checkArgument(!nodes.containsKey(cluster.getName()),
-                  "Static cluster with name '%s' already registered", cluster.getName());
-    final UpdatableXdsLoadBalancer loadBalancer;
-    if (cluster.getName().equals(localClusterName) && localLoadBalancer != null) {
-        loadBalancer = localLoadBalancer;
-    } else {
-        final DefaultXdsLoadBalancerLifecycleObserver observer =
-                new DefaultXdsLoadBalancerLifecycleObserver(context.meterIdPrefix(),
-                                                            context.meterRegistry(), cluster.getName());
-        loadBalancer = XdsLoadBalancer.of(eventLoop, bootstrap.getNode().getLocality(),
-                                          localLoadBalancer, observer);
-    }
-    final ClusterResourceNode node = new ClusterResourceNode(null, cluster.getName(), context, STATIC,
-                                                             loadBalancer);
-    for (SnapshotWatcher<? super ClusterSnapshot> watcher: watchers) {
-        node.addWatcher(watcher);
-    }
-    nodes.put(cluster.getName(), node);
-    final ClusterXdsResource parsed = ClusterResourceParser.INSTANCE.parse(cluster, "", 0);
-    eventLoop.execute(() -> node.onChanged(parsed));
+              List<SnapshotWatcher<? super ClusterSnapshot>> watchers) {
+
+    for (SnapshotWatcher<? super ClusterSnapshot> watcher : watchers) {
+        eventLoop.execute(safeRunnable(() -> {
+            checkArgument(!nodes.containsKey(cluster.getName()),
+                          "Cluster with name '%s' already registered", cluster.getName());
+            final ClusterStream node = new ClusterStream(new ClusterXdsResource(cluster), context,
+                                                         loadBalancerFactoryPool);
+            nodes.put(cluster.getName(), node);
+            final Subscription subscription = node.subscribe(watcher);
+            subscriptions.add(subscription);
+        }, t -> watcher.onUpdate(null,
+                                 XdsResourceException.maybeWrap(XdsType.CLUSTER,
+                                                                cluster.getName(), t))));
+    }
 }
```
Critical bug: Loop creates multiple ClusterStreams but map only holds one per name.
The current implementation schedules a separate task for each watcher, and each task creates a new ClusterStream and puts it in nodes. After the first task executes, subsequent tasks will fail the checkArgument because the cluster name is already in the map.
For example, with 2 watchers:

- Task 1: Creates a ClusterStream, puts it in `nodes[cluster.getName()]` → succeeds
- Task 2: `checkArgument(!nodes.containsKey(...))` → fails with `IllegalArgumentException`
The logic should create one ClusterStream and subscribe all watchers to it:
🐛 Proposed fix

```diff
 void register(Cluster cluster, SubscriptionContext context,
               List<SnapshotWatcher<? super ClusterSnapshot>> watchers) {
-
-    for (SnapshotWatcher<? super ClusterSnapshot> watcher : watchers) {
-        eventLoop.execute(safeRunnable(() -> {
-            checkArgument(!nodes.containsKey(cluster.getName()),
-                          "Cluster with name '%s' already registered", cluster.getName());
-            final ClusterStream node = new ClusterStream(new ClusterXdsResource(cluster), context,
-                                                         loadBalancerFactoryPool);
-            nodes.put(cluster.getName(), node);
+    eventLoop.execute(safeRunnable(() -> {
+        checkArgument(!nodes.containsKey(cluster.getName()),
+                      "Cluster with name '%s' already registered", cluster.getName());
+        final ClusterStream node = new ClusterStream(new ClusterXdsResource(cluster), context,
+                                                     loadBalancerFactoryPool);
+        nodes.put(cluster.getName(), node);
+        for (SnapshotWatcher<? super ClusterSnapshot> watcher : watchers) {
             final Subscription subscription = node.subscribe(watcher);
             subscriptions.add(subscription);
-        }, t -> watcher.onUpdate(null,
-                                 XdsResourceException.maybeWrap(XdsType.CLUSTER,
-                                                                cluster.getName(), t))));
-    }
+        }
+    }, t -> {
+        final Throwable wrapped = XdsResourceException.maybeWrap(XdsType.CLUSTER,
+                                                                 cluster.getName(), t);
+        for (SnapshotWatcher<? super ClusterSnapshot> watcher : watchers) {
+            watcher.onUpdate(null, wrapped);
+        }
+    }));
 }
```
}🤖 Prompt for AI Agents
In `@xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java` around
lines 72 - 88, The register method currently creates a new ClusterStream for
each watcher inside the per-watcher eventLoop task which causes checkArgument to
fail after the first insertion; change the logic to ensure a single
ClusterStream is created (or reused) for the cluster name and then subscribe all
watchers to that single stream: in the code referencing ClusterStream, nodes,
eventLoop.execute, safeRunnable, ClusterXdsResource, loadBalancerFactoryPool,
subscribe, and subscriptions, move the creation/lookup of the ClusterStream into
one eventLoop task (or use a compute-if-absent style under the event loop) so
you call nodes.put once (or reuse existing node) and then call
node.subscribe(watcher) for each watcher, adding each returned Subscription to
subscriptions and preserving the existing error handling that calls
watcher.onUpdate on exceptions.
Motivation

This PR refactors the xDS client from a resource-node hierarchy to a stream-based reactive design. The goal is to make snapshot composition and lifecycle management more explicit and composable, while simplifying error propagation and load balancer orchestration.

Modifications

There are two main changes (a toy model of the ref-counting behavior follows below):

- Introduce a reactive snapshot stream core
  - `SnapshotStream` and `RefCountedStream` with map/switchMap/combine-style operators, along with a static stream implementation.
- Snapshot-based `XdsLoadBalancer` lifecycle refactoring
  - `LoadBalancerFactoryPool` and a factory API for creating/updating load balancers with delayed-close semantics.
  - `XdsLoadBalancer` state is preserved across cluster updates (e.g., health / ramp-up status).
  - `XdsLoadBalancer` is modeled as an immutable snapshot value that is propagated on endpoint/cluster changes.
  - Each cluster update produces a new `XdsLoadBalancer` snapshot and propagates it to the `SnapshotWatcher`.
  - The `XdsLoadBalancer` API is now exposed directly, so asynchronous selection is no longer necessary.
Misc

- Changed `SnapshotWatcher#onUpdate` to accept a `Throwable` to support error propagation from `SnapshotStream` implementations (e.g., `MapStream`).

Result