From a92cc3114f77f3eb88c5169dc8c1f6e4b29dfdcf Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Mon, 16 Sep 2024 14:18:58 +0200 Subject: [PATCH] Unify Collector definitions --- .../collectors/test/BasicParallelismTest.java | 35 +++---- .../collectors/test/BasicProcessingTest.java | 71 ++----------- .../collectors/test/BatchingTest.java | 30 ++---- .../test/ExceptionHandlingTest.java | 48 +-------- .../test/ExecutorPollutionTest.java | 8 +- .../test/ExecutorValidationTest.java | 40 +++----- .../pivovarit/collectors/test/Factory.java | 99 ++++++++++++++++--- .../test/ImmediateStreamProcessingTest.java | 74 -------------- .../collectors/test/NonBlockingTest.java | 35 ++----- .../test/RejectedExecutionHandlingTest.java | 40 +++----- .../collectors/test/StreamingTest.java | 52 ++++------ 11 files changed, 178 insertions(+), 354 deletions(-) delete mode 100644 src/test/java/com/pivovarit/collectors/test/ImmediateStreamProcessingTest.java diff --git a/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java index 9f9f6a7e..6885ccb7 100644 --- a/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java +++ b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java @@ -8,15 +8,12 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collector; import java.util.stream.IntStream; import java.util.stream.Stream; -import static com.pivovarit.collectors.test.BasicParallelismTest.CollectorDefinition.collector; +import static com.pivovarit.collectors.test.Factory.GenericCollector.limitedCollector; +import static com.pivovarit.collectors.test.Factory.e; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; @@ -24,15 +21,15 @@ class BasicParallelismTest { - private static Stream> allBounded() { + private static Stream>> allBounded() { return Stream.of( - collector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join().toList())), - collector("parallel(toList(), e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p), CompletableFuture::join)), - collector("parallel(toList(), e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p), CompletableFuture::join)), - collector("parallelToStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p), Stream::toList)), - collector("parallelToStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p), Stream::toList)), - collector("parallelToOrderedStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p), Stream::toList)), - collector("parallelToOrderedStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p), Stream::toList)) + limitedCollector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join().toList())), + limitedCollector("parallel(toList(), e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p), CompletableFuture::join)), + limitedCollector("parallel(toList(), e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p), CompletableFuture::join)), + limitedCollector("parallelToStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p), Stream::toList)), + limitedCollector("parallelToStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p), Stream::toList)), + limitedCollector("parallelToOrderedStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p), Stream::toList)), + limitedCollector("parallelToOrderedStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p), Stream::toList)) ); } @@ -76,19 +73,9 @@ Stream shouldRejectInvalidParallelism() { }))); } - protected record CollectorDefinition(String name, Factory.CollectorFactoryWithParallelism factory) { - static CollectorDefinition collector(String name, Factory.CollectorFactoryWithParallelism collector) { - return new CollectorDefinition<>(name, collector); - } - } - - private static Executor e() { - return Executors.newCachedThreadPool(); - } - private static Duration timed(Supplier action) { long start = System.currentTimeMillis(); - var result = action.get(); + var ignored = action.get(); return Duration.ofMillis(System.currentTimeMillis() - start); } } diff --git a/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java b/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java index 4bd1dfb8..fe579ef5 100644 --- a/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java @@ -1,75 +1,32 @@ package com.pivovarit.collectors.test; -import com.pivovarit.collectors.ParallelCollectors; import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.TestFactory; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; -import java.util.stream.Collector; import java.util.stream.IntStream; import java.util.stream.Stream; import static com.pivovarit.collectors.TestUtils.returnWithDelay; -import static com.pivovarit.collectors.test.BasicProcessingTest.CollectorDefinition.collector; +import static com.pivovarit.collectors.test.Factory.all; +import static com.pivovarit.collectors.test.Factory.allOrdered; import static java.time.Duration.ofSeconds; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static java.util.stream.Collectors.collectingAndThen; -import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; import static org.awaitility.Awaitility.await; class BasicProcessingTest { - private static Stream> all() { - return Stream.of( - collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())), - collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())), - collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())), - collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)), - collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)), - collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)), - collector("parallel(toList(), e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p()), CompletableFuture::join)), - collector("parallelToStream()", f -> collectingAndThen(ParallelCollectors.parallelToStream(f), Stream::toList)), - collector("parallelToStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e()), Stream::toList)), - collector("parallelToStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p()), Stream::toList)), - collector("parallelToStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p()), Stream::toList)), - collector("parallelToOrderedStream()", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f), Stream::toList)), - collector("parallelToOrderedStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e()), Stream::toList)), - collector("parallelToOrderedStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p()), Stream::toList)), - collector("parallelToOrderedStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p()), Stream::toList)) - ); - } - - public static Stream> allOrdered() { - return Stream.of( - collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())), - collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())), - collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())), - collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)), - collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)), - collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)), - collector("parallel(toList(), e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p()), CompletableFuture::join)), - collector("parallelToOrderedStream()", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f), Stream::toList)), - collector("parallelToOrderedStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e()), Stream::toList)), - collector("parallelToOrderedStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p()), Stream::toList)), - collector("parallelToOrderedStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p()), Stream::toList)) - ); - } - @TestFactory Stream shouldProcessEmpty() { return all() .map(c -> DynamicTest.dynamicTest(c.name(), () -> { - assertThat(Stream.empty().collect(c.collector().collector(i -> i))).isEmpty(); + assertThat(Stream.empty().collect(c.factory().collector(i -> i))).isEmpty(); })); } @@ -78,7 +35,7 @@ Stream shouldProcessAllElements() { return all() .map(c -> DynamicTest.dynamicTest(c.name(), () -> { var list = IntStream.range(0, 100).boxed().toList(); - List result = list.stream().collect(c.collector().collector(i -> i)); + List result = list.stream().collect(c.factory().collector(i -> i)); assertThat(result).containsExactlyInAnyOrderElementsOf(list); })); } @@ -88,7 +45,7 @@ Stream shouldProcessAllElementsInOrder() { return allOrdered() .map(c -> DynamicTest.dynamicTest(c.name(), () -> { var list = IntStream.range(0, 100).boxed().toList(); - List result = list.stream().collect(c.collector().collector(i -> i)); + List result = list.stream().collect(c.factory().collector(i -> i)); assertThat(result).containsAnyElementsOf(list); })); } @@ -102,7 +59,7 @@ Stream shouldStartProcessingImmediately() { Thread.startVirtualThread(() -> { Stream.iterate(0, i -> i + 1) .limit(100) - .collect(c.collector().collector(i -> returnWithDelay(counter.incrementAndGet(), ofSeconds(1)))); + .collect(c.factory().collector(i -> returnWithDelay(counter.incrementAndGet(), ofSeconds(1)))); }); await() @@ -121,7 +78,7 @@ Stream shouldInterruptOnException() { var latch = new CountDownLatch(size); assertThatThrownBy(() -> IntStream.range(0, size).boxed() - .collect(c.collector().collector(i -> { + .collect(c.factory().collector(i -> { try { latch.countDown(); latch.await(); @@ -139,18 +96,4 @@ Stream shouldInterruptOnException() { await().atMost(1, SECONDS).until(() -> counter.get() == size - 1); })); } - - record CollectorDefinition(String name, Factory.CollectorFactory collector) { - static CollectorDefinition collector(String name, Factory.CollectorFactory collector) { - return new CollectorDefinition<>(name, collector); - } - } - - private static Executor e() { - return Executors.newCachedThreadPool(); - } - - private static int p() { - return 4; - } } diff --git a/src/test/java/com/pivovarit/collectors/test/BatchingTest.java b/src/test/java/com/pivovarit/collectors/test/BatchingTest.java index 23be552d..d06ca7c7 100644 --- a/src/test/java/com/pivovarit/collectors/test/BatchingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/BatchingTest.java @@ -4,27 +4,23 @@ import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.TestFactory; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.function.Function; -import java.util.stream.Collector; import java.util.stream.Stream; -import static com.pivovarit.collectors.test.BatchingTest.CollectorDefinition.collector; +import static com.pivovarit.collectors.test.Factory.GenericCollector.limitedCollector; +import static com.pivovarit.collectors.test.Factory.e; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; class BatchingTest { - private static Stream> allBatching() { + private static Stream>> allBatching() { return Stream.of( - collector("parallel(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e(), p), c -> c.thenApply(Stream::toList).join())), - collector("parallel(toList(), e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p), CompletableFuture::join)), - collector("parallelToStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p), Stream::toList)), - collector("parallelToOrderedStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p), Stream::toList)) + limitedCollector("parallel(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e(), p), c -> c.thenApply(Stream::toList).join())), + limitedCollector("parallel(toList(), e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p), CompletableFuture::join)), + limitedCollector("parallelToStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p), Stream::toList)), + limitedCollector("parallelToOrderedStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p), Stream::toList)) ); } @@ -37,7 +33,7 @@ Stream shouldProcessOnExactlyNThreads() { Stream.generate(() -> 42) .limit(100) - .collect(c.collector().collector(i -> { + .collect(c.factory().collector(i -> { threads.add(Thread.currentThread().getName()); return i; }, parallelism)); @@ -45,14 +41,4 @@ Stream shouldProcessOnExactlyNThreads() { assertThat(threads).hasSizeLessThanOrEqualTo(parallelism); })); } - - record CollectorDefinition(String name, Factory.CollectorFactoryWithParallelism collector) { - static CollectorDefinition collector(String name, Factory.CollectorFactoryWithParallelism collector) { - return new CollectorDefinition<>(name, collector); - } - } - - private static Executor e() { - return Executors.newCachedThreadPool(); - } } diff --git a/src/test/java/com/pivovarit/collectors/test/ExceptionHandlingTest.java b/src/test/java/com/pivovarit/collectors/test/ExceptionHandlingTest.java index a44a858d..e64faa05 100644 --- a/src/test/java/com/pivovarit/collectors/test/ExceptionHandlingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/ExceptionHandlingTest.java @@ -1,56 +1,28 @@ package com.pivovarit.collectors.test; -import com.pivovarit.collectors.ParallelCollectors; import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.TestFactory; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; -import java.util.stream.Collector; import java.util.stream.IntStream; import java.util.stream.Stream; import static com.pivovarit.collectors.TestUtils.incrementAndThrow; -import static com.pivovarit.collectors.test.ExceptionHandlingTest.CollectorDefinition.collector; -import static java.util.stream.Collectors.collectingAndThen; -import static java.util.stream.Collectors.toList; +import static com.pivovarit.collectors.test.Factory.all; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; class ExceptionHandlingTest { - private static Stream> all() { - return Stream.of( - collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())), - collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())), - collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())), - collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)), - collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)), - collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)), - collector("parallel(toList(), e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p()), CompletableFuture::join)), - collector("parallelToStream()", f -> collectingAndThen(ParallelCollectors.parallelToStream(f), Stream::toList)), - collector("parallelToStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e()), Stream::toList)), - collector("parallelToStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p()), Stream::toList)), - collector("parallelToStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p()), Stream::toList)), - collector("parallelToOrderedStream()", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f), Stream::toList)), - collector("parallelToOrderedStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e()), Stream::toList)), - collector("parallelToOrderedStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p()), Stream::toList)), - collector("parallelToOrderedStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p()), Stream::toList)) - ); - } - @TestFactory Stream shouldPropagateExceptionFactory() { return all() .map(c -> DynamicTest.dynamicTest(c.name(), () -> { assertThatThrownBy(() -> IntStream.range(0, 10) .boxed() - .collect(c.collector().apply(i -> { + .collect(c.factory().collector(i -> { if (i == 7) { throw new IllegalArgumentException(); } else { @@ -70,25 +42,11 @@ Stream shouldShortcircuitOnException() { AtomicInteger counter = new AtomicInteger(); assertThatThrownBy(() -> elements.stream() - .collect(c.collector().apply(i -> incrementAndThrow(counter)))) + .collect(c.factory().collector(i -> incrementAndThrow(counter)))) .isInstanceOf(CompletionException.class) .hasCauseExactlyInstanceOf(IllegalArgumentException.class); assertThat(counter.longValue()).isLessThan(elements.size()); })); } - - record CollectorDefinition(String name, Function, Collector>> collector) { - static CollectorDefinition collector(String name, Function, Collector>> collector) { - return new CollectorDefinition<>(name, collector); - } - } - - private static Executor e() { - return Executors.newCachedThreadPool(); - } - - private static int p() { - return 4; - } } diff --git a/src/test/java/com/pivovarit/collectors/test/ExecutorPollutionTest.java b/src/test/java/com/pivovarit/collectors/test/ExecutorPollutionTest.java index 75d0d83f..0e9f385d 100644 --- a/src/test/java/com/pivovarit/collectors/test/ExecutorPollutionTest.java +++ b/src/test/java/com/pivovarit/collectors/test/ExecutorPollutionTest.java @@ -15,13 +15,13 @@ class ExecutorPollutionTest { @TestFactory Stream shouldNotPolluteExecutorFactory() { - return boundedCollectors().map(e -> DynamicTest.dynamicTest(e.getKey(), + return boundedCollectors().map(e -> DynamicTest.dynamicTest(e.name(), () -> { try (var e1 = warmedUp(new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2)))) { var result = Stream.generate(() -> 42) .limit(1000) - .collect(e.getValue().apply(i -> i, e1, 1)); + .collect(e.factory().apply(i -> i, e1, 1)); switch (result) { case CompletableFuture cf -> cf.join(); @@ -34,12 +34,12 @@ Stream shouldNotPolluteExecutorFactory() { @TestFactory Stream shouldNotPolluteExecutorFactoryLimitedParallelism() { - return boundedCollectors().map(e -> DynamicTest.dynamicTest(e.getKey(), () -> { + return boundedCollectors().map(e -> DynamicTest.dynamicTest(e.name(), () -> { try (var e1 = warmedUp(new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2)))) { var result = Stream.generate(() -> 42) .limit(1000) - .collect(e.getValue().apply(i -> i, e1, 2)); + .collect(e.factory().apply(i -> i, e1, 2)); switch (result) { case CompletableFuture cf -> cf.join(); diff --git a/src/test/java/com/pivovarit/collectors/test/ExecutorValidationTest.java b/src/test/java/com/pivovarit/collectors/test/ExecutorValidationTest.java index 68b4d992..e698a566 100644 --- a/src/test/java/com/pivovarit/collectors/test/ExecutorValidationTest.java +++ b/src/test/java/com/pivovarit/collectors/test/ExecutorValidationTest.java @@ -9,28 +9,28 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import static com.pivovarit.collectors.test.ExecutorValidationTest.CollectorDefinition.collector; +import static com.pivovarit.collectors.test.Factory.GenericCollector.executorCollector; import static java.util.stream.Collectors.collectingAndThen; import static org.assertj.core.api.Assertions.assertThatThrownBy; class ExecutorValidationTest { - private static Stream> allWithCustomExecutors() { + private static Stream>> allWithCustomExecutors() { return Stream.of( - collector("parallel(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e), c -> c.thenApply(Stream::toList).join())), - collector("parallel(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 1), c -> c.thenApply(Stream::toList).join())), - collector("parallel(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())), - collector("parallel(e, p=1) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 1), c -> c.thenApply(Stream::toList).join())), - collector("parallel(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())), - collector("parallelToStream(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e), Stream::toList)), - collector("parallelToStream(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 1), Stream::toList)), - collector("parallelToStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 4), Stream::toList)), - collector("parallelToStream(e, p=1) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e, 1), Stream::toList)), - collector("parallelToStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e, 4), Stream::toList)), - collector("parallelToOrderedStream(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e, 1), Stream::toList)), - collector("parallelToOrderedStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e, 4), Stream::toList)), - collector("parallelToOrderedStream(e, p=1) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e, 1), Stream::toList)), - collector("parallelToOrderedStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e, 4), Stream::toList)) + executorCollector("parallel(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e), c -> c.thenApply(Stream::toList).join())), + executorCollector("parallel(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 1), c -> c.thenApply(Stream::toList).join())), + executorCollector("parallel(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())), + executorCollector("parallel(e, p=1) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 1), c -> c.thenApply(Stream::toList).join())), + executorCollector("parallel(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())), + executorCollector("parallelToStream(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e), Stream::toList)), + executorCollector("parallelToStream(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 1), Stream::toList)), + executorCollector("parallelToStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 4), Stream::toList)), + executorCollector("parallelToStream(e, p=1) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e, 1), Stream::toList)), + executorCollector("parallelToStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e, 4), Stream::toList)), + executorCollector("parallelToOrderedStream(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e, 1), Stream::toList)), + executorCollector("parallelToOrderedStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e, 4), Stream::toList)), + executorCollector("parallelToOrderedStream(e, p=1) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e, 1), Stream::toList)), + executorCollector("parallelToOrderedStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e, 4), Stream::toList)) ); } @@ -44,12 +44,4 @@ Stream shouldRejectInvalidRejectedExecutionHandlerFactory() { } }))); } - - protected record CollectorDefinition(String name, Factory.CollectorFactoryWithExecutor factory) { - static CollectorDefinition collector(String name, Factory.CollectorFactoryWithExecutor factory) { - return new CollectorDefinition<>(name, factory); - } - } - - } diff --git a/src/test/java/com/pivovarit/collectors/test/Factory.java b/src/test/java/com/pivovarit/collectors/test/Factory.java index 4f40bc2a..6ab457ba 100644 --- a/src/test/java/com/pivovarit/collectors/test/Factory.java +++ b/src/test/java/com/pivovarit/collectors/test/Factory.java @@ -3,37 +3,76 @@ import com.pivovarit.collectors.ParallelCollectors; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.function.Function; import java.util.stream.Collector; import java.util.stream.Stream; import static com.pivovarit.collectors.ParallelCollectors.Batching.parallel; +import static com.pivovarit.collectors.test.Factory.GenericCollector.advancedCollector; +import static com.pivovarit.collectors.test.Factory.GenericCollector.collector; +import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; -public final class Factory { +final class Factory { private Factory() { throw new UnsupportedOperationException("This is a utility class and cannot be instantiated"); } - public static Stream>> boundedCollectors() { + static Stream>> all() { return Stream.of( - Map.entry("parallel()", (f, e, p) -> ParallelCollectors.parallel(f, e, p)), - Map.entry("parallel(toList())", (f, e, p) -> ParallelCollectors.parallel(f, toList(), e, p)), - Map.entry("parallelToStream()", (f, e, p) -> ParallelCollectors.parallelToStream(f, e, p)), - Map.entry("parallelToOrderedStream()", (f, e, p) -> ParallelCollectors.parallelToOrderedStream(f, e, p)), - Map.entry("parallel() (batching)", (f, e, p) -> parallel(f, e, p)), - Map.entry("parallel(toList()) (batching)", (f, e, p) -> parallel(f, toList(), e, p)), - Map.entry("parallelToStream() (batching)", (f, e, p) -> ParallelCollectors.Batching.parallelToStream(f, e, p)), - Map.entry("parallelToOrderedStream() (batching)", (f, e, p) -> ParallelCollectors.Batching.parallelToOrderedStream(f, e, p))); + collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())), + collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())), + collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())), + collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)), + collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)), + collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)), + collector("parallel(toList(), e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p()), CompletableFuture::join)), + collector("parallelToStream()", f -> collectingAndThen(ParallelCollectors.parallelToStream(f), Stream::toList)), + collector("parallelToStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e()), Stream::toList)), + collector("parallelToStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p()), Stream::toList)), + collector("parallelToStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p()), Stream::toList)), + collector("parallelToOrderedStream()", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f), Stream::toList)), + collector("parallelToOrderedStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e()), Stream::toList)), + collector("parallelToOrderedStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p()), Stream::toList)), + collector("parallelToOrderedStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p()), Stream::toList)) + ); + } + + static Stream>> allOrdered() { + return Stream.of( + collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())), + collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())), + collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())), + collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)), + collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)), + collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)), + collector("parallel(toList(), e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p()), CompletableFuture::join)), + collector("parallelToOrderedStream()", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f), Stream::toList)), + collector("parallelToOrderedStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e()), Stream::toList)), + collector("parallelToOrderedStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p()), Stream::toList)), + collector("parallelToOrderedStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p()), Stream::toList)) + ); + } + + public static Stream>> boundedCollectors() { + return Stream.of( + advancedCollector("parallel()", (f, e, p) -> ParallelCollectors.parallel(f, e, p)), + advancedCollector("parallel(toList())", (f, e, p) -> ParallelCollectors.parallel(f, toList(), e, p)), + advancedCollector("parallelToStream()", (f, e, p) -> ParallelCollectors.parallelToStream(f, e, p)), + advancedCollector("parallelToOrderedStream()", (f, e, p) -> ParallelCollectors.parallelToOrderedStream(f, e, p)), + advancedCollector("parallel() (batching)", (f, e, p) -> parallel(f, e, p)), + advancedCollector("parallel(toList()) (batching)", (f, e, p) -> parallel(f, toList(), e, p)), + advancedCollector("parallelToStream() (batching)", (f, e, p) -> ParallelCollectors.Batching.parallelToStream(f, e, p)), + advancedCollector("parallelToOrderedStream() (batching)", (f, e, p) -> ParallelCollectors.Batching.parallelToOrderedStream(f, e, p))); } @FunctionalInterface - interface CollectorFactoryWithParallelismAndExecutor { - Collector apply(Function function, Executor executorService, int parallelism); + interface CollectorFactoryWithParallelismAndExecutor { + Collector apply(Function function, Executor executorService, int parallelism); } @FunctionalInterface @@ -60,4 +99,38 @@ interface StreamingCollectorFactory { interface AsyncCollectorFactory { Collector>> collector(Function f); } + + record GenericCollector(String name, T factory) { + static GenericCollector> collector(String name, Factory.CollectorFactory collector) { + return new GenericCollector<>(name, collector); + } + + static GenericCollector> asyncCollector(String name, Factory.AsyncCollectorFactory collector) { + return new GenericCollector<>(name, collector); + } + + static GenericCollector> streamingCollector(String name, Factory.StreamingCollectorFactory collector) { + return new GenericCollector<>(name, collector); + } + + static GenericCollector> limitedCollector(String name, CollectorFactoryWithParallelism collector) { + return new GenericCollector<>(name, collector); + } + + static GenericCollector> executorCollector(String name, CollectorFactoryWithExecutor collector) { + return new GenericCollector<>(name, collector); + } + + static GenericCollector> advancedCollector(String name, CollectorFactoryWithParallelismAndExecutor collector) { + return new GenericCollector<>(name, collector); + } + } + + static Executor e() { + return Executors.newCachedThreadPool(); + } + + static int p() { + return 4; + } } diff --git a/src/test/java/com/pivovarit/collectors/test/ImmediateStreamProcessingTest.java b/src/test/java/com/pivovarit/collectors/test/ImmediateStreamProcessingTest.java deleted file mode 100644 index 6f15320f..00000000 --- a/src/test/java/com/pivovarit/collectors/test/ImmediateStreamProcessingTest.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.pivovarit.collectors.test; - -import com.pivovarit.collectors.ParallelCollectors; -import org.junit.jupiter.api.DynamicTest; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestFactory; - -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; -import java.util.stream.Collector; -import java.util.stream.Stream; - -import static com.pivovarit.collectors.ParallelCollectors.parallelToStream; -import static java.time.Duration.ofSeconds; -import static java.util.stream.Stream.of; -import static org.awaitility.Awaitility.await; - -class ImmediateStreamProcessingTest { - - @TestFactory - Stream shouldStartProcessingElementsTests() { - return of( - shouldStartProcessingElements(f -> ParallelCollectors.parallelToStream(f, Executors.newCachedThreadPool(), 2), "parallelToStream, parallelism: 2, os threads"), - shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, 2), "parallelToStream, parallelism: 2, vthreads"), - shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, Executors.newCachedThreadPool(), 2), "parallelToOrderedStream, parallelism: 2, os threads"), - shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, 2), "parallelToOrderedStream, parallelism: 2, vthreads") - ); - } - - private static DynamicTest shouldStartProcessingElements(Function, Collector>> collector, String name) { - return DynamicTest.dynamicTest(name, () -> { - var counter = new AtomicInteger(); - Thread.ofPlatform() - .start(() -> Stream.iterate(0, i -> i + 1) - .limit(100) - .collect(collector.apply(i -> { - try { - Thread.sleep(100); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - return i; - })) - .forEach(c -> counter.incrementAndGet())); - await() - .atMost(ofSeconds(1)) - .until(() -> counter.get() > 0); - }); - } - - @Test - void shouldStartProcessingElementsAsSoonAsTheyAreReady() { - var e = Executors.newCachedThreadPool(); - var counter = new AtomicInteger(); - Thread.ofPlatform().start(() -> { - Collector> collector = parallelToStream(i -> { - try { - Thread.sleep(100); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - return i; - }, e, 2); - Stream.iterate(0, i -> i + 1) - .limit(100) - .collect(collector) - .forEach(c -> counter.incrementAndGet()); - }); - await() - .atMost(ofSeconds(1)) - .until(() -> counter.get() > 0); - } -} diff --git a/src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java b/src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java index 3ababe50..bb4195d5 100644 --- a/src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java @@ -5,16 +5,11 @@ import org.junit.jupiter.api.TestFactory; import java.time.Duration; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.function.Function; -import java.util.stream.Collector; import java.util.stream.Stream; import static com.pivovarit.collectors.TestUtils.returnWithDelay; -import static com.pivovarit.collectors.test.NonBlockingTest.CollectorDefinition.collector; +import static com.pivovarit.collectors.test.Factory.GenericCollector.asyncCollector; +import static com.pivovarit.collectors.test.Factory.e; import static java.time.Duration.ofDays; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; @@ -22,15 +17,15 @@ class NonBlockingTest { - private static Stream> allAsync() { + private static Stream>> allAsync() { return Stream.of( - collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.thenApply(Stream::toList))), - collector("parallel(toList())", f -> ParallelCollectors.parallel(f, toList())), - collector("parallel(toList(), e)", f -> ParallelCollectors.parallel(f, toList(), e())), - collector("parallel(toList(), e, p=1)", f -> ParallelCollectors.parallel(f, toList(), e(), 1)), - collector("parallel(toList(), e, p=2)", f -> ParallelCollectors.parallel(f, toList(), e(), 2)), - collector("parallel(toList(), e, p=1) [batching]", f -> ParallelCollectors.Batching.parallel(f, toList(), e(), 1)), - collector("parallel(toList(), e, p=2) [batching]", f -> ParallelCollectors.Batching.parallel(f, toList(), e(), 2)) + asyncCollector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.thenApply(Stream::toList))), + asyncCollector("parallel(toList())", f -> ParallelCollectors.parallel(f, toList())), + asyncCollector("parallel(toList(), e)", f -> ParallelCollectors.parallel(f, toList(), e())), + asyncCollector("parallel(toList(), e, p=1)", f -> ParallelCollectors.parallel(f, toList(), e(), 1)), + asyncCollector("parallel(toList(), e, p=2)", f -> ParallelCollectors.parallel(f, toList(), e(), 2)), + asyncCollector("parallel(toList(), e, p=1) [batching]", f -> ParallelCollectors.Batching.parallel(f, toList(), e(), 1)), + asyncCollector("parallel(toList(), e, p=2) [batching]", f -> ParallelCollectors.Batching.parallel(f, toList(), e(), 2)) ); } @@ -43,14 +38,4 @@ Stream shouldNotBlockTheCallingThread() { }); })); } - - protected record CollectorDefinition(String name, Factory.AsyncCollectorFactory factory) { - static CollectorDefinition collector(String name, Factory.AsyncCollectorFactory collector) { - return new CollectorDefinition<>(name, collector); - } - } - - private static Executor e() { - return Executors.newCachedThreadPool(); - } } diff --git a/src/test/java/com/pivovarit/collectors/test/RejectedExecutionHandlingTest.java b/src/test/java/com/pivovarit/collectors/test/RejectedExecutionHandlingTest.java index 95daf76e..e00267eb 100644 --- a/src/test/java/com/pivovarit/collectors/test/RejectedExecutionHandlingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/RejectedExecutionHandlingTest.java @@ -5,16 +5,12 @@ import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.TestFactory; -import java.util.List; import java.util.concurrent.CompletionException; -import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; -import java.util.function.Function; -import java.util.stream.Collector; import java.util.stream.Stream; -import static com.pivovarit.collectors.test.RejectedExecutionHandlingTest.CollectorDefinition.collector; +import static com.pivovarit.collectors.test.Factory.GenericCollector.executorCollector; import static java.time.Duration.ofMillis; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -25,16 +21,16 @@ class RejectedExecutionHandlingTest { - private static Stream> allWithCustomExecutors() { + private static Stream>> allWithCustomExecutors() { return Stream.of( - collector("parallel(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e), c -> c.thenApply(Stream::toList).join())), - collector("parallel(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())), - collector("parallel(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())), - collector("parallelToStream(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e), Stream::toList)), - collector("parallelToStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 4), Stream::toList)), - collector("parallelToStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e, 4), Stream::toList)), - collector("parallelToOrderedStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e, 4), Stream::toList)), - collector("parallelToOrderedStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e, 4), Stream::toList)) + executorCollector("parallel(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e), c -> c.thenApply(Stream::toList).join())), + executorCollector("parallel(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())), + executorCollector("parallel(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())), + executorCollector("parallelToStream(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e), Stream::toList)), + executorCollector("parallelToStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 4), Stream::toList)), + executorCollector("parallelToStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e, 4), Stream::toList)), + executorCollector("parallelToOrderedStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e, 4), Stream::toList)), + executorCollector("parallelToOrderedStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e, 4), Stream::toList)) ); } @@ -52,12 +48,12 @@ Stream shouldRejectInvalidRejectedExecutionHandlerFactory() { })); } - private static Stream> allWithCustomExecutorsParallelismOne() { + private static Stream>> allWithCustomExecutorsParallelismOne() { return Stream.of( - collector("parallel(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 1), c -> c.thenApply(Stream::toList).join())), - collector("parallel(e, p=1) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 1), c -> c.thenApply(Stream::toList).join())), - collector("parallelToStream(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 1), Stream::toList)), - collector("parallelToOrderedStream(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e, 1), Stream::toList)) + executorCollector("parallel(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 1), c -> c.thenApply(Stream::toList).join())), + executorCollector("parallel(e, p=1) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 1), c -> c.thenApply(Stream::toList).join())), + executorCollector("parallelToStream(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 1), Stream::toList)), + executorCollector("parallelToOrderedStream(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e, 1), Stream::toList)) ); } @@ -74,10 +70,4 @@ Stream shouldRejectInvalidRejectedExecutionHandlerWhenParallelismOn }).isExactlyInstanceOf(CompletionException.class); })); } - - protected record CollectorDefinition(String name, Factory.CollectorFactoryWithExecutor factory) { - static CollectorDefinition collector(String name, Factory.CollectorFactoryWithExecutor factory) { - return new CollectorDefinition<>(name, factory); - } - } } diff --git a/src/test/java/com/pivovarit/collectors/test/StreamingTest.java b/src/test/java/com/pivovarit/collectors/test/StreamingTest.java index a08912b5..148164a4 100644 --- a/src/test/java/com/pivovarit/collectors/test/StreamingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/StreamingTest.java @@ -6,16 +6,14 @@ import java.time.Duration; import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; -import java.util.stream.Collector; import java.util.stream.IntStream; import java.util.stream.Stream; import static com.pivovarit.collectors.TestUtils.returnWithDelay; -import static com.pivovarit.collectors.test.StreamingTest.CollectorDefinition.collector; +import static com.pivovarit.collectors.test.Factory.GenericCollector.streamingCollector; +import static com.pivovarit.collectors.test.Factory.e; +import static com.pivovarit.collectors.test.Factory.p; import static java.time.Duration.ofMillis; import static java.time.Duration.ofSeconds; import static java.util.stream.Stream.of; @@ -24,30 +22,30 @@ class StreamingTest { - private static Stream> allStreaming() { + private static Stream>> allStreaming() { return Stream.of( - collector("parallelToStream()", (f) -> ParallelCollectors.parallelToStream(f)), - collector("parallelToStream(e)", (f) -> ParallelCollectors.parallelToStream(f, e())), - collector("parallelToStream(e, p)", (f) -> ParallelCollectors.parallelToStream(f, e(), p())), - collector("parallelToOrderedStream()", (f) -> ParallelCollectors.parallelToOrderedStream(f)), - collector("parallelToOrderedStream(e)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e())), - collector("parallelToOrderedStream(e, p)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e(), p())) + streamingCollector("parallelToStream()", (f) -> ParallelCollectors.parallelToStream(f)), + streamingCollector("parallelToStream(e)", (f) -> ParallelCollectors.parallelToStream(f, e())), + streamingCollector("parallelToStream(e, p)", (f) -> ParallelCollectors.parallelToStream(f, e(), p())), + streamingCollector("parallelToOrderedStream()", (f) -> ParallelCollectors.parallelToOrderedStream(f)), + streamingCollector("parallelToOrderedStream(e)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e())), + streamingCollector("parallelToOrderedStream(e, p)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e(), p())) ); } - private static Stream> allCompletionOrderStreaming() { + private static Stream>> allCompletionOrderStreaming() { return Stream.of( - collector("parallelToStream()", (f) -> ParallelCollectors.parallelToStream(f)), - collector("parallelToStream(e)", (f) -> ParallelCollectors.parallelToStream(f, e())), - collector("parallelToStream(e, p)", (f) -> ParallelCollectors.parallelToStream(f, e(), p())) + streamingCollector("parallelToStream()", (f) -> ParallelCollectors.parallelToStream(f)), + streamingCollector("parallelToStream(e)", (f) -> ParallelCollectors.parallelToStream(f, e())), + streamingCollector("parallelToStream(e, p)", (f) -> ParallelCollectors.parallelToStream(f, e(), p())) ); } - private static Stream> allOrderedStreaming() { + private static Stream>> allOrderedStreaming() { return Stream.of( - collector("parallelToOrderedStream()", (f) -> ParallelCollectors.parallelToOrderedStream(f)), - collector("parallelToOrderedStream(e)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e())), - collector("parallelToOrderedStream(e, p)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e(), p())) + streamingCollector("parallelToOrderedStream()", (f) -> ParallelCollectors.parallelToOrderedStream(f)), + streamingCollector("parallelToOrderedStream(e)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e())), + streamingCollector("parallelToOrderedStream(e, p)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e(), p())) ); } @@ -93,18 +91,4 @@ Stream shouldCollectInOriginalOrder() { assertThat(result).containsExactlyElementsOf(source); })); } - - protected record CollectorDefinition(String name, Factory.StreamingCollectorFactory factory) { - static CollectorDefinition collector(String name, Factory.StreamingCollectorFactory collector) { - return new CollectorDefinition<>(name, collector); - } - } - - private static Executor e() { - return Executors.newCachedThreadPool(); - } - - private static int p() { - return 4; - } }