Skip to content

Commit

Permalink
Merge pull request #258 from peterneyens/uncancelable-half-open
Browse files Browse the repository at this point in the history
Make half open cancelation configurable
  • Loading branch information
ChristopherDavenport authored Apr 12, 2023
2 parents 5602061 + a91148d commit 210416a
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ object CircuitBreaker {
backoff = Backoff.exponential,
maxResetTimeout = 1.minute,
exceptionFilter = Function.const(true),
cancelableHalfOpen = true,
onRejected = F.unit,
onClosed = F.unit,
onHalfOpen = F.unit,
Expand All @@ -426,6 +427,7 @@ object CircuitBreaker {
private val backoff: FiniteDuration => FiniteDuration,
private val maxResetTimeout: Duration,
private val exceptionFilter: Throwable => Boolean,
private val cancelableHalfOpen: Boolean,
private val onRejected: F[Unit],
private val onClosed: F[Unit],
private val onHalfOpen: F[Unit],
Expand All @@ -439,6 +441,7 @@ object CircuitBreaker {
backoff: FiniteDuration => FiniteDuration = self.backoff,
maxResetTimeout: Duration = self.maxResetTimeout,
exceptionFilter: Throwable => Boolean = self.exceptionFilter,
cancelableHalfOpen: Boolean = self.cancelableHalfOpen,
onRejected: F[Unit] = self.onRejected,
onClosed: F[Unit] = self.onClosed,
onHalfOpen: F[Unit] = self.onHalfOpen,
Expand All @@ -449,6 +452,7 @@ object CircuitBreaker {
resetTimeout = resetTimeout,
backoff = backoff,
maxResetTimeout = maxResetTimeout,
cancelableHalfOpen = cancelableHalfOpen,
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = onHalfOpen,
Expand All @@ -464,6 +468,10 @@ object CircuitBreaker {
copy(backoff = backoff)
def withMaxResetTimout(maxResetTimeout: Duration): Builder[F] =
copy(maxResetTimeout = maxResetTimeout)
def withCancelableHalfOpen: Builder[F] =
copy(cancelableHalfOpen = true)
def withUncancelableHalfOpen: Builder[F] =
copy(cancelableHalfOpen = false)
def withOnRejected(onRejected: F[Unit]): Builder[F] =
copy(onRejected = onRejected)
def withOnClosed(onClosed: F[Unit]): Builder[F] =
Expand Down Expand Up @@ -491,6 +499,7 @@ object CircuitBreaker {
backoff,
maxResetTimeout,
exceptionFilter,
cancelableHalfOpen,
onRejected,
onClosed,
onHalfOpen,
Expand All @@ -507,6 +516,7 @@ object CircuitBreaker {
backoff,
maxResetTimeout,
exceptionFilter,
cancelableHalfOpen,
onRejected,
onClosed,
onHalfOpen,
Expand All @@ -522,6 +532,7 @@ object CircuitBreaker {
backoff,
maxResetTimeout,
exceptionFilter,
cancelableHalfOpen,
onRejected,
onClosed,
onHalfOpen,
Expand Down Expand Up @@ -627,6 +638,7 @@ object CircuitBreaker {
backoff: FiniteDuration => FiniteDuration,
maxResetTimeout: Duration,
exceptionFilter: Throwable => Boolean,
cancelableHalfOpen: Boolean,
onRejected: F[Unit],
onClosed: F[Unit],
onHalfOpen: F[Unit],
Expand All @@ -651,6 +663,7 @@ object CircuitBreaker {
backoff = backoff,
maxResetTimeout = maxResetTimeout,
exceptionFilter = exceptionFilter,
cancelableHalfOpen = cancelableHalfOpen,
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = onHalfOpen,
Expand All @@ -666,6 +679,7 @@ object CircuitBreaker {
backoff = backoff,
maxResetTimeout = maxResetTimeout,
exceptionFilter = exceptionFilter,
cancelableHalfOpen = cancelableHalfOpen,
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = onHalfOpen,
Expand All @@ -681,6 +695,7 @@ object CircuitBreaker {
backoff = backoff,
maxResetTimeout = maxResetTimeout,
exceptionFilter = exceptionFilter,
cancelableHalfOpen = cancelableHalfOpen,
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = onHalfOpen,
Expand All @@ -697,6 +712,7 @@ object CircuitBreaker {
backoff = backoff,
maxResetTimeout = maxResetTimeout,
exceptionFilter = exceptionFilter,
cancelableHalfOpen = cancelableHalfOpen,
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = onHalfOpen,
Expand Down Expand Up @@ -749,7 +765,7 @@ object CircuitBreaker {
// the Circuit Breaker is HalfOpen and all new requests are
// failed automatically.
def resetOnSuccess(poll: Poll[F]): F[A] = {
poll(fa).guaranteeCase {
(if (cancelableHalfOpen) poll(fa) else fa).guaranteeCase {
case Outcome.Succeeded(_) => ref.set(ClosedZero) >> onClosed.attempt.void
case Outcome.Errored(e) =>
if (exceptionFilter(e)) ref.set(nextBackoff(open, now)) >> onOpen.attempt.void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,4 +351,32 @@ class CircuitBreakerTests extends CatsEffectSuite {
}
} yield ()
}

test("Validate withUncancelableHalfOpen") {
closesAfterHalfOpenCancelation(cancelable = false).assertEquals(true)
}

test("Validate withCancelableHalfOpen") {
closesAfterHalfOpenCancelation(cancelable = true).assertEquals(false)
}

private def closesAfterHalfOpenCancelation(cancelable: Boolean): IO[Boolean] = {
val configure: CircuitBreaker.Builder[IO] => CircuitBreaker.Builder[IO] =
if (cancelable) _.withCancelableHalfOpen else _.withUncancelableHalfOpen
val reset = 10.millis
for {
closed <- Ref[IO].of(false)
cb <- configure(
CircuitBreaker
.default[IO](maxFailures = 1, resetTimeout = reset)
.withOnClosed(closed.set(true))
).build
_ <- cb.protect(IO.raiseError[Unit](new RuntimeException("boom"))).attempt
_ <- IO.sleep(reset * 5) // wait until no longer fast failing
slowButSucceeds = IO.sleep(50.millis)
_ <- cb.protect(slowButSucceeds).timeout(1.millis).attempt
// check if circuit breaker closed after `slowButSucceeds` should be finished
didClose <- IO.sleep(75.millis) >> closed.get
} yield didClose
}
}

0 comments on commit 210416a

Please sign in to comment.