
defaultCoroutineExceptionHandler probably shouldn't catch kotlin.Error #192

Open
felixkrull-neuland opened this issue Apr 19, 2024 · 4 comments


@felixkrull-neuland

The default CoroutineExceptionHandler in kotlin-kafka currently catches, logs, and then swallows every exception thrown in background coroutines. Unfortunately, that includes OutOfMemoryError and other fatal errors that probably shouldn't be handled this way.

Specifically, we had an OutOfMemoryError in the Kafka deserializer, which kotlin-kafka caught and logged but did not process any further:

```
KafkaDispatcher with [io.github.nomisRev.kafka.receiver.internals.KafkaSchedulerKt$special$$inlined$CoroutineExceptionHandler$1@61131887, StandaloneCoroutine{Cancelling}@fbd2604, java.util.concurrent.ScheduledThreadPoolExecutor@3ae87e38[Running, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 62]] failed with an uncaught exception. Report to kotlin-kafka repo please.
```

The consequence was that a Kafka consumer silently stopped consuming, in a way I didn't initially think to look for. I think the coroutine exception handler should rethrow java.lang.Error/kotlin.Error so that it reaches the thread's uncaught exception handler (at least on the JVM, as far as I can tell from the docs), which is something I, as a user of this library, can at least override.
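A minimal sketch of that suggestion, assuming the handler is built with kotlinx.coroutines' `CoroutineExceptionHandler { context, throwable -> ... }` factory; `loggingHandlerThatRethrowsErrors` and its `log` parameter are hypothetical names, not kotlin-kafka's API. As far as I can tell from the kotlinx.coroutines sources, when a handler itself throws, the throwable is passed on to the library's global uncaught-exception handling, which on the JVM ends at the current thread's `UncaughtExceptionHandler`.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler

// Hypothetical alternative to kotlin-kafka's default handler (not the library's API).
// Everything is logged as before, but java.lang.Error (kotlin.Error is a typealias
// for it on the JVM) is rethrown instead of being swallowed.
fun loggingHandlerThatRethrowsErrors(
    log: (String, Throwable) -> Unit,
): CoroutineExceptionHandler = CoroutineExceptionHandler { context, throwable ->
    log("Uncaught exception in $context", throwable)
    // Rethrowing hands the error back to kotlinx.coroutines, which (assumption,
    // based on current JVM behavior) forwards it to the thread's uncaught handler.
    if (throwable is Error) throw throwable
}
```

Non-fatal exceptions keep the current log-and-continue behavior, so only the `Error` branch changes what users observe.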

IMO, the ideal solution would be for uncaught exceptions in background tasks to somehow be raised from e.g. a receive() call so they can be handled by the user as normal, but I don't know how feasible that actually is.
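A rough sketch of what "raising it from receive()" could look like, assuming a hypothetical background worker that owns the consumer-facing channel: instead of leaving a failure to a CoroutineExceptionHandler, it closes the channel with the failure as the cause, and the consumer's loop rethrows it. `runWorkerAndReceive` and `work` are illustrative names only.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical background worker that owns the consumer-facing channel.
// A failure closes the channel with that failure as the cause, so the consumer
// sees it from receive()/iteration instead of it being logged away.
fun runWorkerAndReceive(work: suspend () -> Int): Result<List<Int>> = runBlocking {
    val ch = Channel<Int>(Channel.BUFFERED)
    launch {
        try {
            ch.send(work())
            ch.close() // normal completion: the consumer's loop just ends
        } catch (e: CancellationException) {
            throw e // let cancellation propagate as usual
        } catch (t: Throwable) {
            ch.close(t) // the failure becomes the channel's close cause
        }
    }
    runCatching {
        val received = mutableListOf<Int>()
        for (value in ch) received += value // rethrows the close cause, if any
        received
    }
}
```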

@nomisRev
Owner

Thank you for the report @felixkrull-neuland! (Was this on 0.3.x?)

That is indeed very bad. I thought I had tests for this, so let's add them.

> IMO, the ideal solution would be for uncaught exceptions in background tasks to somehow be raised from e.g. a receive() call so they can be handled by the user as normal, but I don't know how feasible that actually is.

I just checked where this is used in 0.4.x, and it's only used when the Channel is already closed 🤔 That means the Flow has already seen a terminal event, so we cannot deliver the exception to receive() via Channel.close. Any suggestions or thoughts?

@felixkrull-neuland
Author

Oh yeah, this is on 0.3.1; I should've mentioned that, sorry :)

Rethrowing from a foreground method (like receive()) would probably only make sense for non-fatal throwables. If the runtime tells you it's running out of memory, there's not really any room to try and report that later.
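One way to make the fatal/non-fatal split concrete. The cutoff at `VirtualMachineError` (which covers `OutOfMemoryError` and `StackOverflowError`) is an assumption; other policies, such as Scala's `NonFatal`, also treat `LinkageError` as fatal, which would include `ExceptionInInitializerError`. `isFatal` and `route` are hypothetical helpers, not kotlin-kafka code.

```kotlin
// Hypothetical policy, not part of kotlin-kafka: VirtualMachineError covers
// OutOfMemoryError, StackOverflowError, InternalError and UnknownError.
fun isFatal(t: Throwable): Boolean = t is VirtualMachineError

// Non-fatal failures are handed to the consumer (e.g. to be rethrown from
// receive()); fatal ones go straight to a last-resort handler.
fun route(
    t: Throwable,
    toConsumer: (Throwable) -> Unit,
    toLastResort: (Throwable) -> Unit,
) {
    if (isFatal(t)) toLastResort(t) else toConsumer(t)
}
```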

Maybe the coroutine exception handler should just rethrow all exceptions it catches regardless of whether they're fatal or not? Since it's just a last resort for unexpectedly uncaught exceptions, it might make sense to fail a bit more loudly rather than silently stop running.
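A sketch of the "fail more loudly" option that does not depend on rethrowing: forward every throwable to the current thread's `UncaughtExceptionHandler`, which users can override. `reportToThread` is a hypothetical helper; it could be wired in as `CoroutineExceptionHandler { _, t -> reportToThread(t) }`.

```kotlin
// Hypothetical last-resort reporter: forwards a throwable to the current
// thread's UncaughtExceptionHandler instead of logging it and carrying on.
fun reportToThread(t: Throwable) {
    val thread = Thread.currentThread()
    // If no handler was set on the thread, getUncaughtExceptionHandler() returns
    // its ThreadGroup, which (via parent groups) falls back to
    // Thread.getDefaultUncaughtExceptionHandler().
    thread.uncaughtExceptionHandler?.uncaughtException(thread, t)
}
```

A process-wide policy, such as logging and exiting, can then be installed once with `Thread.setDefaultUncaughtExceptionHandler`.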

@nomisRev
Owner

Hey @felixkrull-neuland,

Let's keep this discussion going. I'd prefer to improve this based on 0.4.x, but I'm not sure I understand your train of thought; or rather, I don't think what you're suggesting is possible with KotlinX Coroutines.

This is an extremely simplified version of the kotlin-kafka loop: we first create a Channel, and the loop starts when the resulting Flow is collected. Here we simulate sending one message and then encountering a failure.

At that point we cannot do anything except terminate the loop and send the exception to the user. We do this by closing the Channel, so the exception is rethrown from the Flow.

However, if we encounter a second error while shutting down the kotlin-kafka loop, where do we send it? (This could be a record still being processed in parallel, a failing commit to Kafka, etc.)

If we try to send it to the Channel again, Channel.close returns false and the error is ignored. Simply throwing an exception within onCompletion is also ignored here. So, as a best effort, I send it to the CoroutineExceptionHandler instead, because I can no longer send it to the Channel.
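The `Channel.close` behavior described here can be isolated in a few lines. This is a minimal sketch using kotlinx.coroutines' `Channel`; `closeTwice` is an illustrative name.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Closes the same channel twice and reports what each step observed.
fun closeTwice(): Triple<Boolean, Boolean, String?> = runBlocking {
    val ch = Channel<Int>(Channel.BUFFERED)
    val first = ch.close(RuntimeException("Boom!"))   // true: the channel was still open
    val second = ch.close(RuntimeException("Boom2!")) // false: already closed, "Boom2!" is dropped
    // receive() on a channel closed with a cause rethrows that (first) cause.
    val seen = runCatching { ch.receive() }.exceptionOrNull()?.message
    Triple(first, second, seen)
}
```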

Possibly there was an additional bug in 0.3.x, but that still raises the question of whether the 0.4.x behavior is correct. Can you reproduce this in a simple test (e.g. by manually throwing an OOM from a deserializer)?

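```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun example(): Flow<Int> {
    val ch = Channel<Int>(Channel.BUFFERED)
    return ch.consumeAsFlow()
        .onStart {
            println("Sending")
            ch.send(1)
            println("Closing 1")
            // First terminal event: after draining 1, the collector fails with "Boom!".
            ch.close(RuntimeException("Boom!"))
        }.onCompletion {
            println("Closing 2")
            // The channel is already closed, so close() returns false and "Boom2!"
            // is silently dropped: there is nowhere left to deliver it.
            ch.close(RuntimeException("Boom2!"))
        }
}

fun main() = runBlocking<Unit> {
    example()
        .onEach { println(it) }
        .launchIn(this)
}
```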
```
Sending
Closing 1
1
Closing 2
Exception in thread "main" java.lang.RuntimeException: Boom!
	at kotlinconf.FlowExampleKt$example$2.invokeSuspend(FlowExample.kt:18)
	at kotlinconf.FlowExampleKt$example$2.invoke(FlowExample.kt)
	at kotlinconf.FlowExampleKt$example$2.invoke(FlowExample.kt)
	at kotlinx.coroutines.flow.FlowKt__EmittersKt$onStart$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:115)
	at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:113)
	at kotlinx.coroutines.flow.FlowKt__TransformKt$onEach$$inlined$unsafeTransform$1.collect(SafeCollector.common.kt:112)
	at kotlinx.coroutines.flow.FlowKt__CollectKt.collect(Collect.kt:26)
	at kotlinx.coroutines.flow.FlowKt.collect(Unknown Source)
	at kotlinx.coroutines.flow.FlowKt__CollectKt$launchIn$1.invokeSuspend(Collect.kt:46)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:277)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:95)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at kotlinconf.FlowExampleKt.main(FlowExample.kt:27)
	at kotlinconf.FlowExampleKt.main(FlowExample.kt)
```
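A sketch of the reproduction asked for above, assuming `kafka-clients` is on the classpath: a test-only `Deserializer` that throws a chosen `Throwable` for every record. `ThrowingDeserializer` is a hypothetical name, and wiring it into a kotlin-kafka receiver as the value deserializer is not shown here.

```kotlin
import org.apache.kafka.common.serialization.Deserializer

// Hypothetical test-only deserializer: instead of decoding, it throws whatever
// `failure` produces, e.g. OutOfMemoryError or ExceptionInInitializerError.
class ThrowingDeserializer(
    private val failure: () -> Throwable,
) : Deserializer<String> {
    override fun deserialize(topic: String?, data: ByteArray?): String = throw failure()
}
```

Passing a factory rather than a single instance means each record gets a fresh throwable, so the same class can simulate both the OOM report and a class-initialization failure.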

@felixkrull-neuland
Author

(Sorry for the late response 🙃)

OK, so if I understand correctly, you're saying that the CoroutineExceptionHandler should only ever be used when an error occurs after the channel to the "front end" has already been closed? In that case, yeah, you're probably right that there's no way to pass the exception out of the receive method.


I did however get another case of an error getting passed to the CoroutineExceptionHandler, this time with a stacktrace:

```
KafkaDispatcher with [io.github.nomisRev.kafka.receiver.internals.KafkaSchedulerKt$special$$inlined$CoroutineExceptionHandler$1@3b339589, StandaloneCoroutine{Cancelling}@6e33c398, java.util.concurrent.ScheduledThreadPoolExecutor@11e8031[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 3]] failed with an uncaught exception. Report to kotlin-kafka repo please.
java.lang.ExceptionInInitializerError: null
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:538)
	<...more Kafka consumer internals>
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:591)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
	at io.github.nomisRev.kafka.receiver.internals.EventLoop$schedulePoll$1.invokeSuspend(PollLoop.kt:285)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:101)
	at java.base@<version>/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	<...more JVM executor internals>
Caused by: org.apache.kafka.common.KafkaException: Kerberos exceptions could not be initialized
	at org.apache.kafka.common.security.kerberos.KerberosError.<clinit>(KerberosError.java:63)
	<I don't think the cause matters, it was a GraalVM compatibility issue>
```


I think the reason this didn't get passed out through the channel is the try-catch block around the poll: specifically, the catch in EventLoop.kt:212 (this was with 0.3.1, but both that version and main have a catch like this) only catches java.lang.Exception, whereas both OutOfMemoryError and this ExceptionInInitializerError are subclasses of java.lang.Error, which extends Throwable directly rather than Exception. That line should probably catch Throwable instead, so that errors also get passed out through the channel. I don't know what happens if you try to pass an OutOfMemoryError through a channel in an OOM situation, but it's probably worth trying.
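The difference between the two catch clauses can be shown without Kafka at all. In this sketch `poll` is a hypothetical stand-in for `KafkaConsumer.poll`, and the two `guarded...` functions only model the catch clause, not kotlin-kafka's actual EventLoop.

```kotlin
// Models `catch (e: Exception)`: java.lang.Error subclasses slip straight through.
fun guardedWithException(poll: () -> Unit): String =
    try {
        poll()
        "ok"
    } catch (e: Exception) {
        "forwarded: ${e.javaClass.simpleName}"
    }

// Models `catch (t: Throwable)`: errors are caught too and could be forwarded
// to the consumer, e.g. via channel.close(t).
fun guardedWithThrowable(poll: () -> Unit): String =
    try {
        poll()
        "ok"
    } catch (t: Throwable) {
        "forwarded: ${t.javaClass.simpleName}"
    }
```

In a coroutine-based loop, `catch (t: Throwable)` also catches `CancellationException`, so a real change would probably rethrow that before forwarding anything to the channel.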
