Description
Expected Behavior
Either invoke ReactorExecutionFlowImpl.cancel() only once, or make .cancel() idempotent.
Actual Behaviour
After upgrading from Micronaut 4.7.6 to 4.8.0 with micronaut-core:4.8.9 we started to see the following exceptions in our logs:
First:
logger_name: io.micronaut.core.execution.DelayedExecutionFlowImpl
message: Failed to execute onComplete
stack_trace: java.lang.NullPointerException: Cannot invoke "java.util.List.iterator()" because "stc" is null
at io.micronaut.http.reactive.execution.ReactorExecutionFlowImpl.cancel(ReactorExecutionFlowImpl.java:215)
at io.micronaut.core.execution.DelayedExecutionFlowImpl.cancel(DelayedExecutionFlowImpl.java:174)
at io.micronaut.http.reactive.execution.FlowAsMono$SubscriptionImpl.cancel(FlowAsMono.java:139)
at reactor.core.publisher.MonoNext$NextSubscriber.cancel(MonoNext.java:114)
at reactor.core.publisher.MonoNext$NextSubscriber.cancel(MonoNext.java:114)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:654)
at reactor.core.publisher.Operators.terminate(Operators.java:1328)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1026)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:342)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:221)
at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1087)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:844)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:612)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:592)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:867)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:994)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
at io.micronaut.core.async.propagation.ReactivePropagation$2.onError(ReactivePropagation.java:129)
at io.micronaut.http.reactive.execution.FlowAsMono$SubscriptionImpl.forward(FlowAsMono.java:132)
at io.micronaut.http.reactive.execution.FlowAsMono$SubscriptionImpl.lambda$request$0(FlowAsMono.java:119)
at io.micronaut.core.execution.ImperativeExecutionFlowImpl.onComplete(ImperativeExecutionFlowImpl.java:132)
at io.micronaut.core.execution.DelayedExecutionFlowImpl$OnComplete.apply(DelayedExecutionFlowImpl.java:383)
at io.micronaut.core.execution.DelayedExecutionFlowImpl.work(DelayedExecutionFlowImpl.java:54)
at io.micronaut.core.execution.DelayedExecutionFlowImpl.complete0(DelayedExecutionFlowImpl.java:75)
at io.micronaut.core.execution.DelayedExecutionFlowImpl.completeExceptionally(DelayedExecutionFlowImpl.java:87)
at io.micronaut.core.execution.ExecutionFlow.lambda$timeout$1(ExecutionFlow.java:191)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:156)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
Then repeated 3 times:
logger_name: io.micronaut.core.execution.DelayedExecutionFlowImpl
message: Failed to execute onComplete
stack_trace: java.lang.NullPointerException: null
Note: I'm not sure if these two are related.
Note: It does not appear to impact the function of the application.
Inspecting the ReactorExecutionFlowImpl.cancel() implementation, it looks like subscriptionsToCancel can only be null if cancel() has already been called. The first call sets the field to null, so a second call iterates over null and throws the NullPointerException above. In other words, cancel() is not idempotent:
micronaut-core/http/src/main/java/io/micronaut/http/reactive/execution/ReactorExecutionFlowImpl.java
Lines 208 to 218 in d0c3afe
    @Override
    public void cancel() {
        List<Subscription> stc;
        synchronized (this) {
            stc = subscriptionsToCancel;
            subscriptionsToCancel = null;
        }
        for (Subscription subscription : stc) {
            subscription.cancel();
        }
    }
So either cancel() is being invoked multiple times when it shouldn't be, or it should have been made idempotent and was not.
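For illustration, here is a minimal sketch of what an idempotent variant could look like. It is not the actual Micronaut code: IdempotentCancelSketch is a hypothetical stand-in class, and it uses the JDK's java.util.concurrent.Flow.Subscription instead of org.reactivestreams.Subscription so that it compiles without extra dependencies. The only behavioural change compared to the snippet above is the null check after the synchronized block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical stand-in for the flow class; not the real ReactorExecutionFlowImpl. */
final class IdempotentCancelSketch {
    // Becomes null once cancel() has run, mirroring the field in the issue.
    private List<Flow.Subscription> subscriptionsToCancel;

    IdempotentCancelSketch(List<Flow.Subscription> subscriptions) {
        this.subscriptionsToCancel = new ArrayList<>(subscriptions);
    }

    public void cancel() {
        List<Flow.Subscription> stc;
        synchronized (this) {
            stc = subscriptionsToCancel;
            subscriptionsToCancel = null;
        }
        if (stc == null) {
            // A previous call already took the list; nothing left to cancel.
            return;
        }
        for (Flow.Subscription subscription : stc) {
            subscription.cancel();
        }
    }
}
```

With this guard, a second cancel() returns early instead of throwing, and each subscription is still cancelled at most once by this class.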
Steps To Reproduce
Unknown. So far the exception appeared in logs once after upgrading.
Environment Information
- eclipse-temurin:21-alpine
- micronaut-core:4.8.9
Example Application
No response
Version
4.8.0