Skip to content

Commit e1f483e

Browse files
bjhhame5l
authored andcommitted
KTOR-7845 Fix for threading issue in flushAndClose for reader job channels (#4503)
1 parent ac2f544 commit e1f483e

File tree

5 files changed

+51
-2
lines changed

5 files changed

+51
-2
lines changed

ktor-io/api/ktor-io.api

+4
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,10 @@ public abstract interface class io/ktor/utils/io/ChannelJob {
173173
public abstract fun getJob ()Lkotlinx/coroutines/Job;
174174
}
175175

176+
public final class io/ktor/utils/io/CloseHookByteWriteChannelKt {
177+
public static final fun onClose (Lio/ktor/utils/io/ByteWriteChannel;Lkotlin/jvm/functions/Function1;)Lio/ktor/utils/io/ByteWriteChannel;
178+
}
179+
176180
public final class io/ktor/utils/io/ConcurrentIOException : java/lang/IllegalStateException {
177181
public fun <init> (Ljava/lang/String;Ljava/lang/Throwable;)V
178182
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V

ktor-io/api/ktor-io.klib.api

+1
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/cancel() // io.kt
364364
final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/close() // io.ktor.utils.io/close|[email protected](){}[0]
365365
final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/close(kotlin/Throwable?) // io.ktor.utils.io/close|[email protected](kotlin.Throwable?){}[0]
366366
final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/counted(): io.ktor.utils.io/CountedByteWriteChannel // io.ktor.utils.io/counted|[email protected](){}[0]
367+
final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/onClose(kotlin.coroutines/SuspendFunction0<kotlin/Unit>): io.ktor.utils.io/ByteWriteChannel // io.ktor.utils.io/onClose|[email protected](kotlin.coroutines.SuspendFunction0<kotlin.Unit>){}[0]
367368
final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/rethrowCloseCauseIfNeeded() // io.ktor.utils.io/rethrowCloseCauseIfNeeded|[email protected](){}[0]
368369
final fun (io.ktor.utils.io/ChannelJob).io.ktor.utils.io/cancel() // io.ktor.utils.io/cancel|[email protected](){}[0]
369370
final fun (io.ktor.utils.io/ChannelJob).io.ktor.utils.io/getCancellationException(): kotlin.coroutines.cancellation/CancellationException // io.ktor.utils.io/getCancellationException|[email protected](){}[0]

ktor-io/common/src/io/ktor/utils/io/ByteReadChannelOperations.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import kotlinx.io.Buffer
1414
import kotlinx.io.bytestring.*
1515
import kotlinx.io.unsafe.*
1616
import kotlin.coroutines.*
17-
import kotlin.jvm.*
1817
import kotlin.math.*
1918

2019
@OptIn(InternalAPI::class)
@@ -309,7 +308,7 @@ public fun CoroutineScope.reader(
309308
}
310309
}
311310

312-
return ReaderJob(channel, job)
311+
return ReaderJob(channel.onClose { job.join() }, job)
313312
}
314313

315314
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package io.ktor.utils.io
6+
7+
/**
8+
* Wraps this channel to execute the provided action when closed using `flushAndClose()`.
9+
*
10+
* @param onClose The action to execute when the channel is closed.
11+
* @return A new `ByteWriteChannel` that executes the given action upon closure.
12+
*/
13+
public fun ByteWriteChannel.onClose(onClose: suspend () -> Unit): ByteWriteChannel =
14+
CloseHookByteWriteChannel(this, onClose)
15+
16+
internal class CloseHookByteWriteChannel(
17+
private val delegate: ByteWriteChannel,
18+
private val onClose: suspend () -> Unit
19+
) : ByteWriteChannel by delegate {
20+
override suspend fun flushAndClose() {
21+
delegate.flushAndClose()
22+
onClose()
23+
}
24+
}

ktor-utils/jvm/test/io/ktor/tests/utils/FileChannelTest.kt

+21
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import io.ktor.util.cio.*
99
import io.ktor.utils.io.*
1010
import io.ktor.utils.io.jvm.javaio.*
1111
import kotlinx.coroutines.*
12+
import kotlinx.coroutines.test.runTest
1213
import org.junit.jupiter.api.*
1314
import org.junit.jupiter.api.extension.*
1415
import java.io.*
16+
import java.nio.file.Files
1517
import kotlin.test.*
1618
import kotlin.test.Test
1719

@@ -117,4 +119,23 @@ class FileChannelTest {
117119
// Assert (we cannot delete if there is a file handle open on it)
118120
assertTrue(temp.delete())
119121
}
122+
123+
@Test
124+
fun `writeChannel finishes on close`() = runTest {
125+
val file = Files.createTempFile("file", "txt").toFile()
126+
val ch = file.writeChannel()
127+
ch.writeStringUtf8("Hello")
128+
ch.flushAndClose()
129+
assertEquals(5, file.length())
130+
assertEquals("Hello", file.readText())
131+
}
132+
133+
@Test
134+
fun `writeChannel writes to file on flush`() = runTest {
135+
val file = Files.createTempFile("file", "txt").toFile()
136+
val ch = file.writeChannel()
137+
ch.writeStringUtf8("Hello")
138+
ch.flush()
139+
assertEquals("Hello", file.readText())
140+
}
120141
}

0 commit comments

Comments
 (0)