CircuitBreaker

A CircuitBreaker protects resources or services from being overloaded. When a service is overloaded, sending it more requests only makes its state worse. This matters especially in combination with retry mechanisms such as Schedule: during peak traffic, a back-off retry policy alone might not be sufficient.

To keep such resources from being overloaded further, a CircuitBreaker protects the service by failing fast. It thereby helps achieve stability and prevents cascading failures in distributed systems.

A CircuitBreaker is always in one of three CircuitBreaker.State values:

  1. Closed: The normal state, in which requests are made. This is the state in which a CircuitBreaker starts.

    • When an exception occurs it increments the failure counter

    • A successful request will reset the failure counter to zero

    • When the failure counter reaches the maxFailures threshold, the breaker is tripped into the Open state

  2. Open: The CircuitBreaker will short-circuit/fail-fast all requests

    • All requests short-circuit/fail-fast with ExecutionRejected

    • If a request is made after the configured resetTimeout has passed, the CircuitBreaker is tripped into the HalfOpen state, allowing one request to go through as a test.

  3. HalfOpen: The CircuitBreaker is in this state while it allows a single test request to go through
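
The transitions listed above can be sketched as a small, single-threaded state machine. This is an illustrative model only, not Arrow's implementation: ToyBreaker, BreakerState, Rejected and the injected now clock are hypothetical names, concurrency and exponential back-off are ignored, and the sketch assumes that a failed test request sends the breaker back to Open.

```kotlin
// Simplified, single-threaded model of the Closed / Open / HalfOpen transitions.
// NOT Arrow's implementation; every name in this block is hypothetical.
sealed class BreakerState {
    data class Closed(val failures: Int) : BreakerState()
    data class Open(val openedAt: Long) : BreakerState()
    object HalfOpen : BreakerState()
}

class Rejected(val state: BreakerState) : RuntimeException("Rejected by breaker")

class ToyBreaker(
    private val maxFailures: Int,
    private val resetTimeoutMillis: Long,
    private val now: () -> Long, // injected clock keeps the example deterministic
) {
    var state: BreakerState = BreakerState.Closed(0)
        private set

    fun <A> protect(task: () -> A): A {
        val current = state
        return when (current) {
            is BreakerState.Closed -> try {
                // A successful request resets the failure counter to zero.
                task().also { state = BreakerState.Closed(0) }
            } catch (e: Exception) {
                // A failure increments the counter, or trips the breaker into Open.
                val failures = current.failures + 1
                state = if (failures >= maxFailures) BreakerState.Open(now())
                        else BreakerState.Closed(failures)
                throw e
            }
            is BreakerState.Open ->
                if (now() - current.openedAt >= resetTimeoutMillis) {
                    // resetTimeout has passed: let one request through as a test.
                    state = BreakerState.HalfOpen
                    try {
                        task().also { state = BreakerState.Closed(0) }
                    } catch (e: Exception) {
                        state = BreakerState.Open(now()) // assumption: failed test reopens
                        throw e
                    }
                } else throw Rejected(current) // fail fast without running the task
            BreakerState.HalfOpen -> throw Rejected(current)
        }
    }
}

fun main() {
    var clock = 0L
    val breaker = ToyBreaker(maxFailures = 2, resetTimeoutMillis = 2_000, now = { clock })

    // Two failing calls trip the breaker.
    repeat(2) { runCatching { breaker.protect<Unit> { error("Service overloaded") } } }
    println(breaker.state) // Open(openedAt=0)

    // While Open, the task is never executed.
    println(runCatching { breaker.protect { "ignored" } }.exceptionOrNull() is Rejected) // true

    clock = 2_000 // pretend the reset timeout has elapsed
    println(breaker.protect { "test request" }) // test request
    println(breaker.state) // Closed(failures=0)
}
```

Injecting the clock instead of reading the system time is a choice made for this sketch only, so that the Open to HalfOpen transition can be shown without sleeping.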

Suppose we want a CircuitBreaker that tolerates two failed calls to a remote service: once two requests have failed with an exception, the circuit breaker starts short-circuiting, that is, failing fast.

import arrow.core.Either
import arrow.fx.coroutines.CircuitBreaker
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.delay

@ExperimentalTime
suspend fun main(): Unit {
  //sampleStart
  val circuitBreaker = CircuitBreaker.of(
    maxFailures = 2,
    resetTimeout = 2.seconds,
    exponentialBackoffFactor = 1.2,
    maxResetTimeout = 60.seconds,
  )
  circuitBreaker.protectOrThrow { "I am in Closed: ${circuitBreaker.state()}" }.also(::println)

  println("Service getting overloaded . . .")

  Either.catch { circuitBreaker.protectOrThrow { throw RuntimeException("Service overloaded") } }.also(::println)
  Either.catch { circuitBreaker.protectOrThrow { throw RuntimeException("Service overloaded") } }.also(::println)
  circuitBreaker.protectEither { }.also { println("I am Open and short-circuit with ${it}. ${circuitBreaker.state()}") }

  println("Service recovering . . .").also { delay(2000) }

  circuitBreaker.protectOrThrow { "I am running test-request in HalfOpen: ${circuitBreaker.state()}" }.also(::println)
  println("I am back to normal state closed ${circuitBreaker.state()}")
  //sampleEnd
}

A common pattern for building fault-tolerant, resilient systems is to compose a CircuitBreaker with a backing-off retry Schedule, so that neither the resource nor the client interacting with it is overloaded. Below you can see how a simple retry results in an Either.Left holding a CircuitBreaker.ExecutionRejected, whereas combined with a back-off schedule, every retry reaches the CircuitBreaker at a time when it could have entered the HalfOpen state. A Schedule alone is not sufficient to make your system resilient, because you also have to take parallel calls to your functions into account. In contrast, a CircuitBreaker can track the failures of every function call, or even of different functions calling the same resource or service.

import arrow.core.Either
import arrow.fx.coroutines.CircuitBreaker
import arrow.fx.coroutines.Schedule
import arrow.fx.coroutines.retry
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.delay

@ExperimentalTime
suspend fun main(): Unit {
  suspend fun apiCall(): Unit {
    println("apiCall . . .")
    throw RuntimeException("Overloaded service")
  }

  //sampleStart
  val circuitBreaker = CircuitBreaker.of(
    maxFailures = 2,
    resetTimeout = 2.seconds,
    exponentialBackoffFactor = 2.0, // enable exponentialBackoffFactor
    maxResetTimeout = 60.seconds, // limit exponential back-off time
  )

  suspend fun <A> resilient(schedule: Schedule<Throwable, *>, f: suspend () -> A): A =
    schedule.retry { circuitBreaker.protectOrThrow(f) }

  Either.catch {
    resilient(Schedule.recurs(5), ::apiCall)
  }.let { println("recurs(5) apiCall twice and 4x short-circuit result from CircuitBreaker: $it") }

  delay(2000)
  println("CircuitBreaker ready to half-open")

  // Retry once and when the CircuitBreaker opens after 2 failures then retry with exponential back-off with same time as CircuitBreaker's resetTimeout
  val fiveTimesWithBackOff = Schedule.recurs<Throwable>(1) andThen
    Schedule.exponential(2.seconds) and Schedule.recurs(5)

  Either.catch {
    resilient(fiveTimesWithBackOff, ::apiCall)
  }.let { println("exponential(2.seconds) and recurs(5) always retries with actual apiCall: $it") }
  //sampleEnd
}

Types

object Companion
class ExecutionRejected(val reason: String, val state: CircuitBreaker.State) : Throwable
sealed class State

The initial state when initializing a CircuitBreaker is Closed.

Functions

suspend fun awaitClose()

Suspends until this CircuitBreaker is in the CircuitBreaker.State.Closed state.
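
As a rough illustration of awaitClose, the sketch below trips a breaker, starts a coroutine that waits for it to close, and then lets a successful test request through. It reuses the CircuitBreaker.of parameters from the examples above. The one-second resetTimeout, the 1500 ms delay and the printed messages are arbitrary choices for this example, and it assumes a successful test request returns the breaker to Closed, as in the first example.

```kotlin
import arrow.core.Either
import arrow.fx.coroutines.CircuitBreaker
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

@ExperimentalTime
suspend fun main(): Unit = coroutineScope {
  val circuitBreaker = CircuitBreaker.of(
    maxFailures = 2,
    resetTimeout = 1.seconds,
    exponentialBackoffFactor = 1.0,
    maxResetTimeout = 60.seconds,
  )

  // Trip the breaker into Open with two failing calls.
  repeat(2) {
    Either.catch { circuitBreaker.protectOrThrow { throw RuntimeException("Service overloaded") } }
  }

  // This coroutine suspends until the breaker is Closed again.
  val waiter = launch {
    circuitBreaker.awaitClose()
    println("Breaker closed again, resuming work")
  }

  // Wait past resetTimeout so the next call is let through as the test request.
  delay(1500)
  circuitBreaker.protectOrThrow { "test request succeeded" }.also(::println)

  waiter.join()
}
```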

fun doOnClosed(callback: suspend () -> Unit): CircuitBreaker

Returns a new circuit breaker that wraps the state of the source and that will fire the given callback upon the circuit breaker transitioning to the CircuitBreaker.Closed state.

fun doOnHalfOpen(callback: suspend () -> Unit): CircuitBreaker

Returns a new circuit breaker that wraps the state of the source and that will fire the given callback upon the circuit breaker transitioning to the CircuitBreaker.HalfOpen state.

fun doOnOpen(callback: suspend () -> Unit): CircuitBreaker

Returns a new circuit breaker that wraps the state of the source and that will fire the given callback upon the circuit breaker transitioning to the CircuitBreaker.Open state.

fun doOnRejectedTask(callback: suspend () -> Unit): CircuitBreaker

Returns a new circuit breaker that wraps the state of the source and that upon a task being rejected will execute the given callback.
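
Because each doOn* function returns a new CircuitBreaker, the callbacks can be chained. The following is a minimal sketch of that, using only the signatures listed above and the CircuitBreaker.of parameters from the earlier examples. The logged messages are placeholders, and no particular ordering of the output is claimed.

```kotlin
import arrow.core.Either
import arrow.fx.coroutines.CircuitBreaker
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime

@ExperimentalTime
suspend fun main(): Unit {
  val circuitBreaker = CircuitBreaker.of(
    maxFailures = 2,
    resetTimeout = 2.seconds,
    exponentialBackoffFactor = 1.2,
    maxResetTimeout = 60.seconds,
  )
    // Each call wraps the previous breaker and adds one callback.
    .doOnOpen { println("callback: breaker opened") }
    .doOnHalfOpen { println("callback: breaker half-open") }
    .doOnClosed { println("callback: breaker closed") }
    .doOnRejectedTask { println("callback: task rejected") }

  // Two failures reach maxFailures and trip the breaker.
  repeat(2) {
    Either.catch { circuitBreaker.protectOrThrow { throw RuntimeException("Service overloaded") } }
  }

  // The breaker is now Open, so this task is rejected instead of executed.
  circuitBreaker.protectEither { "never runs" }.also(::println)
}
```

Keep the returned breaker and use it for all protected calls: the callbacks are attached to the new instance that each doOn* call returns.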

suspend fun <A> protectEither(fa: suspend () -> A): Either<CircuitBreaker.ExecutionRejected, A>

Returns a new task that upon execution will execute the given task, but with the protection of this circuit breaker. If an exception in fa occurs, other than an ExecutionRejected exception, it will be rethrown.

suspend tailrec fun <A> protectOrThrow(fa: suspend () -> A): A

Returns a new task that upon execution will execute the given task, but with the protection of this circuit breaker. If an exception in fa occurs, it will be rethrown.
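
The practical difference between protectEither and protectOrThrow shows up once the breaker is Open. The sketch below, which reuses the CircuitBreaker.of parameters from the earlier examples, handles a rejection both ways. The printed messages are illustrative only.

```kotlin
import arrow.core.Either
import arrow.fx.coroutines.CircuitBreaker
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime

@ExperimentalTime
suspend fun main(): Unit {
  val circuitBreaker = CircuitBreaker.of(
    maxFailures = 2,
    resetTimeout = 10.seconds,
    exponentialBackoffFactor = 1.0,
    maxResetTimeout = 60.seconds,
  )

  // Trip the breaker: two failures reach maxFailures.
  repeat(2) {
    Either.catch { circuitBreaker.protectOrThrow { throw RuntimeException("Service overloaded") } }
  }

  // protectEither reports a rejection as a value on the left side.
  circuitBreaker.protectEither { "unreachable" }.fold(
    { rejected -> println("protectEither rejected: ${rejected.reason}") },
    { value -> println("protectEither ran: $value") },
  )

  // protectOrThrow reports the same rejection by throwing ExecutionRejected.
  try {
    circuitBreaker.protectOrThrow { "unreachable" }
  } catch (e: CircuitBreaker.ExecutionRejected) {
    println("protectOrThrow rejected: ${e.reason}")
  }
}
```

protectEither makes the rejection explicit in the return type, which can be convenient when a rejection is an expected outcome rather than an error.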


Returns the current CircuitBreaker.State, meant for debugging purposes.