Skip to content

Commit

Permalink
Fix executor service termination in okhttp (#1860)
Browse files Browse the repository at this point in the history
* Add wss echo test for #1839

* Fix okhttp client termination

    Close #1839

* Add missing argument

* Add okhttp termination test
  • Loading branch information
e5l authored Aug 10, 2020
1 parent 451595f commit 2783980
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 37 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ typesafe_config_version=1.3.1
apache_version=4.1.4
apache_core_version=4.4.13
gson_version=2.8.6
okhttp_version=4.4.0
okhttp_version=4.6.0
jackson_version=2.10.2
jackson_kotlin_version=2.10.2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class OkHttpEngine(override val config: OkHttpConfig) : HttpClientEngineBase("kt
* Cache that keeps least recently used [OkHttpClient] instances.
*/
private val clientCache = createLRUCache(::createOkHttpClient, {}, config.clientCacheSize)

init {
val parent = super.coroutineContext[Job]!!
requestsJob = SilentSupervisor(parent)
Expand All @@ -55,6 +56,7 @@ class OkHttpEngine(override val config: OkHttpConfig) : HttpClientEngineBase("kt
} finally {
clientCache.forEach { (_, client) ->
client.connectionPool.evictAll()
client.dispatcher.executorService.shutdown()
}
(dispatcher as Closeable).close()
}
Expand Down Expand Up @@ -131,6 +133,7 @@ class OkHttpEngine(override val config: OkHttpConfig) : HttpClientEngineBase("kt
private fun createOkHttpClient(timeoutExtension: HttpTimeout.HttpTimeoutCapabilityConfiguration?): OkHttpClient {
val builder = (config.preconfigured ?: okHttpClientPrototype).newBuilder()

builder.dispatcher(Dispatcher())
builder.apply(config.config)
config.proxy?.let { builder.proxy(it) }
timeoutExtension?.let {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,41 @@ import okhttp3.Headers
import java.io.*
import kotlin.coroutines.*

internal suspend fun OkHttpClient.execute(request: Request, requestData: HttpRequestData): Response =
suspendCancellableCoroutine {
val call = newCall(request)
val callback = object : Callback {
internal suspend fun OkHttpClient.execute(
request: Request, requestData: HttpRequestData
): Response = suspendCancellableCoroutine {
val call = newCall(request)
val callback = object : Callback {

override fun onFailure(call: Call, cause: IOException) {
if (call.isCanceled()) {
return
}
override fun onFailure(call: Call, cause: IOException) {
if (call.isCanceled()) {
return
}

val mappedException = when (cause) {
is java.net.SocketTimeoutException -> if (cause.message?.contains("connect") == true) {
ConnectTimeoutException(requestData, cause)
} else {
SocketTimeoutException(requestData, cause)
}
else -> cause
val mappedException = when (cause) {
is java.net.SocketTimeoutException -> if (cause.message?.contains("connect") == true) {
ConnectTimeoutException(requestData, cause)
} else {
SocketTimeoutException(requestData, cause)
}

it.resumeWithException(mappedException)
else -> cause
}

override fun onResponse(call: Call, response: Response) {
if (!call.isCanceled()) it.resume(response)
}
it.resumeWithException(mappedException)
}

call.enqueue(callback)

it.invokeOnCancellation {
call.cancel()
override fun onResponse(call: Call, response: Response) {
if (!call.isCanceled()) it.resume(response)
}
}

call.enqueue(callback)

it.invokeOnCancellation {
call.cancel()
}
}

internal fun Headers.fromOkHttp(): io.ktor.http.Headers = object : io.ktor.http.Headers {
override val caseInsensitiveName: Boolean = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
package io.ktor.client.engine.okhttp

import io.ktor.client.*
import io.ktor.client.features.websocket.*
import io.ktor.client.request.*
import io.ktor.http.cio.websocket.*
import kotlinx.coroutines.*
import okhttp3.*
import java.util.concurrent.*
import kotlin.test.*

class OkHttpEngineTests {
@Test
fun closeTest() {
fun testClose() {
val okHttpClient = OkHttpClient()
val engine = OkHttpEngine(OkHttpConfig().apply { preconfigured = okHttpClient })
engine.close()
Expand All @@ -24,7 +26,7 @@ class OkHttpEngineTests {
}

@Test
fun threadLeakTest() = runBlocking {
fun testThreadLeak() = runBlocking {
val initialNumberOfThreads = Thread.getAllStackTraces().size

repeat(25) {
Expand All @@ -40,7 +42,7 @@ class OkHttpEngineTests {
}

@Test
fun preconfiguresTest() = runBlocking {
fun testPreconfigured() = runBlocking {
var preconfiguredClientCalled = false
val okHttpClient = OkHttpClient().newBuilder().addInterceptor(object : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
Expand All @@ -56,4 +58,17 @@ class OkHttpEngineTests {
assertTrue(preconfiguredClientCalled)
}
}

@Test
fun testRequestAfterRecreate() {
runBlocking {
HttpClient(OkHttp)
.close()

HttpClient(OkHttp).use { client ->
val response = client.get<String>("http://www.google.com")
assertNotNull(response)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,35 @@ class WebSocketTest : ClientLoader() {
fun testCancel() = clientTests(listOf("Apache", "Android", "Js", "iOS")) {
config {
install(WebSockets)
}

test { client ->
io.ktor.client.tests.utils.assertFailsWith<CancellationException> {
withTimeout(1000) {
client.webSocket("$TEST_WEBSOCKET_SERVER/websockets/echo") {
repeat(10) {
send(Frame.Text("Hello"))
delay(250)
}
test { client ->
io.ktor.client.tests.utils.assertFailsWith<CancellationException> {
withTimeout(1000) {
client.webSocket("$TEST_WEBSOCKET_SERVER/websockets/echo") {
repeat(10) {
send(Frame.Text("Hello"))
delay(250)
}
}
}
}
}
}

@Test
fun testEchoWSS() = clientTests(listOf("Apache", "Android", "Js", "iOS")) {
config {
install(WebSockets)
}

test { client ->
client.webSocket("wss://echo.websocket.org") {
outgoing.send(Frame.Text("PING"))
val frame = incoming.receive()
assertTrue(frame is Frame.Text)
assertEquals("PING", frame.readText())
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ internal fun HashAndSign(hashValue: Byte, signValue: Byte, oidValue: String? = n
val sign = SignatureAlgorithm.byCode(signValue) ?: return null
val oid = oidValue?.let{ OID(it) }

return HashAndSign(hash, sign)
return HashAndSign(hash, sign, oid)
}

/**
Expand Down

0 comments on commit 2783980

Please sign in to comment.