Skip to content

Commit 05689e6

Browse files
committed
don't remove uninterruptibly() just yet
1 parent 3ee54c0 commit 05689e6

File tree

2 files changed

+176
-0
lines changed

2 files changed

+176
-0
lines changed

mug/src/main/java/com/google/mu/util/concurrent/Fanout.java

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,174 @@ public static void concurrently(Runnable task1, Runnable task2, Runnable... more
230230
new Scope().add(task1, task2).add(moreTasks).run();
231231
}
232232

233+
/**
234+
* Runs {@code a} and {@code b} concurrently and <em>uninterruptibly</em> in their own virtual
235+
* threads. After all of the concurrent operations return successfully, invoke the {@code join}
236+
* function on the results in the caller's thread.
237+
*
238+
* <p>For example:
239+
*
240+
* <pre>{@code
241+
* Result result = uninterruptibly(
242+
* () -> fetchArm(),
243+
* () -> fetchLeg(),
244+
* (arm, leg) -> new Result(arm, leg));
245+
* }</pre>
246+
*
247+
* @throws RuntimeException wrapping the original exception from the virtual thread if
248+
* any concurrent operation failed
249+
* @throws X thrown by the {@code join} function
250+
* @deprecated prefer using the concurrently() overloads that allow cancellation
251+
*/
252+
@Deprecated
253+
public static <A, B, R, X extends Throwable> R uninterruptibly(
254+
Supplier<A> a, Supplier<B> b, Join2<? super A, ? super B, R, X> join)
255+
throws X {
256+
requireNonNull(join);
257+
Scope scope = new Scope();
258+
AtomicReference<A> r1 = scope.add(a);
259+
AtomicReference<B> r2 = scope.add(b);
260+
scope.runUninterruptibly();
261+
return join.join(r1.get(), r2.get());
262+
}
263+
264+
/**
265+
* Runs {@code a}, {@code b} and {@code c} concurrently and <em>uninterruptibly</em> in their own
266+
* virtual threads. After all of the concurrent operations return successfully, invoke the {@code
267+
* join} function on the results in the caller's thread.
268+
*
269+
* <p>For example:
270+
*
271+
* <pre>{@code
272+
* Result result = uninterruptibly(
273+
* () -> fetchHead(),
274+
* () -> fetchArm(),
275+
* () -> fetchLeg(),
276+
* (head, arm, leg) -> new Result(head, arm, leg));
277+
* }</pre>
278+
*
279+
* @throws RuntimeException wrapping the original exception from the virtual thread if
280+
* any concurrent operation failed
281+
* @throws X thrown by the {@code join} function
282+
* @deprecated prefer using the concurrently() overloads that allow cancellation
283+
*/
284+
@Deprecated
285+
public static <A, B, C, R, X extends Throwable> R uninterruptibly(
286+
Supplier<A> a,
287+
Supplier<B> b,
288+
Supplier<C> c,
289+
Join3<? super A, ? super B, ? super C, R, X> join)
290+
throws X {
291+
requireNonNull(join);
292+
Scope scope = new Scope();
293+
AtomicReference<A> r1 = scope.add(a);
294+
AtomicReference<B> r2 = scope.add(b);
295+
AtomicReference<C> r3 = scope.add(c);
296+
scope.runUninterruptibly();
297+
return join.join(r1.get(), r2.get(), r3.get());
298+
}
299+
300+
/**
301+
* Runs {@code a}, {@code b}, {@code c} and {@code d} concurrently and <em>uninterruptibly</em> in
302+
* their own virtual threads. After all of the concurrent operations return successfully, invoke
303+
* the {@code join} function on the results in the caller's thread.
304+
*
305+
* <p>For example:
306+
*
307+
* <pre>{@code
308+
* Result result = uninterruptibly(
309+
* () -> fetchHead(),
310+
* () -> fetchShoulder(),
311+
* () -> fetchArm(),
312+
* () -> fetchLeg(),
313+
* (head, shoulder, arm, leg) -> new Result(head, shoulder, arm, leg));
314+
* }</pre>
315+
*
316+
* @throws RuntimeException wrapping the original exception from the virtual thread if
317+
* any concurrent operation failed
318+
* @throws X thrown by the {@code join} function
319+
* @deprecated prefer using the concurrently() overloads that allow cancellation
320+
*/
321+
@Deprecated
322+
public static <A, B, C, D, R, X extends Throwable> R uninterruptibly(
323+
Supplier<A> a,
324+
Supplier<B> b,
325+
Supplier<C> c,
326+
Supplier<D> d,
327+
Join4<? super A, ? super B, ? super C, ? super D, R, X> join)
328+
throws X {
329+
requireNonNull(join);
330+
Scope scope = new Scope();
331+
AtomicReference<A> r1 = scope.add(a);
332+
AtomicReference<B> r2 = scope.add(b);
333+
AtomicReference<C> r3 = scope.add(c);
334+
AtomicReference<D> r4 = scope.add(d);
335+
scope.runUninterruptibly();
336+
return join.join(r1.get(), r2.get(), r3.get(), r4.get());
337+
}
338+
339+
/**
340+
* Runs {@code a}, {@code b}, {@code c}, {@code d} and {@code e} concurrently and
341+
* <em>uninterruptibly<em> in their own virtual threads. After all of the concurrent operations
342+
* return successfully, invoke the {@code join} function on the results in the caller's thread.
343+
*
344+
* <p>For example:
345+
*
346+
* <pre>{@code
347+
* Result result = uninterruptibly(
348+
* () -> fetchHead(),
349+
* () -> fetchShoulder(),
350+
* () -> fetchArm(),
351+
* () -> fetchLeg(),
352+
* () -> fetchFeet(),
353+
* (head, shoulder, arm, leg, feet) -> new Result(head, shoulder, arm, leg, feet));
354+
* }</pre>
355+
*
356+
* @throws RuntimeException wrapping the original exception from the virtual thread if
357+
* any concurrent operation failed
358+
* @throws X thrown by the {@code join} function
359+
* @deprecated prefer using the concurrently() overloads that allow cancellation
360+
*/
361+
@Deprecated
362+
public static <A, B, C, D, E, R, X extends Throwable> R uninterruptibly(
363+
Supplier<A> a,
364+
Supplier<B> b,
365+
Supplier<C> c,
366+
Supplier<D> d,
367+
Supplier<E> e,
368+
Join5<? super A, ? super B, ? super C, ? super D, ? super E, R, X> join)
369+
throws X {
370+
requireNonNull(join);
371+
Scope scope = new Scope();
372+
AtomicReference<A> r1 = scope.add(a);
373+
AtomicReference<B> r2 = scope.add(b);
374+
AtomicReference<C> r3 = scope.add(c);
375+
AtomicReference<D> r4 = scope.add(d);
376+
AtomicReference<E> r5 = scope.add(e);
377+
scope.runUninterruptibly();
378+
return join.join(r1.get(), r2.get(), r3.get(), r4.get(), r5.get());
379+
}
380+
381+
/**
382+
* Runs {@code task1}, {@code task2} and {@code moreTasks} concurrently and
383+
* <em>uninterruptibly<em> in their own virtual threads.
384+
*
385+
* <p>For example:
386+
*
387+
* <pre>{@code
388+
* uninterruptibly(() -> uploadFile(), () -> sendMessageToQueue());
389+
* }</pre>
390+
*
391+
* @throws RuntimeException wrapping the original exception from the virtual thread if
392+
* any concurrent operation failed
393+
* @since 8.3
394+
* @deprecated prefer using the concurrently() overloads that allow cancellation
395+
*/
396+
@Deprecated
397+
public static void uninterruptibly(Runnable task1, Runnable task2, Runnable... moreTasks) {
398+
new Scope().add(task1, task2).add(moreTasks).runUninterruptibly();
399+
}
400+
233401
/**
234402
* Returns a concurrency-limited {@link Parallelizer} that can be used to run a potentially large
235403
* number of fanout concurrent tasks using the currently configured standard (virtual thread)
@@ -319,6 +487,13 @@ void run() throws StructuredConcurrencyInterruptedException {
319487
throw new StructuredConcurrencyInterruptedException(e);
320488
}
321489
}
490+
491+
@Deprecated
492+
void runUninterruptibly() {
493+
try (Completion completion = new Completion()) {
494+
withUnlimitedConcurrency().parallelizeUninterruptibly(runnables.stream().map(completion::toRun));
495+
}
496+
}
322497
}
323498

324499
private static StructuredConcurrencyExecutorPlugin loadExecutorPlugin() {

mug/src/test/java/com/google/mu/util/concurrent/FanoutTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import static com.google.common.truth.Truth.assertThat;
55
import static com.google.mu.util.concurrent.Fanout.concurrently;
6+
import static com.google.mu.util.concurrent.Fanout.uninterruptibly;
67
import static com.google.mu.util.concurrent.Fanout.withMaxConcurrency;
78
import static java.util.Arrays.asList;
89
import static org.junit.Assert.assertThrows;

0 commit comments

Comments
 (0)