Skip to content

Commit ad94e8e

Browse files
committed
SuspendToFutureAdapter for concurrent-futures-ktx
Add a utility similar to CallbackToFutureAdapter for invoking suspend functions and returning a ListenableFuture for managing the operation in progress. Relnote: "Added SuspendToFutureAdapter for writing suspend-ListenableFuture bridges" Test: SuspendToFutureAdapterTest Change-Id: Ia8a66143012dd3e5ceb2ba22a4a0d33ad7eb8fcc
1 parent 6de934e commit ad94e8e

File tree

6 files changed

+391
-0
lines changed

6 files changed

+391
-0
lines changed

concurrent/concurrent-futures-ktx/api/current.txt

+5
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,10 @@ package androidx.concurrent.futures {
55
method public static suspend <T> Object? await(com.google.common.util.concurrent.ListenableFuture<T>, kotlin.coroutines.Continuation<? super T>);
66
}
77

8+
public final class SuspendToFutureAdapter {
9+
method public <T> com.google.common.util.concurrent.ListenableFuture<T> launchFuture(optional kotlin.coroutines.CoroutineContext context, optional boolean launchUndispatched, kotlin.jvm.functions.Function2<? super kotlinx.coroutines.CoroutineScope,? super kotlin.coroutines.Continuation<? super T>,?> block);
10+
field public static final androidx.concurrent.futures.SuspendToFutureAdapter INSTANCE;
11+
}
12+
813
}
914

concurrent/concurrent-futures-ktx/api/public_plus_experimental_current.txt

+5
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,10 @@ package androidx.concurrent.futures {
55
method public static suspend <T> Object? await(com.google.common.util.concurrent.ListenableFuture<T>, kotlin.coroutines.Continuation<? super T>);
66
}
77

8+
public final class SuspendToFutureAdapter {
9+
method public <T> com.google.common.util.concurrent.ListenableFuture<T> launchFuture(optional kotlin.coroutines.CoroutineContext context, optional boolean launchUndispatched, kotlin.jvm.functions.Function2<? super kotlinx.coroutines.CoroutineScope,? super kotlin.coroutines.Continuation<? super T>,?> block);
10+
field public static final androidx.concurrent.futures.SuspendToFutureAdapter INSTANCE;
11+
}
12+
813
}
914

concurrent/concurrent-futures-ktx/api/restricted_current.txt

+5
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,10 @@ package androidx.concurrent.futures {
55
method public static suspend <T> Object? await(com.google.common.util.concurrent.ListenableFuture<T>, kotlin.coroutines.Continuation<? super T>);
66
}
77

8+
public final class SuspendToFutureAdapter {
9+
method public <T> com.google.common.util.concurrent.ListenableFuture<T> launchFuture(optional kotlin.coroutines.CoroutineContext context, optional boolean launchUndispatched, kotlin.jvm.functions.Function2<? super kotlinx.coroutines.CoroutineScope,? super kotlin.coroutines.Continuation<? super T>,?> block);
10+
field public static final androidx.concurrent.futures.SuspendToFutureAdapter INSTANCE;
11+
}
12+
813
}
914

concurrent/concurrent-futures-ktx/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ dependencies {
2828
api(libs.kotlinCoroutinesCore)
2929

3030
testImplementation(libs.junit)
31+
testImplementation(libs.truth)
3132
testImplementation(libs.kotlinTest)
3233
testImplementation(libs.kotlinCoroutinesTest)
3334
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright 2023 The Android Open Source Project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package androidx.concurrent.futures
18+
19+
import com.google.common.util.concurrent.ListenableFuture
20+
import java.util.concurrent.Executor
21+
import java.util.concurrent.TimeUnit
22+
import kotlin.coroutines.Continuation
23+
import kotlin.coroutines.CoroutineContext
24+
import kotlin.coroutines.EmptyCoroutineContext
25+
import kotlin.coroutines.createCoroutine
26+
import kotlin.coroutines.resume
27+
import kotlinx.coroutines.CancellationException
28+
import kotlinx.coroutines.CoroutineScope
29+
import kotlinx.coroutines.CoroutineStart
30+
import kotlinx.coroutines.Deferred
31+
import kotlinx.coroutines.Dispatchers
32+
import kotlinx.coroutines.async
33+
34+
/**
35+
* A utility for launching suspending calls scoped and managed by a returned [ListenableFuture],
36+
* used for adapting Kotlin suspending APIs to be callable from the Java programming language.
37+
*/
38+
public object SuspendToFutureAdapter {
39+
40+
private val GlobalListenableFutureScope = CoroutineScope(Dispatchers.Main)
41+
private val GlobalListenableFutureAwaitContext = Dispatchers.Unconfined
42+
43+
/**
44+
* Launch [block] in [context], returning a [ListenableFuture] to manage the launched operation.
45+
* [block] will run **synchronously** to its first suspend point, behaving as
46+
* [CoroutineStart.UNDISPATCHED] by default; set [launchUndispatched] to false to override
47+
* and behave as [CoroutineStart.DEFAULT].
48+
*
49+
* [launchFuture] can be used to write adapters for calling suspending functions from the
50+
* Java programming language, e.g.
51+
*
52+
* ```
53+
* @file:JvmName("FancyServices")
54+
*
55+
* fun FancyService.requestAsync(
56+
* args: FancyServiceArgs
57+
* ): ListenableFuture<FancyResult> = SuspendToFutureAdapter.launchFuture {
58+
* request(args)
59+
* }
60+
* ```
61+
*
62+
* which can be called from Java language source code as follows:
63+
* ```
64+
* final ListenableFuture<FancyResult> result = FancyServices.requestAsync(service, args);
65+
* ```
66+
*
67+
* If no [kotlinx.coroutines.CoroutineDispatcher] is provided in [context], [Dispatchers.Main]
68+
* is used as the default. [ListenableFuture.get] should not be called from the main thread
69+
* prior to the future's completion (whether it was obtained from [SuspendToFutureAdapter]
70+
* or not) as any operation performed in the process of completing the future may require
71+
* main thread event processing in order to proceed, leading to potential main thread deadlock.
72+
*
73+
* If the operation performed by [block] is known to be safe for potentially reentrant
74+
* continuation resumption, immediate dispatchers such as [Dispatchers.Unconfined] may be used
75+
* as part of [context] to avoid additional thread dispatch latency. This should not be used
76+
* as a means of supporting clients blocking the main thread using [ListenableFuture.get];
77+
* this support can be broken by valid internal implementation changes to any transitive
78+
* dependencies of the operation performed by [block].
79+
*/
80+
@Suppress("AsyncSuffixFuture")
81+
public fun <T> launchFuture(
82+
context: CoroutineContext = EmptyCoroutineContext,
83+
launchUndispatched: Boolean = true,
84+
block: suspend CoroutineScope.() -> T,
85+
): ListenableFuture<T> {
86+
val resultDeferred = GlobalListenableFutureScope.async(
87+
context = context,
88+
start = if (launchUndispatched) CoroutineStart.UNDISPATCHED else CoroutineStart.DEFAULT,
89+
block = block
90+
)
91+
return DeferredFuture(resultDeferred).also { future ->
92+
// Deferred.getCompleted is marked experimental, so external libraries can't rely on it.
93+
// Instead, use await in a raw coroutine that will invoke [resumeWith] when it returns
94+
// using the Unconfined dispatcher.
95+
resultDeferred::await.createCoroutine(future).resume(Unit)
96+
}
97+
}
98+
99+
private class DeferredFuture<T>(
100+
private val resultDeferred: Deferred<T>
101+
) : ListenableFuture<T>, Continuation<T> {
102+
103+
private val delegateFuture = ResolvableFuture.create<T>()
104+
105+
// Implements external cancellation, propagating the cancel request to resultDeferred.
106+
// delegateFuture will be cancelled if resultDeferred becomes cancelled for
107+
// internal cancellation.
108+
override fun cancel(shouldInterrupt: Boolean): Boolean =
109+
delegateFuture.cancel(shouldInterrupt).also { didCancel ->
110+
if (didCancel) {
111+
resultDeferred.cancel()
112+
}
113+
}
114+
115+
override fun isCancelled(): Boolean = delegateFuture.isCancelled
116+
117+
override fun isDone(): Boolean = delegateFuture.isDone
118+
119+
override fun get(): T = delegateFuture.get()
120+
121+
override fun get(timeout: Long, unit: TimeUnit): T = delegateFuture.get(timeout, unit)
122+
123+
override fun addListener(listener: Runnable, executor: Executor) =
124+
delegateFuture.addListener(listener, executor)
125+
126+
override val context: CoroutineContext
127+
get() = GlobalListenableFutureAwaitContext
128+
129+
/**
130+
* Implementation of [Continuation] that will resume for the raw call to await
131+
* to resolve the [delegateFuture]
132+
*/
133+
override fun resumeWith(result: Result<T>) {
134+
result.fold(
135+
onSuccess = {
136+
delegateFuture.set(it)
137+
},
138+
onFailure = {
139+
if (it is CancellationException) {
140+
delegateFuture.cancel(false)
141+
} else {
142+
delegateFuture.setException(it)
143+
}
144+
}
145+
)
146+
}
147+
}
148+
}

0 commit comments

Comments
 (0)