diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs index 7362b7f3c23..b86b946b93a 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs @@ -66,7 +66,7 @@ await c.ExpectNext(3) } [Fact] - public async void A_Flow_with_SelectAsync_must_produce_task_elements_in_order() + public async Task A_Flow_with_SelectAsync_must_produce_task_elements_in_order() { var c = this.CreateManualSubscriberProbe(); Source.From(Enumerable.Range(1, 50)) @@ -128,7 +128,7 @@ await this.AssertAllStagesStoppedAsync(async () => // Turning this on in CI/CD for now [Fact] - public async Task A_Flow_with_parallel_execution_SelectAsync_must_signal_task_failure() + public async Task A_Flow_with_parallel_execution_SelectAsync_must_signal_SelectAsync_failure() { await this.AssertAllStagesStoppedAsync(async() => { var c = this.CreateManualSubscriberProbe(); @@ -153,25 +153,60 @@ await this.AssertAllStagesStoppedAsync(async() => { } [Fact] - public async Task A_Flow_with_SelectAsync_must_signal_task_failure() + public async Task A_Flow_with_SelectAsync_must_signal_task_already_failed() { await this.AssertAllStagesStoppedAsync(async() => { - var probe = Source.From(Enumerable.Range(1, 5)) - .SelectAsync(1, async n => + var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 5)) + .SelectAsync(4, n => { - await Task.Delay(10); + var tcs = new TaskCompletionSource(); if (n == 3) - throw new TestException("err1"); + { + tcs.TrySetException(new TestException("err1")); + } + else + { + Task.Run(async () => + { + await Task.Delay(10.Seconds()); + tcs.SetResult(n); + }); + } + return tcs.Task; + }) + .RunWith(Sink.FromSubscriber(c), Materializer); - return n; + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + var exception = await c.ExpectErrorAsync(); + exception.InnerException!.Message.Should().Be("err1"); + }, Materializer); + } + + [Fact] + public async Task A_Flow_with_SelectAsync_must_signal_task_failure() + { + await this.AssertAllStagesStoppedAsync(async() => { + var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 5)) + .SelectAsync(4, n => + { + return Task.Run(async () => + { + if (n == 3) + { + throw new TestException("err1"); + } + await Task.Delay(10.Seconds()); + return n; + }); }) - .RunWith(this.SinkProbe(), Materializer); + .RunWith(Sink.FromSubscriber(c), Materializer); - var exception = await probe.AsyncBuilder() - .Request(10) - .ExpectNextN(new[]{1, 2}) - .ExpectErrorAsync() - .ShouldCompleteWithin(RemainingOrDefault); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + var exception = await c.ExpectErrorAsync(); exception.InnerException!.Message.Should().Be("err1"); }, Materializer); } @@ -192,13 +227,16 @@ await this.AssertAllStagesStoppedAsync(async () => { }) .SelectAsync(4, n => { + var c = new TaskCompletionSource(); if (n == 1) { - var c = new TaskCompletionSource(); c.SetException(new Exception("err1")); - return c.Task; } - return Task.FromResult(n); + else + { + c.SetResult(n); + } + return c.Task; }).RunWith(Sink.Ignore(), Materializer); await Awaiting(async () => await done).Should() @@ -210,6 +248,106 @@ await Awaiting(async () => await done).Should() }, Materializer); } + [Fact(DisplayName = "A Flow with SelectAsync that failed mid-stream MUST cause a failure ASAP (stopping strategy)")] + public async Task A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync_MidStream_Stop() + { + var tsa = new TaskCompletionSource(); + var tsb = new TaskCompletionSource(); + var tsc = new TaskCompletionSource(); + var tsd = new TaskCompletionSource(); + var tse = new TaskCompletionSource(); + var tsf = new TaskCompletionSource(); + + var input = new []{ tsa, tsb, tsc, tsd, tse , tsf }; + + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = Source.From(input) + .SelectAsync(5, n => n.Task) + .RunWith(this.SinkProbe(), Materializer); + + probe.Request(100); + + // placing the future completion signals here is important + // the ordering is meant to expose a race between the failure at C and subsequent elements + tsa.SetResult("A"); + tsb.SetResult("B"); + tsc.SetException(new TestException("Boom at C")); + tsd.SetResult("D"); + tse.SetResult("E"); + tsf.SetResult("F"); + + switch (await probe.ExpectNextOrErrorAsync()) + { + case Exception ex: + ex.Should().BeOfType() + .Which.InnerException.Should().BeOfType() + .Which.Message.Should().Be("Boom at C"); // fine, error can over-take elements + return; + case "A": + switch (await probe.ExpectNextOrErrorAsync()) + { + case Exception ex: + ex.Should().BeOfType() + .Which.InnerException.Should().BeOfType() + .Which.Message.Should().Be("Boom at C"); // fine, error can over-take elements + return; + case "B": + switch (await probe.ExpectNextOrErrorAsync()) + { + case Exception ex: + ex.Should().BeOfType() + .Which.InnerException.Should().BeOfType() + .Which.Message.Should().Be("Boom at C"); // fine + return; + case string s: + Assert.Fail($"Got [{s}] yet it caused an exception, should not have happened!"); + return; + } + return; + case var unexpected: + Assert.Fail($"Unexpected {unexpected}"); + return; + } + case var unexpected: + Assert.Fail($"Unexpected {unexpected}"); + return; + } + }, Materializer); + } + + [Fact(DisplayName = "A Flow with SelectAsync that failed mid-stream MUST skip element (resume strategy)")] + public async Task A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync_MidStream_Result() + { + var tsa = new TaskCompletionSource(); + var tsb = new TaskCompletionSource(); + var tsc = new TaskCompletionSource(); + var tsd = new TaskCompletionSource(); + var tse = new TaskCompletionSource(); + var tsf = new TaskCompletionSource(); + + var input = new []{ tsa, tsb, tsc, tsd, tse , tsf }; + + await this.AssertAllStagesStoppedAsync(async () => + { + var task = Source.From(input) + .SelectAsync(5, n => n.Task) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.Seq(), Materializer); + + // the problematic ordering: + tsa.SetResult("A"); + tsb.SetResult("B"); + tsd.SetResult("D"); + tse.SetResult("E"); + tsf.SetResult("F"); + tsc.SetException(new TestException("Boom at C")); + + var elements = await task; + elements.Should().BeEquivalentTo(new []{ "A", "B", "D", "E", "F"}, options => options.WithStrictOrdering()); + }, Materializer); + } + [Fact] public async Task A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync() { @@ -230,7 +368,7 @@ await this.AssertAllStagesStoppedAsync(async () => { .RunWith(Sink.FromSubscriber(c), Materializer); var sub = await c.ExpectSubscriptionAsync(); sub.Request(10); - c.ExpectError().Message.Should().Be("err2"); + (await c.ExpectErrorAsync()).Message.Should().Be("err2"); }, Materializer); } @@ -285,17 +423,42 @@ await this.AssertAllStagesStoppedAsync(async () => { }, Materializer); } + [Fact] + public async Task A_Flow_with_SelectAsync_must_resume_when_task_already_failed() + { + await this.AssertAllStagesStoppedAsync(async () => { + var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 5)) + .SelectAsync(4, n => + { + var tcs = new TaskCompletionSource(); + if (n == 3) + tcs.TrySetException(new TestException("err3")); + else + tcs.TrySetResult(n); + return tcs.Task; + }) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + foreach (var i in new[] { 1, 2, 4, 5 }) + await c.ExpectNextAsync(i); + await c.ExpectCompleteAsync(); + }, Materializer); + } + [Fact] public async Task A_Flow_with_SelectAsync_must_resume_after_multiple_failures() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async () => { var futures = new[] { - Task.Run(() => { throw new TestException("failure1"); return "";}), - Task.Run(() => { throw new TestException("failure2"); return "";}), - Task.Run(() => { throw new TestException("failure3"); return "";}), - Task.Run(() => { throw new TestException("failure4"); return "";}), - Task.Run(() => { throw new TestException("failure5"); return "";}), + Task.FromException(new TestException("failure1")), + Task.FromException(new TestException("failure2")), + Task.FromException(new TestException("failure3")), + Task.FromException(new TestException("failure4")), + Task.FromException(new TestException("failure5")), Task.FromResult("happy") }; @@ -304,12 +467,67 @@ await this.AssertAllStagesStoppedAsync(() => { .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) .RunWith(Sink.First(), Materializer); - t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - t.Result.Should().Be("happy"); - return Task.CompletedTask; + var result = await t.WaitAsync(TimeSpan.FromSeconds(3)); + result.Should().Be("happy"); }, Materializer); } + [Fact(DisplayName = "A Flow with SelectAsync must complete without requiring further demand (parallelism = 1)")] + public async Task CompleteWithoutDemand() + { + var probe = Source.Single(1) + .SelectAsync(1, v => Task.Run(async () => + { + await Task.Delay(20); + return v; + })) + .RunWith(this.SinkProbe(), Materializer); + + probe.Request(1); + await probe.ExpectNextAsync(1); + await probe.ExpectCompleteAsync(); + } + + [Fact(DisplayName = "A Flow with SelectAsync must complete without requiring further demand with completed task (parallelism = 1)")] + public async Task CompleteWithoutDemandCompletedTask() + { + var probe = Source.Single(1) + .SelectAsync(1, Task.FromResult) + .RunWith(this.SinkProbe(), Materializer); + + probe.Request(1); + await probe.ExpectNextAsync(1); + await probe.ExpectCompleteAsync(); + } + + [Fact(DisplayName = "A Flow with SelectAsync must complete without requiring further demand (parallelism = 2)")] + public async Task CompleteWithoutDemandP2() + { + var probe = Source.From(new[] { 1, 2 }) + .SelectAsync(2, v => Task.Run(async () => + { + await Task.Delay(20); + return v; + })) + .RunWith(this.SinkProbe(), Materializer); + + probe.Request(2); + await probe.ExpectNextNAsync(2).ToListAsync(); + await probe.ExpectCompleteAsync(); + } + + [Fact(DisplayName = "A Flow with SelectAsync must complete without requiring further demand with completed task (parallelism = 2)")] + public async Task CompleteWithoutDemandCompletedTaskP2() + { + var probe = Source.From(new[] { 1, 2 }) + .SelectAsync(2, Task.FromResult) + .RunWith(this.SinkProbe(), Materializer); + + probe.Request(2); + await probe.ExpectNextNAsync(2).ToListAsync(); + await probe.ExpectCompleteAsync(); + } + [Fact] public async Task A_Flow_with_SelectAsync_must_finish_after_task_failure() { @@ -331,6 +549,28 @@ await this.AssertAllStagesStoppedAsync(async() => }, Materializer); } + [Fact] + public async Task A_Flow_with_SelectAsync_must_resume_after_task_cancels() + { + var c = this.CreateManualSubscriberProbe(); + await this.AssertAllStagesStoppedAsync(async () => + { + Source.From(Enumerable.Range(1, 5)) + .SelectAsync(4, async n => + { + await MaybeCancels(n); + return n; + }) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + foreach (var i in new[] { 1, 2, 4, 5 }) + await c.ExpectNextAsync(i); + await c.ExpectCompleteAsync(); + }, Materializer); + } + [Fact] public async Task A_Flow_with_SelectAsync_must_resume_when_SelectAsync_throws() { @@ -340,7 +580,12 @@ public async Task A_Flow_with_SelectAsync_must_resume_when_SelectAsync_throws() { if (n == 3) throw new TestException("err4"); - return Task.FromResult(n); + + return Task.Run(async () => + { + await Task.Delay(10); + return n; + }); }) .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) .RunWith(Sink.FromSubscriber(c), Materializer); @@ -351,18 +596,72 @@ public async Task A_Flow_with_SelectAsync_must_resume_when_SelectAsync_throws() await c.ExpectCompleteAsync(); } + [Fact] + public async Task A_Flow_with_SelectAsync_must_restart_after_task_throws() + { + var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 5)) + .Select(n => n) + .SelectAsync(4, n => Task.Run(async () => + { + await Task.Yield(); + if(n == 3) + throw new TestException("err3"); + return n; + })) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + foreach (var i in new[] { 1, 2, 4, 5}) + await c.ExpectNextAsync(i); + } + + [Fact] + public async Task A_Flow_with_SelectAsync_must_restart_when_SelectAsync_task_cancelled() + { + var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 5)) + .Select(n => n) + .SelectAsync(4, async n => + { + await MaybeCancels(n); + return n; + }) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + foreach (var i in new[] { 1, 2, 4, 5}) + await c.ExpectNextAsync(i); + } + + private static Task MaybeCancels(int n) + { + var tcs = new TaskCompletionSource(); + Task.Run(async () => + { + await Task.Yield(); + if (n == 3) + tcs.TrySetCanceled(); + else + tcs.TrySetResult(n); + }); + return tcs.Task; + } + [Fact] public async Task A_Flow_with_SelectAsync_must_signal_NPE_when_task_is_completed_with_null() { var c = this.CreateManualSubscriberProbe(); Source.From(new[] {"a", "b"}) - .SelectAsync(4, _ => Task.FromResult(null as string)) + .SelectAsync(4, _ => Task.FromResult(null)) .To(Sink.FromSubscriber(c)).Run(Materializer); var sub = await c.ExpectSubscriptionAsync(); sub.Request(10); - c.ExpectError().Message.Should().StartWith(ReactiveStreamsCompliance.ElementMustNotBeNullMsg); + (await c.ExpectErrorAsync()).Message.Should().StartWith(ReactiveStreamsCompliance.ElementMustNotBeNullMsg); } [Fact] @@ -370,7 +669,7 @@ public async Task A_Flow_with_SelectAsync_must_resume_when_task_is_completed_wit { var c = this.CreateManualSubscriberProbe(); Source.From(new[] { "a", "b", "c" }) - .SelectAsync(4, s => s.Equals("b") ? Task.FromResult(null as string) : Task.FromResult(s)) + .SelectAsync(4, s => s.Equals("b") ? Task.FromResult(null) : Task.FromResult(s)) .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) .To(Sink.FromSubscriber(c)).Run(Materializer); var sub = await c.ExpectSubscriptionAsync(); @@ -380,6 +679,59 @@ public async Task A_Flow_with_SelectAsync_must_resume_when_task_is_completed_wit await c.ExpectCompleteAsync(); } + [Fact(DisplayName = "A Flow with SelectAsync must continue emitting after a sequence of nulls")] + public async Task SelectAsyncNullSequence() + { + var flow = Flow.Create() + .SelectAsync(3, v => v is 0 or >= 100 + ? Task.FromResult(v.ToString()) + : Task.FromResult(null)); + + var task = Source.From(Enumerable.Range(0, 103)) + .Via(flow) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.Seq(), Materializer); + + var result = await task; + result.Should().BeEquivalentTo(new[]{"0", "100", "101", "102"}, o => o.WithStrictOrdering()); + } + + [Fact(DisplayName = "A Flow with SelectAsync must complete without emitting any elements after a sequence of nulls only")] + public async Task SelectAsyncAllNullSequence() + { + var flow = Flow.Create() + .SelectAsync(3, _ => Task.FromResult(null)); + + var task = Source.From(Enumerable.Range(0, 10)) + .Via(flow) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.Seq(), Materializer); + + var result = await task; + result.Should().BeEmpty(); + } + + [Fact(DisplayName = "A Flow with SelectAsync must complete if future task returning null completed last")] + public async Task SelectAsyncNullLast() + { + var ts1 = new TaskCompletionSource(); + var ts2 = new TaskCompletionSource(); + var ts3 = new TaskCompletionSource(); + var taskSources = new[] { ts1, ts2, ts3 }; + + var task = Source.From(taskSources) + .SelectAsync(2, t => t.Task) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.Seq(), Materializer); + + ts1.TrySetResult("1"); + ts3.TrySetResult("3"); + ts2.TrySetResult(null); + + var result = await task; + result.Should().BeEquivalentTo(new[]{"1", "3"}, o => o.WithStrictOrdering()); + } + [Fact] public async Task A_Flow_with_SelectAsync_must_handle_cancel_properly() { @@ -467,5 +819,92 @@ Task Deferred() } }, Materializer); } + + [Fact(DisplayName = "A Flow with SelectAsync must not invoke the decider twice for the same failed task")] + public async Task SelectAsyncDeciderFailedTask() + { + var failCount = new AtomicCounter(0); + var result = await Source.From(new[]{true, false}) + .SelectAsync(1, elem => Task.Run(() => + { + if (elem) + throw new TestException("this has gone too far"); + return elem; + })) + .AddAttributes(ActorAttributes.CreateSupervisionStrategy(cause => + { + switch (cause) + { + case AggregateException { InnerException: TestException }: + failCount.IncrementAndGet(); + return Directive.Resume; + default: + return Directive.Stop; + } + })) + .RunWith(Sink.Seq(), Materializer); + + result.Count.Should().Be(1); + result[0].Should().BeFalse(); + failCount.Current.Should().Be(1); + } + + [Fact(DisplayName = "A Flow with SelectAsync must not invoke the decider twice for the same task that already failed")] + public async Task SelectAsyncDeciderAlreadyFailedTask() + { + var failCount = new AtomicCounter(0); + var result = await Source.From(new[]{true, false}) + .SelectAsync(1, elem => + { + if (elem) + return Task.FromException(new TestException("this has gone too far")); + return Task.FromResult(elem); + }) + .AddAttributes(ActorAttributes.CreateSupervisionStrategy(cause => + { + switch (cause) + { + case AggregateException { InnerException: TestException }: + failCount.IncrementAndGet(); + return Directive.Resume; + default: + return Directive.Stop; + } + })) + .RunWith(Sink.Seq(), Materializer); + + result.Count.Should().Be(1); + result[0].Should().BeFalse(); + failCount.Current.Should().Be(1); + } + + [Fact(DisplayName = "A Flow with SelectAsync must not invoke the decider twice when SelectAsync throws")] + public async Task SelectAsyncDeciderFailingSelectAsync() + { + var failCount = new AtomicCounter(0); + var result = await Source.From(new[]{true, false}) + .SelectAsync(1, elem => + { + if (elem) + throw new TestException("this has gone too far"); + return Task.FromResult(elem); + }) + .AddAttributes(ActorAttributes.CreateSupervisionStrategy(cause => + { + switch (cause) + { + case TestException: + failCount.IncrementAndGet(); + return Directive.Resume; + default: + return Directive.Stop; + } + })) + .RunWith(Sink.Seq(), Materializer); + + result.Count.Should().Be(1); + result[0].Should().BeFalse(); + failCount.Current.Should().Be(1); + } } }