Using WebSocketNetworkTransport throws IllegalArgumentException: Flow has more than one element #6276

Open
akashamin opened this issue Nov 22, 2024 · 18 comments
Labels
🐛 Type: Bug ⌛ Waiting for info More information is required
@akashamin

akashamin commented Nov 22, 2024

Version

3.8.5

Summary

We are currently migrating from Apollo 2 to Apollo 3 and see many IllegalArgumentException: Flow has more than one element errors thrown when receiving updates over a WebSocket.

ApolloClient builder code below

ApolloClient.Builder().apply {
    serverUrl(baseUrl)
    httpEngine(DefaultHttpEngine(newOkHttpClient))
    dispatcher(defaultDispatcher())
    normalizedCache(
        normalizedCacheFactory = cacheFactory,
        cacheResolver = resolver,
        cacheKeyGenerator = generator,
    )
    webSocketUrl?.let { url ->
        subscriptionNetworkTransport(WebSocketNetworkTransport.Builder()
            .serverUrl(url)
            .okHttpClient(newOkHttpClient)
            .protocol(
                SubscriptionWsProtocol.Factory(
                    connectionPayload = {
                        authorizationHeaders()
                    }
                )
            )
            .reopenWhen { throwable, attempt ->
                if (throwable is DisableSubscriptionException) {
                    reconnectSignalChannel.receive()
                }
                true
            }
            .build())
    }

    autoPersistedQueries(enableByDefault = enablePersistedQueries)

    customTypeAdapters?.forEach { (scalarType, customTypeAdapter) ->
        addCustomTypeAdapter(scalarType, customTypeAdapter)
    }
}.build()

I found an old issue related to the same exception. When I remove autoPersistedQueries, I no longer see the exception. I'm not sure whether this is a configuration issue on our end or a bug in the library.

Steps to reproduce the behavior

No response

Logs

java.lang.IllegalArgumentException: Flow has more than one element
    at kotlinx.coroutines.flow.FlowKt__ReduceKt$single$2.emit(Reduce.kt:54)
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$filterNot$1$2.emit(Emitters.kt:223)
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$map$1$2.emit(Emitters.kt:223)
    at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:11)
    at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:11)
    at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:113)
    at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:82)
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invokeSuspend(WebSocketNetworkTransport.kt:298)
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke(Unknown Source:13)
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke(Unknown Source:6)
    at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$1.emit(flows.kt:53)
    at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$2.emit(flows.kt:38)
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$filter$1$2.emit(Emitters.kt:223)
    at kotlinx.coroutines.flow.SubscribedFlowCollector.emit(Unknown Source:2)
    at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:397)
    at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(Unknown Source:15)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:101)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:644)
    at java.lang.Thread.run(Thread.java:1012)
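
The top frame of the trace is the kotlinx.coroutines `single()` operator, which throws IllegalArgumentException when the upstream Flow emits a second element. A subscription emits one element per update, so any code path that calls `single()` on it fails on the second update. The sketch below only demonstrates that operator contract in isolation; it assumes kotlinx-coroutines-core is on the classpath, does not reproduce Apollo's internals, and `singleFailureMessage` is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the message of the exception thrown by single(),
// or null when the flow emitted exactly one element and single() succeeded.
fun singleFailureMessage(vararg items: String): String? = runBlocking {
    try {
        flowOf(*items).single()
        null
    } catch (e: IllegalArgumentException) {
        e.message
    }
}

fun main() {
    // A query-like flow with exactly one response: single() succeeds.
    println(singleFailureMessage("response-1")) // null

    // A subscription-like flow with two updates: single() throws.
    println(singleFailureMessage("update-1", "update-2")) // Flow has more than one element
}
```
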
@martinbonnin
Contributor

Hi 👋

Can you share the code you're using to start the subscription and collect the Flow?

@martinbonnin martinbonnin added the ⌛ Waiting for info More information is required label Nov 25, 2024
@akashamin
Author

akashamin commented Nov 25, 2024

Hey, sorry for the delay over the weekend. Here is the code:

client.subscription(subscription)
    .toFlow()
    .retryWhen { throwable, attempt ->
        // retry logic returns a boolean
    }
    .onEach { response ->
        val data = response.data
        data?.let {
            client.apolloStore.writeOperation(
                operation = subscription,
                operationData = data,
                publish = true,
                customScalarAdapters = client.customScalarAdapters
            )
        }
    }

// And using it as

repository.subscribe(subscriptionRequest).collect()

@martinbonnin
Contributor

Thanks! I can reproduce. This is indeed a subscriptions/persisted queries integration issue. Apologies for the inconvenience, I'll make a fix for that. In the meantime, you can disable persisted queries for your subscriptions like so:

client.subscription(subscription)
    .enableAutoPersistedQueries(false)
    .toFlow()

@akashamin
Author

akashamin commented Nov 25, 2024

@martinbonnin Thank you for the update. We should be fine disabling PQ for subscriptions for a while. Do you have an idea of when the fix will be out, and in which release?

@martinbonnin
Contributor

Just to clarify: I don't think v2 ever supported APQs + subscriptions. They were just silently disabled, which is the behavior I was looking into porting to v3/v4.
Or do you need APQs to work with subscriptions?

@akashamin
Author

akashamin commented Nov 25, 2024

v2 has a toggle, enableAutoPersistedSubscriptions. I didn't see a similar toggle in v3, so I was not sure.

Or do you need APQs to work with subscriptions?

Not really. We had that option disabled for v2, but since the API was present, I wanted to know when the fix would be available in case the need to enable it arises.

@martinbonnin
Contributor

TBH, I don't know of any server that supports it. We could maybe find a way to make it work, but it'd probably be useless without a matching server.
One of APQs' benefits over HTTP is that they allow CDN caching. That benefit doesn't apply to WebSockets.

@martinbonnin
Contributor

Out of curiosity, what stack is your server? I'm curious if it supports that.

@akashamin
Author

akashamin commented Nov 25, 2024

I believe that to enable safe-list PQs (PQs manually uploaded to the server) we use the same toggle, autoPersistedQueries(enableByDefault = enablePersistedQueries)?

@akashamin
Author

Out of curiosity, what stack is your server? I'm curious if it supports that.

Our backend doesn't support APQ as of now. Sorry if I confused you.

@martinbonnin
Contributor

No worries, APQs vs PQs is confusing.

I believe to enable safe-list PQs (PQs manually uploaded to the server) we use the same toggle autoPersistedQueries(enableByDefault = enablePersistedQueries)?

If you're using Apollo Router, yes; other backends may work differently. But judging by your WebSocketNetworkTransport usage, it looks like you're using something else? (Router uses multipart HTTP.)
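
For readers untangling the two terms, here is a minimal sketch of the difference, using only the Kotlin/JVM standard library. In both schemes the client identifies an operation by the SHA-256 hash of its document. With APQs, a server that does not know the hash answers PersistedQueryNotFound and the client retries with the full document, which the server then registers. With a safelist, only documents uploaded ahead of time are accepted. `FakeServer`, `sendWithApq`, and the string responses are hypothetical stand-ins for illustration, not Apollo's API or wire format, and a real safelisting server may reject unknown operations with a different error.

```kotlin
import java.security.MessageDigest

// Hex-encoded SHA-256 of the operation document, the identifier both schemes rely on.
fun sha256Hex(document: String): String =
    MessageDigest.getInstance("SHA-256")
        .digest(document.toByteArray(Charsets.UTF_8))
        .joinToString("") { "%02x".format(it) }

// Hypothetical in-memory server.
// allowRegistration = true models APQs: unknown documents are registered on the fly.
// allowRegistration = false models a safelist: only pre-uploaded documents are accepted.
class FakeServer(private val allowRegistration: Boolean, safelist: Set<String> = emptySet()) {
    private val known = safelist.map(::sha256Hex).toMutableSet()

    fun execute(hash: String, document: String?): String = when {
        hash in known -> "OK"
        document != null && allowRegistration && sha256Hex(document) == hash -> {
            known += hash
            "OK"
        }
        else -> "PersistedQueryNotFound"
    }
}

// Client side of the handshake: send the hash alone, send the full document only on a miss.
// Returns every response received, in order.
fun sendWithApq(server: FakeServer, document: String): List<String> {
    val hash = sha256Hex(document)
    val first = server.execute(hash, document = null)
    if (first != "PersistedQueryNotFound") return listOf(first)
    return listOf(first, server.execute(hash, document))
}

fun main() {
    val subscription = "subscription OnUpdate { update { id } }"

    val apqServer = FakeServer(allowRegistration = true)
    println(sendWithApq(apqServer, subscription)) // [PersistedQueryNotFound, OK]
    println(sendWithApq(apqServer, subscription)) // [OK]

    val safelistServer = FakeServer(allowRegistration = false, safelist = setOf("query Known { me { id } }"))
    println(sendWithApq(safelistServer, subscription)) // [PersistedQueryNotFound, PersistedQueryNotFound]
}
```

The client-side step is the same in both cases, which is consistent with a single client toggle covering both; what differs is whether the server accepts the retry.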

@akashamin
Author

TBH, I don't know of any server that supports it. We could maybe find a way to make it work, but it'd probably be useless without a matching server. One of APQs' benefits over HTTP is that they allow CDN caching. That benefit doesn't apply to WebSockets.

Related to the above ☝️: sorry, I meant on the client side (Android). Whether we use APQ or PQ, the only way to enable PQ is the autoPersistedQueries toggle, as the doc suggests, right? That means it would be automatically enabled for subscriptions, since Apollo 3 does not have a separate toggle for them. So if the intention is to use PQ with subscriptions, Apollo 3 won't support it?

@martinbonnin
Contributor

martinbonnin commented Nov 26, 2024

Which means for subscriptions it would be auto enabled as Apollo3 does not have a different toggle for subscriptions? So if the intention is to use PQ with subscriptions Apollo3 won't support it?

Apollo 2 has never supported APQs or PQs with subscriptions. It was disabled under the hood. You could call ApolloClient.Builder.enableAutoPersistedQueries(true) but this would have no effect on subscriptions.

The "quick" fix is to do the same in Apollo 3, but since we're on the topic, I'll look into what it takes to add proper support. Stay tuned.

@akashamin
Author

Okay great, thank you for the help @martinbonnin

@martinbonnin
Contributor

martinbonnin commented Nov 27, 2024

@akashamin a fix for APQs + HTTP multipart subscriptions is available in #6283.

I'm not 100% sure that'll work for your use case though. What are you using on the backend?

@martinbonnin martinbonnin added this to the 3.8.6 milestone Nov 27, 2024
@akashamin
Author

@akashamin a fix for APQs + HTTP multipart subscriptions is available in #6283.

I'm not 100% sure that'll work for your use case though. What are you using on the backend?

Thank you for the quick fix. We are soon going to migrate to HTTP multipart subscriptions, but currently we use traditional WebSockets.

@martinbonnin
Contributor

currently we use traditional web sockets.

How much do you need APQs/PQs with traditional WebSockets?
There is sadly no single way to do websocket so I'd say there's a 50% chance it won't work even with the fix. But if you tell me your backend stack, I can double check.

@akashamin
Author

currently we use traditional web sockets.

How much do you need APQs/PQs with traditional WebSockets? There is sadly no single way to do websocket so I'd say there's a 50% chance it won't work even with the fix. But if you tell me your backend stack, I can double check.

That's okay, we won't need PQs with WebSockets for now. As I mentioned, we never had it enabled on Apollo 2; it was just a question of what should be done if the need arises. I'll get back to you on our backend stack, though; I need to ask the backend team.
