From e27220a09ed10d71bc32cb73f4002641b0faaf94 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 15 Nov 2021 11:49:41 -0600 Subject: [PATCH] rewrite `PipeTo` to use `async` / `await` instead of a `ContinueWith` (#5361) * rewrite `PipeTo` to use `async` / `await` instead of a callback Doing this for two reasons: 1. Allows the `AsyncLocal` context to flow into the `PipeTo` and its resultant `IActorRef.Tell` call, which is helpful for telemetry purposes 2. Eliminates a `ContinueWith` delegate everywhere it's used. * fixed `QueueSinkSpec` * fixed `QueueSourceSpec` * fixed `PipeToSupportSpec` * fixed racy `PipeToSupportSpec` --- .../Akka.Streams.Tests/Dsl/QueueSinkSpec.cs | 2 +- .../Akka.Streams.Tests/Dsl/QueueSourceSpec.cs | 2 +- .../Akka.Tests/Actor/PipeToSupportSpec.cs | 23 +++++-- src/core/Akka/Actor/PipeToSupport.cs | 64 +++++++++---------- 4 files changed, 49 insertions(+), 42 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs b/src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs index e9aefadff79..e7f06d1c85f 100644 --- a/src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs @@ -115,7 +115,7 @@ public void QueueSink_should_fail_future_on_stream_failure() sub.SendError(TestException()); ExpectMsg( - f => f.Cause is AggregateException && f.Cause.InnerException.Equals(TestException())); + f => f.Cause.Equals(TestException())); }, _materializer); } diff --git a/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs index 9e1d124236b..870c8782037 100644 --- a/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs @@ -233,7 +233,7 @@ public void QueueSource_should_fail_offer_future_if_user_does_not_wait_in_backpr queue.OfferAsync(6).PipeTo(TestActor); queue.OfferAsync(7).PipeTo(TestActor); - ExpectMsg().Cause.InnerException.Should().BeOfType(); + ExpectMsg().Cause.Should().BeOfType(); probe.RequestNext(1); ExpectMsg(Enqueued.Instance); queue.Complete(); diff --git a/src/core/Akka.Tests/Actor/PipeToSupportSpec.cs b/src/core/Akka.Tests/Actor/PipeToSupportSpec.cs index c64f5d378b4..ef150cb1d8c 100644 --- a/src/core/Akka.Tests/Actor/PipeToSupportSpec.cs +++ b/src/core/Akka.Tests/Actor/PipeToSupportSpec.cs @@ -6,10 +6,12 @@ //----------------------------------------------------------------------- using System; +using System.Linq; using System.Threading.Tasks; using Akka.Actor; using Akka.Event; using Akka.TestKit; +using FluentAssertions; using Xunit; namespace Akka.Tests.Actor @@ -27,6 +29,14 @@ public PipeToSupportSpec() _taskWithoutResult = _taskCompletionSource.Task; Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter)); } + + [Fact] + public void Should_immediately_PipeTo_completed_Task() + { + var task = Task.FromResult("foo"); + task.PipeTo(TestActor); + ExpectMsg("foo"); + } [Fact] public void Should_by_default_send_task_result_as_message() @@ -50,8 +60,8 @@ public void Should_by_default_send_task_exception_as_status_failure_message() _task.PipeTo(TestActor); _taskWithoutResult.PipeTo(TestActor); _taskCompletionSource.SetException(new Exception("Boom")); - ExpectMsg(x => x.Cause.InnerException.Message == "Boom"); - ExpectMsg(x => x.Cause.InnerException.Message == "Boom"); + ExpectMsg(x => x.Cause.Message == "Boom"); + ExpectMsg(x => x.Cause.Message == "Boom"); } [Fact] @@ -60,15 +70,16 @@ public void Should_use_success_handling_to_transform_task_result() _task.PipeTo(TestActor, success: x => "Hello " + x); _taskWithoutResult.PipeTo(TestActor, success: () => "Hello"); _taskCompletionSource.SetResult("World"); - ExpectMsg("Hello World"); - ExpectMsg("Hello"); + var pipeTo = ReceiveN(2).Cast().ToList(); + pipeTo.Should().Contain("Hello"); + pipeTo.Should().Contain("Hello World"); } [Fact] public void Should_use_failure_handling_to_transform_task_exception() { - _task.PipeTo(TestActor, failure: e => "Such a " + e.InnerException.Message); - _taskWithoutResult.PipeTo(TestActor, failure: e => "Such a " + e.InnerException.Message); + _task.PipeTo(TestActor, failure: e => "Such a " + e.Message); + _taskWithoutResult.PipeTo(TestActor, failure: e => "Such a " + e.Message); _taskCompletionSource.SetException(new Exception("failure...")); ExpectMsg("Such a failure..."); ExpectMsg("Such a failure..."); diff --git a/src/core/Akka/Actor/PipeToSupport.cs b/src/core/Akka/Actor/PipeToSupport.cs index 510d968e848..a78a95c5d84 100644 --- a/src/core/Akka/Actor/PipeToSupport.cs +++ b/src/core/Akka/Actor/PipeToSupport.cs @@ -27,28 +27,24 @@ public static class PipeToSupport /// TBD /// TBD /// TBD - /// TBD - public static Task PipeTo(this Task taskToPipe, ICanTell recipient, IActorRef sender = null, Func success = null, Func failure = null) + /// A detached task + public static async Task PipeTo(this Task taskToPipe, ICanTell recipient, IActorRef sender = null, Func success = null, Func failure = null) { sender = sender ?? ActorRefs.NoSender; - return taskToPipe.ContinueWith(tresult => + + try { - if (tresult.IsFaulted) - recipient.Tell(failure != null - ? failure(tresult.Exception) - : new Status.Failure(tresult.Exception), sender); - else if (tresult.IsCanceled) - { - var ex = tresult.Exception ?? new AggregateException(new TaskCanceledException()); - recipient.Tell(failure != null - ? failure(ex) - : new Status.Failure(ex), sender); - } - else if (tresult.IsCompleted) - recipient.Tell(success != null - ? success(tresult.Result) - : tresult.Result, sender); - }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + var result = await taskToPipe.ConfigureAwait(false); + recipient.Tell(success != null + ? success(result) + : result, sender); + } + catch (Exception ex) + { + recipient.Tell(failure != null + ? failure(ex) + : new Status.Failure(ex), sender); + } } /// @@ -61,25 +57,25 @@ public static Task PipeTo(this Task taskToPipe, ICanTell recipient, IActor /// TBD /// TBD /// TBD - public static Task PipeTo(this Task taskToPipe, ICanTell recipient, IActorRef sender = null, Func success = null, Func failure = null) + public static async Task PipeTo(this Task taskToPipe, ICanTell recipient, IActorRef sender = null, Func success = null, Func failure = null) { sender = sender ?? ActorRefs.NoSender; - return taskToPipe.ContinueWith(tresult => - { - if (tresult.IsFaulted) - recipient.Tell(failure != null - ? failure(tresult.Exception) - : new Status.Failure(tresult.Exception), sender); - else if (tresult.IsCanceled) + + try + { + await taskToPipe.ConfigureAwait(false); + + if (success != null) { - var ex = tresult.Exception ?? new AggregateException(new TaskCanceledException()); - recipient.Tell(failure != null - ? failure(ex) - : new Status.Failure(ex), sender); - } - else if (tresult.IsCompleted && success != null) recipient.Tell(success(), sender); - }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + } + } + catch (Exception ex) + { + recipient.Tell(failure != null + ? failure(ex) + : new Status.Failure(ex), sender); + } } } }