Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: swallowed exceptions in timed out WithCircuitBreaker() #7362

Open
wants to merge 8 commits into
base: dev
Choose a base branch
from
156 changes: 95 additions & 61 deletions src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using Akka.Util.Internal;
using FluentAssertions;
using Xunit;
using static FluentAssertions.FluentActions;

namespace Akka.Tests.Pattern
{
Expand Down Expand Up @@ -199,65 +200,86 @@ public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase
public async Task Must_allow_calls_through()
{
var breaker = LongCallTimeoutCb();
var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout);
Assert.Equal("hi", result);
var result = await breaker.Instance.WithCircuitBreaker(SayHiAsync).WaitAsync(AwaitTimeout);
result.Should().Be("hi");
}

[Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on exception")]
public async Task Must_increment_failure_count_on_exception()
{
var breaker = LongCallTimeoutCb();
await InterceptException<TestException>(() =>
breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout));
Assert.True(CheckLatch(breaker.OpenLatch));
breaker.Instance.CurrentFailureCount.ShouldBe(1);
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));

CheckLatch(breaker.OpenLatch).Should().BeTrue();
breaker.Instance.CurrentFailureCount.Should().Be(1);
}

[Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on async failure")]
public void Must_increment_failure_count_on_async_failure()
public async Task Must_increment_failure_count_on_async_failure()
{
var breaker = LongCallTimeoutCb();
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
Assert.True(CheckLatch(breaker.OpenLatch));
breaker.Instance.CurrentFailureCount.ShouldBe(1);
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));

CheckLatch(breaker.OpenLatch).Should().BeTrue();
breaker.Instance.CurrentFailureCount.Should().Be(1);
}

[Fact(DisplayName = "An asynchronous circuit breaker that is closed must reset failure count after success")]
public async Task Must_reset_failure_count_after_success()
{
var breaker = MultiFailureCb();
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi));
Enumerable.Range(1, 4).ForEach(_ => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)));
await AwaitAssertAsync(() => breaker.Instance.CurrentFailureCount.ShouldBe(4), AwaitTimeout);
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi));
await AwaitAssertAsync(() => breaker.Instance.CurrentFailureCount.ShouldBe(0), AwaitTimeout);
_ = await breaker.Instance.WithCircuitBreaker(SayHiAsync).WaitAsync(AwaitTimeout);

await Task.WhenAll(Enumerable.Range(1, 4)
.Select(_
=> InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync))));

breaker.Instance.CurrentFailureCount.Should().Be(4);

var result = await breaker.Instance.WithCircuitBreaker(SayHiAsync).WaitAsync(AwaitTimeout);
result.Should().Be("hi");
breaker.Instance.CurrentFailureCount.ShouldBe(0);
}

[Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on callTimeout")]
public async Task Must_increment_failure_count_on_callTimeout()
{
var breaker = ShortCallTimeoutCb();

var future = breaker.Instance.WithCircuitBreaker(async () =>
{
await Task.Delay(150);
ThrowException();
});
var innerFuture = SlowThrowing();
var future = breaker.Instance.WithCircuitBreaker(() => innerFuture);

Assert.True(CheckLatch(breaker.OpenLatch));
breaker.Instance.CurrentFailureCount.ShouldBe(1);
CheckLatch(breaker.OpenLatch).Should().BeTrue();
breaker.Instance.CurrentFailureCount.Should().Be(1);

// Since the timeout should have happened before the inner code finishes
// we expect a timeout, not TestException
await InterceptException<TimeoutException>(() => future.WaitAsync(AwaitTimeout));
await InterceptException<TimeoutException>(() => future);

// Issue https://github.com/akkadotnet/akka.net/issues/7358
// The actual exception is thrown out-of-band with no handler because inner Task is detached
// after a timeout and NOT protected

// In the bug, the task is still running when it should've been stopped.
innerFuture.IsCompleted.Should().BeTrue();
innerFuture.IsFaulted.Should().BeTrue();
innerFuture.Exception.Should().BeOfType<TestException>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual test, the inner Task should have stopped when the circuit breaker timed out, but it is still running as a detached Task.


return;

async Task SlowThrowing()
{
await Task.Delay(150);
await ThrowExceptionAsync();
}
}

[Fact(DisplayName = "An asynchronous circuit breaker that is closed must invoke onOpen if call fails and breaker transits to open state")]
public void Must_invoke_onOpen_if_call_fails_and_breaker_transits_to_open_state()
public async Task Must_invoke_onOpen_if_call_fails_and_breaker_transits_to_open_state()
{
var breaker = ShortCallTimeoutCb();
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
Assert.True(CheckLatch(breaker.OpenLatch));
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));
CheckLatch(breaker.OpenLatch).Should().BeTrue();
}
}

Expand All @@ -267,35 +289,36 @@ public class AnAsynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase
public async Task Must_pass_through_next_call_and_close_on_success()
{
var breaker = ShortResetTimeoutCb();
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
Assert.True(CheckLatch(breaker.HalfOpenLatch));
var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout);
Assert.Equal("hi", result);
Assert.True(CheckLatch(breaker.ClosedLatch));
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));
CheckLatch(breaker.HalfOpenLatch).Should().BeTrue();

var result = await breaker.Instance.WithCircuitBreaker(SayHiAsync).WaitAsync(AwaitTimeout);
result.Should().Be("hi");
CheckLatch(breaker.ClosedLatch).Should().BeTrue();
}

[Fact(DisplayName = "An asynchronous circuit breaker that is half open must re-open on exception in call")]
public async Task Must_reopen_on_exception_in_call()
{
var breaker = ShortResetTimeoutCb();
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
Assert.True(CheckLatch(breaker.HalfOpenLatch));
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));
CheckLatch(breaker.HalfOpenLatch).Should().BeTrue();

breaker.OpenLatch.Reset();
await InterceptException<TestException>(() =>
breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout));
Assert.True(CheckLatch(breaker.OpenLatch));
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));
CheckLatch(breaker.OpenLatch).Should().BeTrue();
}

[Fact(DisplayName = "An asynchronous circuit breaker that is half open must re-open on async failure")]
public void Must_reopen_on_async_failure()
public async Task Must_reopen_on_async_failure()
{
var breaker = ShortResetTimeoutCb();
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
Assert.True(CheckLatch(breaker.HalfOpenLatch));
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));
CheckLatch(breaker.HalfOpenLatch).Should().BeTrue();

breaker.OpenLatch.Reset();
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
Assert.True(CheckLatch(breaker.OpenLatch));
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));
CheckLatch(breaker.OpenLatch).Should().BeTrue();
}
}

Expand All @@ -305,43 +328,40 @@ public class AnAsynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase
public async Task Must_throw_exceptions_when_called_before_reset_timeout()
{
var breaker = LongResetTimeoutCb();
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));

Assert.True(CheckLatch(breaker.OpenLatch));
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));
CheckLatch(breaker.OpenLatch).Should().BeTrue();

await InterceptException<OpenCircuitException>(
() => breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout));
await InterceptException<OpenCircuitException>(() => breaker.Instance.WithCircuitBreaker(SayHiAsync));
}

[Fact(DisplayName = "An asynchronous circuit breaker that is open must transition to half-open on reset timeout")]
public void Must_transition_to_half_open_on_reset_timeout()
public async Task Must_transition_to_half_open_on_reset_timeout()
{
var breaker = ShortResetTimeoutCb();
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
Assert.True(CheckLatch(breaker.HalfOpenLatch));
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));
CheckLatch(breaker.HalfOpenLatch).Should().BeTrue();
}

[Fact(DisplayName = "An asynchronous circuit breaker that is open must increase the reset timeout after it transits to open again")]
public async Task Must_increase_reset_timeout_after_it_transits_to_open_again()
{
var breaker = NonOneFactorCb();
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
Assert.True(CheckLatch(breaker.OpenLatch));
CheckLatch(breaker.OpenLatch).Should().BeTrue();

var e1 = await InterceptException<OpenCircuitException>(
() => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)));
var e1 = await InterceptException<OpenCircuitException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));
var shortRemainingDuration = e1.RemainingDuration;

await Task.Delay(Dilated(TimeSpan.FromMilliseconds(1000)));
Assert.True(CheckLatch(breaker.HalfOpenLatch));
CheckLatch(breaker.HalfOpenLatch).Should().BeTrue();

// transit to open again
breaker.OpenLatch.Reset();
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
Assert.True(CheckLatch(breaker.OpenLatch));
await InterceptException<TestException>(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync));
CheckLatch(breaker.OpenLatch).Should().BeTrue();

var e2 = await InterceptException<OpenCircuitException>(() =>
breaker.Instance.WithCircuitBreaker(() => Task.FromResult(SayHi())));
var e2 = await InterceptException<OpenCircuitException>(() => breaker.Instance.WithCircuitBreaker(SayHiAsync));
var longRemainingDuration = e2.RemainingDuration;

shortRemainingDuration.ShouldBeLessThan(longRemainingDuration);
Expand All @@ -357,15 +377,29 @@ public class CircuitBreakerSpecBase : AkkaSpec
[DebuggerStepThrough]
public static void ThrowException() => throw new TestException("Test Exception");

[DebuggerStepThrough]
public static async Task ThrowExceptionAsync()
{
await Task.Yield();
throw new TestException("Test Exception");
}

public static string SayHi() => "hi";

protected T InterceptException<T>(Action actionThatThrows) where T : Exception =>
Intercept<T>(actionThatThrows);
public static async Task<string> SayHiAsync()
{
await Task.Yield();
return "hi";
}

protected static T InterceptException<T>(Action actionThatThrows) where T : Exception =>
actionThatThrows.Should().Throw<T>().And;

protected static async Task<T> InterceptException<T>(Func<Task> actionThatThrows)
protected async Task<T> InterceptException<T>(Func<Task> actionThatThrows)
where T : Exception
{
return (await actionThatThrows.Should().ThrowExactlyAsync<T>()).And;
return (await Awaiting(() => actionThatThrows().WaitAsync(AwaitTimeout))
.Should().ThrowExactlyAsync<T>()).And;
}

public TestBreaker ShortCallTimeoutCb() =>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Pattern/CircuitBreaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public void WithSyncCircuitBreaker(Action body) =>
/// <param name="body">Call needing protected</param>
/// <returns>The result of the call</returns>
public T WithSyncCircuitBreaker<T>(Func<T> body) =>
WithCircuitBreaker(body, b => Task.Run(b)).Result;
WithCircuitBreaker(body, b => Task.Run(b)).GetAwaiter().GetResult();

/// <summary>
/// Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the
Expand Down
Loading