Skip to content

Commit

Permalink
rewrite PipeTo to use async / await instead of a ContinueWith (
Browse files Browse the repository at this point in the history
…#5361)

* rewrite `PipeTo` to use `async` / `await` instead of a callback

Doing this for two reasons:

1. Allows the `AsyncLocal` context to flow into the `PipeTo` and its resultant `IActorRef.Tell` call, which is helpful for telemetry purposes
2. Eliminates a `ContinueWith` delegate everywhere it's used.

* fixed `QueueSinkSpec`

* fixed `QueueSourceSpec`

* fixed `PipeToSupportSpec`

* fixed racy `PipeToSupportSpec`
  • Loading branch information
Aaronontheweb authored Nov 15, 2021
1 parent 7455752 commit e27220a
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void QueueSink_should_fail_future_on_stream_failure()

sub.SendError(TestException());
ExpectMsg<Status.Failure>(
f => f.Cause is AggregateException && f.Cause.InnerException.Equals(TestException()));
f => f.Cause.Equals(TestException()));
}, _materializer);
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void QueueSource_should_fail_offer_future_if_user_does_not_wait_in_backpr

queue.OfferAsync(6).PipeTo(TestActor);
queue.OfferAsync(7).PipeTo(TestActor);
ExpectMsg<Status.Failure>().Cause.InnerException.Should().BeOfType<IllegalStateException>();
ExpectMsg<Status.Failure>().Cause.Should().BeOfType<IllegalStateException>();
probe.RequestNext(1);
ExpectMsg(Enqueued.Instance);
queue.Complete();
Expand Down
23 changes: 17 additions & 6 deletions src/core/Akka.Tests/Actor/PipeToSupportSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using Akka.TestKit;
using FluentAssertions;
using Xunit;

namespace Akka.Tests.Actor
Expand All @@ -27,6 +29,14 @@ public PipeToSupportSpec()
_taskWithoutResult = _taskCompletionSource.Task;
Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter));
}

[Fact]
public void Should_immediately_PipeTo_completed_Task()
{
var task = Task.FromResult("foo");
task.PipeTo(TestActor);
ExpectMsg("foo");
}

[Fact]
public void Should_by_default_send_task_result_as_message()
Expand All @@ -50,8 +60,8 @@ public void Should_by_default_send_task_exception_as_status_failure_message()
_task.PipeTo(TestActor);
_taskWithoutResult.PipeTo(TestActor);
_taskCompletionSource.SetException(new Exception("Boom"));
ExpectMsg<Status.Failure>(x => x.Cause.InnerException.Message == "Boom");
ExpectMsg<Status.Failure>(x => x.Cause.InnerException.Message == "Boom");
ExpectMsg<Status.Failure>(x => x.Cause.Message == "Boom");
ExpectMsg<Status.Failure>(x => x.Cause.Message == "Boom");
}

[Fact]
Expand All @@ -60,15 +70,16 @@ public void Should_use_success_handling_to_transform_task_result()
_task.PipeTo(TestActor, success: x => "Hello " + x);
_taskWithoutResult.PipeTo(TestActor, success: () => "Hello");
_taskCompletionSource.SetResult("World");
ExpectMsg("Hello World");
ExpectMsg("Hello");
var pipeTo = ReceiveN(2).Cast<string>().ToList();
pipeTo.Should().Contain("Hello");
pipeTo.Should().Contain("Hello World");
}

[Fact]
public void Should_use_failure_handling_to_transform_task_exception()
{
_task.PipeTo(TestActor, failure: e => "Such a " + e.InnerException.Message);
_taskWithoutResult.PipeTo(TestActor, failure: e => "Such a " + e.InnerException.Message);
_task.PipeTo(TestActor, failure: e => "Such a " + e.Message);
_taskWithoutResult.PipeTo(TestActor, failure: e => "Such a " + e.Message);
_taskCompletionSource.SetException(new Exception("failure..."));
ExpectMsg("Such a failure...");
ExpectMsg("Such a failure...");
Expand Down
64 changes: 30 additions & 34 deletions src/core/Akka/Actor/PipeToSupport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,24 @@ public static class PipeToSupport
/// <param name="sender">TBD</param>
/// <param name="success">TBD</param>
/// <param name="failure">TBD</param>
/// <returns>TBD</returns>
public static Task PipeTo<T>(this Task<T> taskToPipe, ICanTell recipient, IActorRef sender = null, Func<T, object> success = null, Func<Exception, object> failure = null)
/// <returns>A detached task</returns>
public static async Task PipeTo<T>(this Task<T> taskToPipe, ICanTell recipient, IActorRef sender = null, Func<T, object> success = null, Func<Exception, object> failure = null)
{
sender = sender ?? ActorRefs.NoSender;
return taskToPipe.ContinueWith(tresult =>

try
{
if (tresult.IsFaulted)
recipient.Tell(failure != null
? failure(tresult.Exception)
: new Status.Failure(tresult.Exception), sender);
else if (tresult.IsCanceled)
{
var ex = tresult.Exception ?? new AggregateException(new TaskCanceledException());
recipient.Tell(failure != null
? failure(ex)
: new Status.Failure(ex), sender);
}
else if (tresult.IsCompleted)
recipient.Tell(success != null
? success(tresult.Result)
: tresult.Result, sender);
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
var result = await taskToPipe.ConfigureAwait(false);
recipient.Tell(success != null
? success(result)
: result, sender);
}
catch (Exception ex)
{
recipient.Tell(failure != null
? failure(ex)
: new Status.Failure(ex), sender);
}
}

/// <summary>
Expand All @@ -61,25 +57,25 @@ public static Task PipeTo<T>(this Task<T> taskToPipe, ICanTell recipient, IActor
/// <param name="success">TBD</param>
/// <param name="failure">TBD</param>
/// <returns>TBD</returns>
public static Task PipeTo(this Task taskToPipe, ICanTell recipient, IActorRef sender = null, Func<object> success = null, Func<Exception, object> failure = null)
public static async Task PipeTo(this Task taskToPipe, ICanTell recipient, IActorRef sender = null, Func<object> success = null, Func<Exception, object> failure = null)
{
sender = sender ?? ActorRefs.NoSender;
return taskToPipe.ContinueWith(tresult =>
{
if (tresult.IsFaulted)
recipient.Tell(failure != null
? failure(tresult.Exception)
: new Status.Failure(tresult.Exception), sender);
else if (tresult.IsCanceled)

try
{
await taskToPipe.ConfigureAwait(false);

if (success != null)
{
var ex = tresult.Exception ?? new AggregateException(new TaskCanceledException());
recipient.Tell(failure != null
? failure(ex)
: new Status.Failure(ex), sender);
}
else if (tresult.IsCompleted && success != null)
recipient.Tell(success(), sender);
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
}
catch (Exception ex)
{
recipient.Tell(failure != null
? failure(ex)
: new Status.Failure(ex), sender);
}
}
}
}
Expand Down

0 comments on commit e27220a

Please sign in to comment.