Schedule

sealed class Schedule<Input, Output>

Retrying and repeating effects

A common demand when working with effects is to retry or repeat them under certain circumstances. Usually, the retry or repetition does not happen right away; rather, it is done based on a policy. For instance, when fetching content from a network request, we may want to retry it when it fails, using an exponential backoff algorithm, for a maximum of 15 seconds or 5 attempts, whichever happens first.

Schedule allows you to define and compose powerful yet simple policies, which can be used to either repeat or retry a computation.

Schedule has been derived from Scala ZIO’s Schedule datatype and has been adapted to Kotlin.

The two core methods of running a schedule are:

  • retry: The effect is executed once, and if it fails, it will be reattempted based on the scheduling policy passed as an argument. It will stop if the effect ever succeeds, or the policy determines it should not be reattempted again.
  • repeat: The effect is executed once, and if it succeeds, it will be executed again based on the scheduling policy passed as an argument. It will stop if the effect ever fails, or the policy determines it should not be executed again. It will return the last internal state of the scheduling policy, or the error that happened running the effect.
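The retry side can be sketched as follows. This is a minimal illustration, not a canonical usage: it assumes the retry extension on Schedule<Throwable, B> from arrow.fx.coroutines, and FlakyService and fetchWithRetry are hypothetical names invented for the example.

```kotlin
import arrow.fx.coroutines.*

// Hypothetical effect: fails a fixed number of times, then succeeds.
class FlakyService(private val failuresBeforeSuccess: Int) {
  var calls = 0
    private set

  suspend fun fetch(): String {
    calls++
    if (calls <= failuresBeforeSuccess) throw IllegalStateException("attempt $calls failed")
    return "content after $calls calls"
  }
}

// Retries according to a recurs(5) policy; each failure is fed to the schedule as its input.
// Assumption: `retry` is the extension on Schedule<Throwable, B> in arrow.fx.coroutines.
suspend fun fetchWithRetry(service: FlakyService): String =
  Schedule.recurs<Throwable>(5).retry { service.fetch() }

suspend fun main() {
  val service = FlakyService(failuresBeforeSuccess = 2)
  println(fetchWithRetry(service))
  println("Total calls: ${service.calls}")
}
```

Here the first two calls fail and the third succeeds, so the policy stops as soon as the effect returns a value.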

Constructing a policy:

Constructing a simple schedule which recurs 10 times until it succeeds:

import arrow.fx.coroutines.*

fun <A> recurTenTimes() = Schedule.recurs<A>(10)

A more complex schedule:

import kotlin.time.seconds
import kotlin.time.milliseconds
import kotlin.time.ExperimentalTime
import arrow.fx.coroutines.*

@ExperimentalTime
fun <A> complexPolicy(): Schedule<A, List<A>> =
  Schedule.exponential<A>(10.milliseconds).whileOutput { it.inNanoseconds < 60.seconds.inNanoseconds }
    .andThen(Schedule.spaced<A>(60.seconds) and Schedule.recurs(100)).jittered()
    .zipRight(Schedule.identity<A>().collect())

This policy will recur with exponential backoff as long as the delay is less than 60 seconds and then continue with a spaced delay of 60 seconds. The delay is also randomized slightly to avoid coordinated backoff from multiple services. Finally, we also collect every input to the schedule and return it. When used with retry, this will return a list of the exceptions that occurred on failed attempts.
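To make the shape of this policy concrete, here is a plain-Kotlin sketch (no Arrow involved) that lists the nominal delays such a policy aims for, ignoring jitter. The helper nominalDelaysMillis is hypothetical. It assumes that the exponential phase starts at the base delay, doubles each time, and only includes delays strictly below the cap; the library's exact boundary behaviour may differ.

```kotlin
// Nominal delays in milliseconds: doubling while below the cap, then a fixed spacing.
// Assumption: the first exponential delay equals the base delay.
fun nominalDelaysMillis(baseMillis: Long, capMillis: Long, spacedCount: Int): List<Long> {
  val exponentialPhase = generateSequence(baseMillis) { it * 2 }
    .takeWhile { it < capMillis }
    .toList()
  val spacedPhase = List(spacedCount) { capMillis }
  return exponentialPhase + spacedPhase
}

fun main() {
  // 10 ms base, 60 s cap, and only the first 3 of the spaced recurrences shown.
  val delays = nominalDelaysMillis(baseMillis = 10, capMillis = 60_000, spacedCount = 3)
  println(delays)
}
```

With these numbers the exponential phase contributes 13 delays (10 ms up to 40960 ms) before the fixed 60-second spacing takes over.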

Common use cases

Once we have building blocks and ways to combine them, let’s see how we can use them to solve some use cases.

Repeating an effect and dealing with its result

When we repeat an effect, we do it as long as it keeps providing successful results and the scheduling policy tells us to keep recurring. But then, there is the question of what to do with the results provided by each iteration of the repetition.

There are at least 3 possible things we would like to do:

  • Discard all results; i.e., return Unit.
  • Discard all intermediate results and just keep the last produced result.
  • Keep all intermediate results.

Assuming we have a suspend effect, and we want to repeat it 3 times after its first successful execution, we can do:

import arrow.fx.coroutines.*

suspend fun main(): Unit {
  var counter = 0
  //sampleStart
  val res = Schedule.recurs<Unit>(3).repeat {
    println("Run: ${counter++}")
  }
  //sampleEnd
  println(res)
}

However, when running this new effect, its output will be the number of iterations it has performed, as stated in the documentation of the function. Also notice that we did not handle the error case: repeat will just rethrow any error it encounters, while the repeatOrElse and repeatOrElseEither variants let you handle it.
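As a hedged sketch of the error-handling variant, the snippet below uses repeatOrElse with the signature listed in the Functions section. The effect that throws on its second run and the fallback value -1 are both made up for the illustration.

```kotlin
import arrow.fx.coroutines.*

// Repeats a hypothetical effect that throws on its second run and
// falls back to -1 instead of rethrowing.
suspend fun repeatWithFallback(): Int {
  var counter = 0
  return Schedule.recurs<Unit>(3).repeatOrElse({
    counter++
    if (counter == 2) throw IllegalStateException("run $counter failed")
    println("Run: $counter")
  }) { error, lastOutput ->
    // lastOutput is nullable, as declared in the orElse signature.
    println("Failed with '${error.message}', last output: $lastOutput")
    -1
  }
}

suspend fun main() {
  println(repeatWithFallback())
}
```

Because the handler must return an Output, the caller still gets a value of the schedule's output type; repeatOrElseEither is the variant to reach for when the fallback has a different type.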

If we want to discard the values provided by the repetition of the effect, we can combine our policy with Schedule.unit, using the zipLeft or zipRight combinators, which will keep just the output of one of the policies:

import arrow.fx.coroutines.*

suspend fun main(): Unit {
  var counter = 0
  //sampleStart
  val res = (Schedule.unit<Unit>() zipLeft Schedule.recurs(3)).repeat {
    println("Run: ${counter++}")
  }
  // equal to
  val res2 = (Schedule.recurs<Unit>(3) zipRight Schedule.unit()).repeat {
    println("Run: ${counter++}")
  }
  //sampleEnd
  println(res)
  println(res2)
}

Following the same strategy, we can zip it with the Schedule.identity policy to keep only the last result provided by the effect:

import arrow.fx.coroutines.*

suspend fun main(): Unit {
  var counter = 0
  //sampleStart
  val res = (Schedule.identity<Int>() zipLeft Schedule.recurs(3)).repeat {
    println("Run: ${counter++}"); counter
  }
  // equal to
  val res2 = (Schedule.recurs<Int>(3) zipRight Schedule.identity<Int>()).repeat {
    println("Run: ${counter++}"); counter
  }
  //sampleEnd
  println(res)
  println(res2)
}

Finally, if we want to keep all intermediate results, we can zip the policy with Schedule.collect:

import arrow.fx.coroutines.*

suspend fun main(): Unit {
  var counter = 0
  //sampleStart
  val res = (Schedule.collect<Int>() zipLeft Schedule.recurs(3)).repeat {
    println("Run: ${counter++}")
    counter
  }
  // equal to
  val res2 = (Schedule.recurs<Int>(3) zipRight Schedule.collect<Int>()).repeat {
    println("Run: ${counter++}")
    counter
  }
  //sampleEnd
  println(res)
  println(res2)
}

Repeating an effect until/while it produces a certain value

We can make use of the policies doWhile or doUntil to repeat an effect while or until its produced result matches a given predicate.

import arrow.fx.coroutines.*

suspend fun main(): Unit {
  var counter = 0
  //sampleStart
  val res = Schedule.doWhile<Int> { it <= 3 }.repeat {
    println("Run: ${counter++}"); counter
  }
  //sampleEnd
  println(res)
}
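The doUntil counterpart can be sketched the same way. This example relies only on the doUntil and repeat signatures listed further down; runsUntilAboveThree is a hypothetical helper that reports how many times the effect ran rather than the schedule's output.

```kotlin
import arrow.fx.coroutines.*

// Repeats a counting effect until it produces a value greater than 3
// and returns how many times the effect was executed.
suspend fun runsUntilAboveThree(): Int {
  var runs = 0
  Schedule.doUntil<Int> { it > 3 }.repeat {
    runs++
    println("Run: $runs")
    runs // the produced value is the schedule's input
  }
  return runs
}

suspend fun main() {
  println("Effect ran ${runsUntilAboveThree()} times")
}
```

The predicate first holds for the value 4, so the effect is executed four times before the policy stops.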

Exponential backoff retries

A common algorithm to retry effectful operations, such as network requests, is the exponential backoff algorithm. There is a scheduling policy that implements this algorithm and can be used as:

import kotlin.time.milliseconds
import kotlin.time.ExperimentalTime
import arrow.fx.coroutines.*

@ExperimentalTime
val exponential = Schedule.exponential<Unit>(250.milliseconds)
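The documented rule for this policy is base * factor ^ n, where n is the number of executions. The following plain-Kotlin sketch (independent of Arrow) evaluates that formula for a 250 ms base. The helper exponentialDelayMillis is hypothetical, and starting n at 0 is an assumption, since the text does not say where the count begins.

```kotlin
import kotlin.math.pow

// Delay in milliseconds for the n-th execution, following base * factor ^ n.
fun exponentialDelayMillis(baseMillis: Double, n: Int, factor: Double = 2.0): Double =
  baseMillis * factor.pow(n)

fun main() {
  // First five delays for a 250 ms base, assuming n starts at 0.
  val delays = (0 until 5).map { n -> exponentialDelayMillis(250.0, n) }
  println(delays) // [250.0, 500.0, 1000.0, 2000.0, 4000.0]
}
```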

Types

Decision
  A single decision. Contains the decision to continue, the delay, the new state and the (lazy) result of a Schedule.
  data class Decision<out A, out B>

Functions

and
  Combines two schedules. Continues only when both continue and chooses the maximum delay.
  infix fun <A : Input, B> and(other: Schedule<A, B>): Schedule<A, Pair<Output, B>>
andThen
  Executes one schedule after the other. When the first schedule ends, it continues with the second.
  abstract infix fun <A : Input, B> andThen(other: Schedule<A, B>): Schedule<A, Either<Output, B>>
check
  Conditionally checks on both the input and the output whether or not to continue.
  abstract fun <A : Input> check(pred: suspend (input: A, output: Output) -> Boolean): Schedule<A, Output>
choose
  Combines two schedules with different input and output and conditionally chooses between the two. Continues when the chosen schedule continues and uses the chosen schedule’s delay.
  abstract infix fun <A, B> choose(other: Schedule<A, B>): Schedule<Either<Input, A>, Either<Output, B>>
collect
  Accumulates the results of every execution into a list.
  fun collect(): Schedule<Input, List<Output>>
combine
  Combines with another schedule by combining the result and the delay of the Decision with the zipContinue, zipDuration and zip functions.
  fun <A : Input, B, C> combine(other: Schedule<A, B>, zipContinue: (cont: Boolean, otherCont: Boolean) -> Boolean, zipDuration: (duration: Duration, otherDuration: Duration) -> Duration, zip: (Output, B) -> C): Schedule<A, C>
combineNanos
  Combines with another schedule by combining the result and the delay of the Decision with the zipContinue, zipDuration and zip functions.
  abstract fun <A : Input, B, C> combineNanos(other: Schedule<A, B>, zipContinue: (cont: Boolean, otherCont: Boolean) -> Boolean, zipDuration: (duration: Double, otherDuration: Double) -> Double, zip: (Output, B) -> C): Schedule<A, C>
combineWith
  fun <A : Input, B> combineWith(other: Schedule<A, B>, f: (Boolean, Boolean) -> Boolean, g: (Duration, Duration) -> Duration): Schedule<A, Pair<Output, B>> (deprecated)
compose
  Infix variant of pipe with reversed order.
  infix fun <B> compose(other: Schedule<B, Input>): Schedule<B, Output>
const
  Changes the result of a Schedule to always be b.
  fun <B> const(b: B): Schedule<Input, B>
contramap
  Changes the input of the schedule. May alter a schedule’s decision if it is based on input.
  abstract fun <B> contramap(f: suspend (B) -> Input): Schedule<B, Output>
delay
  fun delay(f: suspend (duration: Duration) -> Duration): Schedule<Input, Output>
delayed
  Adjusts the delay of a schedule’s Decision.
  fun delayed(f: suspend (duration: Duration) -> Duration): Schedule<Input, Output> (deprecated)
delayedNanos
  fun delayedNanos(f: suspend (duration: Double) -> Double): Schedule<Input, Output>
dimap
  fun <B, C> dimap(f: suspend (B) -> Input, g: (Output) -> C): Schedule<B, C>
fold
  Non-effectful version of foldM.
  fun <C> fold(initial: C, f: suspend (acc: C, output: Output) -> C): Schedule<Input, C>
foldLazy
  Accumulates the results of a schedule by folding over them effectfully.
  abstract fun <C> foldLazy(initial: suspend () -> C, f: suspend (acc: C, output: Output) -> C): Schedule<Input, C>
foldM
  fun <C> foldM(initial: suspend () -> C, f: suspend (acc: C, output: Output) -> C): Schedule<Input, C> (deprecated)
forever
  Always retries a schedule regardless of the decision made prior to invoking this method.
  abstract fun forever(): Schedule<Input, Output>
jittered
  Adds random jitter to a schedule.
  fun jittered(genRand: suspend () -> Double): Schedule<Input, Output>
  fun jittered(genRand: suspend () -> Duration): Schedule<Input, Output>
  fun jittered(): Schedule<Input, Output> (deprecated)
  fun jittered(random: Random = Random.Default): Schedule<Input, Output>
logInput
  Runs an effectful handler on every input. Does not alter the decision.
  abstract fun logInput(f: suspend (input: Input) -> Unit): Schedule<Input, Output>
logOutput
  Runs an effectful handler on every output. Does not alter the decision.
  abstract fun logOutput(f: suspend (output: Output) -> Unit): Schedule<Input, Output>
map
  Changes the output of a schedule. Does not alter the decision of the schedule.
  abstract fun <B> map(f: (output: Output) -> B): Schedule<Input, B>
modify
  Changes the delay of a resulting Decision based on the Output and the produced delay.
  fun modify(f: suspend (Output, Duration) -> Duration): Schedule<Input, Output>
modifyDelay
  fun modifyDelay(f: suspend (Output, Duration) -> Duration): Schedule<Input, Output> (deprecated)
modifyNanos
  abstract fun modifyNanos(f: suspend (Output, Double) -> Double): Schedule<Input, Output>
not
  Inverts the decision of a schedule.
  abstract operator fun not(): Schedule<Input, Output>
or
  Combines two schedules. Continues if one continues and chooses the minimum delay.
  infix fun <A : Input, B> or(other: Schedule<A, B>): Schedule<A, Pair<Output, B>>
pipe
  Composes this schedule with the other schedule by piping the output of this schedule into the input of the other.
  abstract infix fun <B> pipe(other: Schedule<Output, B>): Schedule<Input, B>
repeat
  Runs this effect once and, if it succeeded, decides using the provided policy if the effect should be repeated and if so, with how much delay. Returns the last output from the policy or raises an error if a repeat failed.
  suspend fun repeat(fa: suspend () -> Input): Output
repeatOrElse
  Runs this effect once and, if it succeeded, decides using the provided policy if the effect should be repeated and if so, with how much delay. Also offers a function to handle errors if they are encountered during repetition.
  suspend fun repeatOrElse(fa: suspend () -> Input, orElse: suspend (Throwable, Output?) -> Output): Output
repeatOrElseEither
  abstract suspend fun <C> repeatOrElseEither(fa: suspend () -> Input, orElse: suspend (Throwable, Output?) -> C): Either<C, Output>
tupled
  infix fun <A, B> tupled(other: Schedule<A, B>): Schedule<Pair<Input, A>, Pair<Output, B>> (deprecated)
unit
  fun unit(): Schedule<Input, Unit> (deprecated)
untilInput
  untilInput(f) = whileInput(f).not()
  fun <A : Input> untilInput(f: suspend (A) -> Boolean): Schedule<A, Output>
untilOutput
  untilOutput(f) = whileOutput(f).not()
  fun untilOutput(f: suspend (Output) -> Boolean): Schedule<Input, Output>
void
  fun void(): Schedule<Input, Unit>
whileInput
  Continues or stops the schedule based on the input.
  fun <A : Input> whileInput(f: suspend (A) -> Boolean): Schedule<A, Output>
whileOutput
  Continues or stops the schedule based on the output.
  fun whileOutput(f: suspend (Output) -> Boolean): Schedule<Input, Output>
zip
  Combines two schedules with different input and output using and. Continues when both continue and uses the maximum delay.
  infix fun <A, B> zip(other: Schedule<A, B>): Schedule<Pair<Input, A>, Pair<Output, B>>
  abstract fun <A, B, C> zip(other: Schedule<A, B>, f: (Output, B) -> C): Schedule<Pair<Input, A>, C>
zipLeft
  Combines two schedules with and but throws away the right schedule’s result.
  infix fun <A : Input, B> zipLeft(other: Schedule<A, B>): Schedule<A, Output>
zipRight
  Combines two schedules with and but throws away the left schedule’s result.
  infix fun <A : Input, B> zipRight(other: Schedule<A, B>): Schedule<A, B>
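
The difference between and and or comes down to how the two decisions are merged, which is also what the zipContinue and zipDuration parameters of combine express. The following plain-Kotlin model illustrates the documented rules only; SimpleDecision, both and either are hypothetical stand-ins, not Arrow types.

```kotlin
// Simplified stand-in for a schedule's decision: whether to continue and how long to wait.
data class SimpleDecision(val cont: Boolean, val delayMillis: Long)

// Models `and`: continue only when both continue, wait for the longer delay.
fun both(a: SimpleDecision, b: SimpleDecision): SimpleDecision =
  SimpleDecision(a.cont && b.cont, maxOf(a.delayMillis, b.delayMillis))

// Models `or`: continue when at least one continues, wait for the shorter delay.
fun either(a: SimpleDecision, b: SimpleDecision): SimpleDecision =
  SimpleDecision(a.cont || b.cont, minOf(a.delayMillis, b.delayMillis))

fun main() {
  val spaced = SimpleDecision(cont = true, delayMillis = 60_000)
  val exhausted = SimpleDecision(cont = false, delayMillis = 0)
  println(both(spaced, exhausted))   // stops, because one side has stopped
  println(either(spaced, exhausted)) // continues, because one side still continues
}
```

This is why spaced(...) and recurs(n) yields a policy that stops after n recurrences: the spaced side would continue forever, but and requires both sides to agree.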

Companion Object Functions

collect
  Creates a Schedule which collects all its inputs in a list.
  fun <A> collect(): Schedule<A, List<A>>
decision
  Creates a Schedule that returns its decisions.
  fun <A> decision(): Schedule<A, Boolean>
delay
  Creates a Schedule that returns its delay.
  fun <A> delay(): Schedule<A, Duration> (deprecated)
delayed
  Creates a Schedule that uses another Schedule to generate the delay of this schedule. Continues for as long as delaySchedule continues and adds the output of delaySchedule to the delay that delaySchedule produced. Also returns the full delay as output.
  fun <A> delayed(delaySchedule: Schedule<A, Duration>): Schedule<A, Duration> (deprecated)
  fun <A> delayed(delaySchedule: Schedule<A, Double>): Schedule<A, Double>
  fun <A> delayed(delaySchedule: Schedule<A, Duration>): Schedule<A, Duration>
delayInNanos
  fun <A> delayInNanos(): Schedule<A, Double>
doUntil
  Creates a Schedule that continues until f returns true.
  fun <A> doUntil(f: suspend (A) -> Boolean): Schedule<A, A>
doWhile
  Creates a Schedule that continues as long as f returns true.
  fun <A> doWhile(f: suspend (A) -> Boolean): Schedule<A, A>
duration
  fun <A> duration(): Schedule<A, Duration>
exponential
  Creates a Schedule that increases its delay exponentially with a given factor and base. Delays can be calculated as base * factor ^ n where n is the number of executions.
  fun <A> exponential(base: Duration, factor: Double = 2.0): Schedule<A, Duration> (deprecated)
  fun <A> exponential(base: Double, factor: Double = 2.0): Schedule<A, Double>
  fun <A> exponential(base: Duration, factor: Double = 2.0): Schedule<A, Duration>
fibonacci
  Creates a Schedule that continues with increasing delay by adding the last two delays.
  fun <A> fibonacci(one: Duration): Schedule<A, Duration> (deprecated)
  fun <A> fibonacci(one: Double): Schedule<A, Double>
  fun <A> fibonacci(one: Duration): Schedule<A, Duration>
forever
  Creates a Schedule that continues forever and returns the number of iterations.
  fun <A> forever(): Schedule<A, Int>
identity
  Creates a Schedule that continues without delay and just returns its input.
  fun <A> identity(): Schedule<A, A>
invoke
  Invoke constructor to manually define a schedule. If you need this, please consider adding it to Arrow or suggest a change to avoid using this manual method.
  operator fun <S, A, B> invoke(initial: suspend () -> S, update: suspend (input: A, state: S) -> Decision<S, B>): Schedule<A, B>
linear
  Creates a Schedule which increases its delay linearly, by n * base where n is the number of executions.
  fun <A> linear(base: Duration): Schedule<A, Duration> (deprecated)
  fun <A> linear(base: Double): Schedule<A, Double>
  fun <A> linear(base: Duration): Schedule<A, Duration>
logInput
  Creates a Schedule with an effectful handler on the input.
  fun <A> logInput(f: suspend (A) -> Unit): Schedule<A, A>
logOutput
  Creates a Schedule with an effectful handler on the output.
  fun <A> logOutput(f: suspend (A) -> Unit): Schedule<A, A>
never
  Creates a schedule that never retries.
  fun <A> never(): Schedule<A, Nothing>
once
  Creates a Schedule that only retries once.
  fun <A> once(): Schedule<A, Unit>
recurs
  Creates a Schedule that continues n times and returns the number of iterations.
  fun <A> recurs(n: Int): Schedule<A, Int>
spaced
  Creates a Schedule that continues with a fixed delay.
  fun <A> spaced(interval: Duration): Schedule<A, Int> (deprecated)
  fun <A> spaced(interval: Double): Schedule<A, Int>
  fun <A> spaced(interval: Duration): Schedule<A, Int>
unfold
  Non-effectful variant of unfoldLazy.
  fun <I, A> unfold(c: A, f: (A) -> A): Schedule<I, A>
unfoldLazy
  Creates a schedule that unfolds effectfully using a seed value c and an unfold function f. This keeps the current state (the current seed) as State and runs the unfold function on every call to update. This schedule always continues without delay and returns the current state.
  fun <I, A> unfoldLazy(c: suspend () -> A, f: suspend (A) -> A): Schedule<I, A>
unfoldM
  fun <I, A> unfoldM(c: suspend () -> A, f: suspend (A) -> A): Schedule<I, A> (deprecated)
unit
  Creates a Schedule that continues without delay and always returns Unit.
  fun <A> unit(): Schedule<A, Unit>
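
The growth rules described for linear and fibonacci can be compared side by side with a small plain-Kotlin sketch. It does not call Arrow; linearDelays and fibonacciDelays are hypothetical helpers. Counting n from 1 for the linear rule and seeding the Fibonacci sequence with (one, one) are assumptions, since the descriptions above do not fix the starting point.

```kotlin
// Linear growth: the n-th delay is n * base (assumption: n starts at 1).
fun linearDelays(base: Long, count: Int): List<Long> =
  (1..count).map { n -> n * base }

// Fibonacci growth: each delay is the sum of the previous two
// (assumption: the sequence is seeded with one, one).
fun fibonacciDelays(one: Long, count: Int): List<Long> =
  generateSequence(one to one) { (prev, curr) -> curr to prev + curr }
    .map { it.first }
    .take(count)
    .toList()

fun main() {
  println(linearDelays(base = 100, count = 5))   // [100, 200, 300, 400, 500]
  println(fibonacciDelays(one = 100, count = 6)) // [100, 100, 200, 300, 500, 800]
}
```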
