Schedule
Retrying and repeating effects
A common demand when working with effects is to retry or repeat them when certain circumstances happen. Usually, the retrial or repetition does not happen right away; rather, it is done based on a policy. For instance, when fetching content from a network request, we may want to retry it when it fails, using an exponential backoff algorithm, for a maximum of 15 seconds or 5 attempts, whatever happens first.
Schedule allows you to define and compose powerful yet simple policies, which can be used to either repeat or retry computation.
The two core methods of running a schedule are:
retry: The effect is executed once, and if it fails, it will be reattempted based on the scheduling policy passed as an argument. It will stop if the effect ever succeeds, or the policy determines it should not be reattempted again.
repeat: The effect is executed once, and if it succeeds, it will be executed again based on the scheduling policy passed as an argument. It will stop if the effect ever fails, or the policy determines it should not be executed again. It will return the last internal state of the scheduling policy, or the error that happened running the effect.
Constructing a policy:
Constructing a simple schedule which recurs 10 times until it succeeds:
import arrow.fx.coroutines.*
fun <A> recurTenTimes() = Schedule.recurs<A>(10)
A more complex schedule
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import arrow.fx.coroutines.*
@ExperimentalTime
fun <A> complexPolicy(): Schedule<A, List<A>> =
Schedule.exponential<A>(10.milliseconds).whileOutput { it < 60.seconds }
.andThen(Schedule.spaced<A>(60.seconds) and Schedule.recurs(100)).jittered()
.zipRight(Schedule.identity<A>().collect())
This policy will recur with exponential backoff as long as the delay is less than 60 seconds and then continue with a spaced delay of 60 seconds. The delay is also randomized slightly to avoid coordinated backoff from multiple services. Finally we also collect every input to the schedule and return it. When used with retry this will return a list of exceptions that occured on failed attempts.
Common use cases
Common use cases Once we have building blocks and ways to combine them, let’s see how we can use them to solve some use cases.
Repeating an effect and dealing with its result
When we repeat an effect, we do it as long as it keeps providing successful results and the scheduling policy tells us to keep recursing. But then, there is a question on what to do with the results provided by each iteration of the repetition.
There are at least 3 possible things we would like to do:
Discard all results; i.e., return
Unit
.Discard all intermediate results and just keep the last produced result.
Keep all intermediate results.
Assuming we have a suspend effect in, and we want to repeat it 3 times after its first successful execution, we can do:
import arrow.fx.coroutines.*
suspend fun main(): Unit {
var counter = 0
//sampleStart
val res = Schedule.recurs<Unit>(3).repeat {
println("Run: ${counter++}")
}
//sampleEnd
println(res)
}
However, when running this new effect, its output will be the number of iterations it has performed, as stated in the documentation of the function. Also notice that we did not handle the error case, there are overloads repeatOrElse and repeatOrElseEither which offer that capability, repeat will just rethrow any error encountered.
If we want to discard the values provided by the repetition of the effect, we can combine our policy with Schedule.unit, using the zipLeft or zipRight combinators, which will keep just the output of one of the policies:
import arrow.fx.coroutines.*
suspend fun main(): Unit {
var counter = 0
//sampleStart
val res = (Schedule.unit<Unit>() zipLeft Schedule.recurs(3)).repeat {
println("Run: ${counter++}")
}
// equal to
val res2 = (Schedule.recurs<Unit>(3) zipRight Schedule.unit()).repeat {
println("Run: ${counter++}")
}
//sampleEnd
println(res)
println(res2)
}
Following the same strategy, we can zip it with the Schedule.identity policy to keep only the last provided result by the effect.
import arrow.fx.coroutines.*
suspend fun main(): Unit {
var counter = 0
//sampleStart
val res = (Schedule.identity<Int>() zipLeft Schedule.recurs(3)).repeat {
println("Run: ${counter++}"); counter
}
// equal to
val res2 = (Schedule.recurs<Int>(3) zipRight Schedule.identity<Int>()).repeat {
println("Run: ${counter++}"); counter
}
//sampleEnd
println(res)
println(res2)
}
Finally, if we want to keep all intermediate results, we can zip the policy with Schedule.collect:
import arrow.fx.coroutines.*
suspend fun main(): Unit {
var counter = 0
//sampleStart
val res = (Schedule.collect<Int>() zipLeft Schedule.recurs(3)).repeat {
println("Run: ${counter++}")
counter
}
// equal to
val res2 = (Schedule.recurs<Int>(3) zipRight Schedule.collect<Int>()).repeat {
println("Run: ${counter++}")
counter
}
//sampleEnd
println(res)
println(res2)
}
Repeating an effect until/while it produces a certain value
We can make use of the policies doWhile or doUntil to repeat an effect while or until its produced result matches a given predicate.
import arrow.fx.coroutines.*
suspend fun main(): Unit {
var counter = 0
//sampleStart
val res = Schedule.doWhile<Int>{ it <= 3 }.repeat {
println("Run: ${counter++}"); counter
}
//sampleEnd
println(res)
}
Exponential backoff retries
A common algorithm to retry effectful operations, as network requests, is the exponential backoff algorithm. There is a scheduling policy that implements this algorithm and can be used as:
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.ExperimentalTime
import arrow.fx.coroutines.*
@ExperimentalTime
val exponential = Schedule.exponential<Unit>(250.milliseconds)
Types
Functions
Combines with another schedule by combining the result and the delay of the Decision with the zipContinue, zipDuration and a zip functions
Combines with another schedule by combining the result and the delay of the Decision with the functions zipContinue, zipDuration and a zip function
Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay. This will raise an error if a repeat failed.
Runs this effect once and, if it succeeded, decide using the provided policy if the effect should be repeated and if so, with how much delay. Also offers a function to handle errors if they are encountered during repetition.
Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay. Also offers a function to handle errors if they are encountered during repetition.
Extensions
Runs an effect and, if it fails, decide using the provided policy if the effect should be retried and if so, with how much delay. Returns the result of the effect if if it was successful or re-raises the last error encountered when the schedule ends.
Runs an effect and, if it fails, decide using the provided policy if the effect should be retried and if so, with how much delay. Also offers a function to handle errors if they are encountered during retrial.
Runs an effect and, if it fails, decide using the provided policy if the effect should be retried and if so, with how much delay. Also offers a function to handle errors if they are encountered during retrial.