sealed class ~~Schedule~~<F, Input, Output> : ScheduleOf<F, Input, Output>
Deprecated: The IO datatype and its related type classes will disappear in Arrow 0.13.0. All useful operations are offered directly over suspend functions by Arrow Fx Coroutines. https://arrow-kt.io/docs/fx/async/
A common demand when working with effects is to retry or repeat them when certain circumstances happen. Usually, the retry or repetition does not happen right away; rather, it is done based on a policy. For instance, when fetching content from a network request, we may want to retry it when it fails, using an exponential backoff algorithm, for a maximum of 15 seconds or 5 attempts, whichever happens first.
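To make the policy in that example concrete, here is a minimal sketch in plain Kotlin, without Arrow. The helper `retryWithBackoff` and all of its parameters are hypothetical names introduced for illustration. It also simplifies the time limit by counting only the accumulated delay, not the time spent inside the action.

```kotlin
// Hypothetical helper, not part of Arrow: retries `action` with exponential backoff
// until it succeeds, the attempt limit is reached, or the delay budget is used up.
fun <A> retryWithBackoff(
  maxAttempts: Int = 5,
  maxTotalMillis: Long = 15_000,
  baseDelayMillis: Long = 250,
  sleep: (Long) -> Unit = { Thread.sleep(it) }, // injectable so the waiting can be replaced
  action: () -> A
): A {
  var waited = 0L              // total delay spent so far (assumption: action time is ignored)
  var delay = baseDelayMillis  // delay before the next attempt
  var attempt = 1
  while (true) {
    try {
      return action()
    } catch (e: Exception) {
      // Give up as soon as either limit would be exceeded, whichever happens first.
      if (attempt >= maxAttempts || waited + delay > maxTotalMillis) throw e
      sleep(delay)
      waited += delay
      delay *= 2
      attempt++
    }
  }
}
```

A Schedule expresses the same idea as a value that can be composed with other policies, instead of hard-coding the loop as done here.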
Schedule allows you to define and compose powerful yet simple policies, which can be used to either repeat or retry computation.
Schedule has been derived from Scalaz ZIO’s Schedule datatype and has been adapted to Kotlin.
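The two core methods of running a schedule are retry, which runs an effect again after it fails for as long as the policy allows, and repeat, which runs an effect again after it succeeds for as long as the policy allows.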
Because schedules are polymorphic over any F that is also a Monad, constructing a Schedule can sometimes mean having to explicitly write the type-parameters. This can be avoided using Schedule.withMonad which partially applies the chosen Monad.
Constructing a simple schedule which recurs 10 times until it succeeds:
import arrow.fx.ForIO
import arrow.fx.IO
import arrow.fx.Schedule
import arrow.fx.extensions.io.monad.monad
fun <A> recurTenTimes() = Schedule.recurs<ForIO, A>(IO.monad(), 10)
A more complex schedule is best put together using the withMonad constructor:
import arrow.fx.IO
import arrow.fx.Schedule
import arrow.fx.extensions.io.monad.monad
import arrow.fx.extensions.io.monadDefer.monadDefer
import arrow.fx.typeclasses.milliseconds
import arrow.fx.typeclasses.seconds
fun <A> complexPolicy() =
  Schedule.withMonad(IO.monad()) {
    exponential<A>(10.milliseconds).whileOutput { it.nanoseconds < 60.seconds.nanoseconds }
      .andThen(spaced<A>(60.seconds) and recurs<A>(100)).jittered(IO.monadDefer())
      .zipRight(identity<A>().collect())
  }
This policy will recur with exponential backoff as long as the delay is less than 60 seconds and then continue with a spaced delay of 60 seconds. The delay is also randomized slightly to avoid coordinated backoff from multiple services. Finally, we also collect every input to the schedule and return it. When used with retry, this will return a list of exceptions that occurred on failed attempts.
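As a rough illustration of the delays such a policy produces, the following plain-Kotlin sketch lists them in milliseconds. It is an approximation under stated assumptions: the function name is hypothetical, jitter and the collected inputs are left out, the factor is assumed to be 2.0, and the exact step at which Arrow switches from the exponential phase to the spaced phase may differ.

```kotlin
// Hypothetical model of the delay sequence only; not how Arrow evaluates a Schedule.
fun complexPolicyDelaysMillis(): Sequence<Long> {
  // Exponential phase: 10 ms, 20 ms, 40 ms, ... while the delay stays below 60 seconds.
  val backoff = generateSequence(10L) { it * 2 }.takeWhile { it < 60_000L }
  // Spaced phase: a fixed 60 second delay, at most 100 times.
  val spaced = generateSequence(60_000L) { it }.take(100)
  return backoff + spaced
}
```

Under these assumptions the exponential phase contributes 13 delays, the largest being 40960 ms, before the fixed 60 second spacing takes over.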
Common use cases
Once we have building blocks and ways to combine them, let’s see how we can use them to solve some use cases.
When we repeat an effect, we do it as long as it keeps providing successful results and the scheduling policy tells us to keep recursing. But then, there is a question on what to do with the results provided by each iteration of the repetition.
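There are at least 3 possible things we would like to do:
- Discard all results, i.e. return Unit.
- Discard all intermediate results and keep only the last produced result.
- Keep all intermediate results.
Assuming we have an effect in IO, and we want to repeat it 3 times after its first successful execution, we can do: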
import arrow.fx.IO
import arrow.fx.Schedule
import arrow.fx.extensions.io.concurrent.concurrent
import arrow.fx.extensions.io.monad.monad
import arrow.fx.fix
import arrow.fx.repeat
fun main() {
  var counter = 0
  val io = IO { println("Run: ${counter++}") }
  //sampleStart
  val res = io.repeat(IO.concurrent(), Schedule.recurs(IO.monad(), 3))
  //sampleEnd
  println(res.fix().unsafeRunSync())
}
However, when running this new effect, its output will be the number of iterations it has performed, as stated in the documentation of the function. Also notice that we did not handle the error case. The overloads repeatOrElse and repeatOrElseEither offer that capability; repeat will just rethrow any error encountered.
If we want to discard the values provided by the repetition of the effect, we can combine our policy with Schedule.unit, using the zipLeft or zipRight combinators, which will keep just the output of one of the policies:
import arrow.fx.ForIO
import arrow.fx.IO
import arrow.fx.Schedule
import arrow.fx.extensions.io.concurrent.concurrent
import arrow.fx.extensions.io.monad.monad
import arrow.fx.fix
import arrow.fx.repeat
fun main() {
  var counter = 0
  val io = IO { println("Run: ${counter++}") }
  //sampleStart
  val res = io.repeat(IO.concurrent(), Schedule.unit<ForIO, Unit>(IO.monad()).zipLeft(Schedule.recurs(IO.monad(), 3)))
  // equal to
  val res2 = io.repeat(IO.concurrent(), Schedule.recurs<ForIO, Unit>(IO.monad(), 3).zipRight(Schedule.unit(IO.monad())))
  //sampleEnd
  println(res.fix().unsafeRunSync())
  println(res2.fix().unsafeRunSync())
}
Following the same strategy, we can zip it with the Schedule.identity policy to keep only the last result provided by the effect:
import arrow.fx.ForIO
import arrow.fx.IO
import arrow.fx.Schedule
import arrow.fx.extensions.io.concurrent.concurrent
import arrow.fx.extensions.io.monad.monad
import arrow.fx.fix
import arrow.fx.repeat
fun main() {
  var counter = 0
  val io = IO { println("Run: ${counter++}"); counter }
  //sampleStart
  val res = io.repeat(IO.concurrent(), Schedule.identity<ForIO, Int>(IO.monad()).zipLeft(Schedule.recurs(IO.monad(), 3)))
  // equal to
  val res2 = io.repeat(IO.concurrent(), Schedule.recurs<ForIO, Int>(IO.monad(), 3).zipRight(Schedule.identity<ForIO, Int>(IO.monad())))
  //sampleEnd
  println(res.fix().unsafeRunSync())
  println(res2.fix().unsafeRunSync())
}
Finally, if we want to keep all intermediate results, we can zip the policy with Schedule.collect:
import arrow.fx.ForIO
import arrow.fx.IO
import arrow.fx.Schedule
import arrow.fx.extensions.io.concurrent.concurrent
import arrow.fx.extensions.io.monad.monad
import arrow.fx.fix
import arrow.fx.repeat
fun main() {
  var counter = 0
  val io = IO { println("Run: ${counter++}"); counter }
  //sampleStart
  val res = io.repeat(IO.concurrent(), Schedule.collect<ForIO, Int>(IO.monad()).zipLeft(Schedule.recurs(IO.monad(), 3)))
  // equal to
  val res2 = io.repeat(IO.concurrent(), Schedule.recurs<ForIO, Int>(IO.monad(), 3).zipRight(Schedule.collect<ForIO, Int>(IO.monad())))
  //sampleEnd
  println(res.fix().unsafeRunSync())
  println(res2.fix().unsafeRunSync())
}
We can make use of the policies doWhile or doUntil to repeat an effect while or until its produced result matches a given predicate.
import arrow.fx.ForIO
import arrow.fx.IO
import arrow.fx.Schedule
import arrow.fx.extensions.io.concurrent.concurrent
import arrow.fx.extensions.io.monad.monad
import arrow.fx.fix
import arrow.fx.repeat
fun main() {
  var counter = 0
  val io = IO { println("Run: ${counter++}"); counter }
  //sampleStart
  val res = io.repeat(IO.concurrent(), Schedule.doWhile<ForIO, Int>(IO.monad()) { it <= 3 })
  //sampleEnd
  println(res.fix().unsafeRunSync())
}
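The control flow behind repeating with doWhile can be sketched in plain Kotlin as a loop. This is only a model: `repeatWhile` is a hypothetical helper, not an Arrow function, and it assumes that the value returned is the first result for which the predicate no longer holds.

```kotlin
// Hypothetical model of "repeat while the result matches a predicate".
fun <A> repeatWhile(effect: () -> A, predicate: (A) -> Boolean): A {
  var result = effect()                          // the effect always runs at least once
  while (predicate(result)) result = effect()    // run again as long as the predicate holds
  return result                                  // first result that fails the predicate
}
```

With a counter-based effect and the predicate `{ it <= 3 }`, this model runs the effect four times before stopping.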
A common algorithm to retry effectful operations, such as network requests, is the exponential backoff algorithm. There is a scheduling policy that implements this algorithm and can be used as:
import arrow.fx.ForIO
import arrow.fx.IO
import arrow.fx.Schedule
import arrow.fx.extensions.io.monad.monad
import arrow.fx.typeclasses.milliseconds
val exponential = Schedule.exponential<ForIO, Unit>(IO.monad(), 250.milliseconds)
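The documentation of exponential gives the delay as base * factor ^ n, where n is the number of executions. A small plain-Kotlin sketch of that arithmetic follows; the function name is hypothetical, durations are modelled as milliseconds, and it is an assumption that n starts at 0 for the first delay.

```kotlin
import kotlin.math.pow

// Hypothetical helper mirroring the formula base * factor ^ n in milliseconds.
fun exponentialDelayMillis(baseMillis: Long, n: Int, factor: Double = 2.0): Long =
  (baseMillis * factor.pow(n)).toLong()
```

With a base of 250 milliseconds and the default factor of 2.0, this gives 250, 500, 1000 and 2000 milliseconds for n = 0 to 3.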
Types

| Name | Summary | Signature |
|---|---|---|
| Decision | A single decision. Contains the decision to continue, the delay, the new state and the (lazy) result of a Schedule. | data class ~~Decision~~<out A, out B> : DecisionOf<A, B> |
| ScheduleFor | Interface with all above methods partially applied to some monad M for convenience. | interface ScheduleFor<M> |

Functions

| Name | Summary | Signature |
|---|---|---|
| and | Combine two schedules. Continues only when both continue and chooses the maximum delay. | infix fun <A : Input, B> and(other: Schedule<F, A, B>): Schedule<F, A, Tuple2<Output, B>> |
| andThen | Execute one schedule after the other. When the first schedule ends, it continues with the second. | abstract infix fun <A : Input, B> andThen(other: Schedule<F, A, B>): Schedule<F, A, Either<Output, B>> |
| check | Conditionally check on both the input and the output whether or not to continue. | abstract fun <A : Input> check(pred: (A, Output) -> Kind<F, Boolean>): Schedule<F, A, Output> |
| choose | Combine two schedules with different input and output and conditionally choose between the two. Continues when the chosen schedule continues and uses the chosen schedule’s delay. | abstract infix fun <A, B> choose(other: Schedule<F, A, B>): Schedule<F, Either<Input, A>, Either<Output, B>> |
| collect | Accumulate the results of every execution to a list. | fun collect(): Schedule<F, Input, List<Output>> |
| combineWith | Combine with another schedule by combining the result and the delay of the Decision with the functions f and g. | abstract fun <A : Input, B> combineWith(other: Schedule<F, A, B>, f: (Boolean, Boolean) -> Boolean, g: (Duration, Duration) -> Duration): Schedule<F, A, Tuple2<Output, B>> |
| compose | Infix variant of pipe with reversed order. | infix fun <B> compose(other: Schedule<F, B, Input>): Schedule<F, B, Output> |
| const | Change the result of a Schedule to always be b. | fun <B> const(b: B): Schedule<F, Input, B> |
| contramap | Change the input of the schedule. May alter a schedule’s decision if it is based on input. | abstract fun <B> contramap(f: (B) -> Input): Schedule<F, B, Output> |
| delayed | Adjust the delay of a schedule’s Decision. | fun delayed(f: (Duration) -> Duration): Schedule<F, Input, Output> |
| dimap | | fun <B, C> dimap(f: (B) -> Input, g: (Output) -> C): Schedule<F, B, C> |
| fold | Non-effectful version of foldM. | fun <C> fold(initial: C, f: (C, Output) -> C): Schedule<F, Input, C> |
| foldM | Accumulate the results of a schedule by folding over them effectfully. | abstract fun <C> foldM(initial: Kind<F, C>, f: (C, Output) -> Kind<F, C>): Schedule<F, Input, C> |
| forever | Always retry a schedule regardless of the decision made prior to invoking this method. | abstract fun forever(): Schedule<F, Input, Output> |
| jittered | Add random jitter to a schedule. The argument genRand is supposed to be a computation that returns doubles. An example would be the following IO: IO { Random.nextDouble() }. | fun jittered(genRand: Kind<F, Double>): Schedule<F, Input, Output><br>fun jittered(MF: MonadDefer<F>): Schedule<F, Input, Output> |
| logInput | Run an effectful handler on every input. Does not alter the decision. | abstract fun logInput(f: (Input) -> Kind<F, Unit>): Schedule<F, Input, Output> |
| logOutput | Run an effectful handler on every output. Does not alter the decision. | abstract fun logOutput(f: (Output) -> Kind<F, Unit>): Schedule<F, Input, Output> |
| map | Change the output of a schedule. Does not alter the decision of the schedule. | abstract fun <B> map(f: (Output) -> B): Schedule<F, Input, B> |
| modifyDelay | Change the delay of a resulting Decision based on the Output and the produced delay. | abstract fun modifyDelay(f: (Output, Duration) -> Kind<F, Duration>): Schedule<F, Input, Output> |
| not | Invert the decision of a schedule. | abstract operator fun not(): Schedule<F, Input, Output> |
| or | Combine two schedules. Continues if one continues and chooses the minimum delay. | infix fun <A : Input, B> or(other: Schedule<F, A, B>): Schedule<F, A, Tuple2<Output, B>> |
| pipe | Compose this schedule with the other schedule by piping the output of this schedule into the input of the other. | abstract infix fun <B> pipe(other: Schedule<F, Output, B>): Schedule<F, Input, B> |
| tupled | Combine two schedules with different input and output using and. Continues when both continue and uses the maximum delay. | abstract infix fun <A, B> tupled(other: Schedule<F, A, B>): Schedule<F, Tuple2<Input, A>, Tuple2<Output, B>> |
| unit | | fun unit(): Schedule<F, Input, Unit> |
| untilInput | untilInput(f) = whileInput(f).not() | fun <A : Input> untilInput(f: (A) -> Boolean): Schedule<F, A, Output> |
| untilOutput | untilOutput(f) = whileOutput(f).not() | fun untilOutput(f: (Output) -> Boolean): Schedule<F, Input, Output> |
| whileInput | Continue or stop the schedule based on the input. | fun <A : Input> whileInput(f: (A) -> Boolean): Schedule<F, A, Output> |
| whileOutput | Continue or stop the schedule based on the output. | fun whileOutput(f: (Output) -> Boolean): Schedule<F, Input, Output> |
| zipLeft | Combine two schedules with and but throw away the right schedule’s result. | infix fun <A : Input, B> zipLeft(other: Schedule<F, A, B>): Schedule<F, A, Output> |
| zipRight | Combine two schedules with and but throw away the left schedule’s result. | infix fun <A : Input, B> zipRight(other: Schedule<F, A, B>): Schedule<F, A, B> |

Companion Object Functions

| Name | Summary | Signature |
|---|---|---|
| collect | Create a schedule which collects all its inputs in a list. | fun <F, A> collect(M: Monad<F>): Schedule<F, A, List<A>> |
| decision | Create a schedule that returns its decisions. | fun <F, A> decision(M: Monad<F>): Schedule<F, A, Boolean> |
| delay | Create a schedule that returns its delay. | fun <F, A> delay(M: Monad<F>): Schedule<F, A, Duration> |
| delayed | Create a schedule that uses another schedule to generate the delay of this schedule. Continues for as long as delaySchedule continues and adds the output of delaySchedule to the delay that delaySchedule produced. Also returns the full delay as output. | fun <F, A> delayed(M: Monad<F>, delaySchedule: Schedule<F, A, Duration>): Schedule<F, A, Duration> |
| doUntil | Create a schedule that continues until f returns true. | fun <F, A> doUntil(M: Monad<F>, f: (A) -> Boolean): Schedule<F, A, A> |
| doWhile | Create a schedule that continues as long as f returns true. | fun <F, A> doWhile(M: Monad<F>, f: (A) -> Boolean): Schedule<F, A, A> |
| exponential | Create a schedule that increases its delay exponentially with a given factor and base. Delay can be calculated as base * factor ^ n where n is the number of executions. | fun <F, A> exponential(M: Monad<F>, base: Duration, factor: Double = 2.0): Schedule<F, A, Duration> |
| fibonacci | Create a schedule that continues with increasing delay by adding the last two delays. | fun <F, A> fibonacci(M: Monad<F>, one: Duration): Schedule<F, A, Duration> |
| forever | Create a schedule that continues forever and returns the number of repetitions. | fun <F, A> forever(M: Monad<F>): Schedule<F, A, Int> |
| identity | Creates a schedule that continues without delay and just returns its input. | fun <F, A> identity(M: Monad<F>): Schedule<F, A, A> |
| invoke | Invoke constructor to manually define a schedule. If you need this, please consider adding it to Arrow or suggest a change to avoid using this manual method. | operator fun <F, S, A, B> invoke(M: Monad<F>, initial: Kind<F, S>, update: (a: A, s: S) -> Kind<F, Decision<S, B>>): Schedule<F, A, B> |
| linear | Create a schedule which increases its delay linearly by n * base where n is the number of executions. | fun <F, A> linear(M: Monad<F>, base: Duration): Schedule<F, A, Duration> |
| logInput | Create a schedule with an effectful handler on the input. | fun <F, A> logInput(MM: Monad<F>, f: (A) -> Kind<F, Unit>): Schedule<F, A, A> |
| logOutput | Create a schedule with an effectful handler on the output. | fun <F, A> logOutput(M: Monad<F>, f: (A) -> Kind<F, Unit>): Schedule<F, A, A> |
| never | Create a schedule that never retries. | fun <F, A> never(AS: Async<F>): Schedule<F, A, Nothing> |
| once | Create a schedule that only ever retries once. | fun <F, A> once(M: Monad<F>): Schedule<F, A, Unit> |
| recurs | Create a schedule that continues n times and returns the number of repetitions. | fun <F, A> recurs(M: Monad<F>, n: Int): Schedule<F, A, Int> |
| spaced | Create a schedule that continues with fixed delay. | fun <F, A> spaced(M: Monad<F>, interval: Duration): Schedule<F, A, Int> |
| unfold | Non-effectful variant of unfoldM. | fun <F, I, A> unfold(M: Monad<F>, c: A, f: (A) -> A): Schedule<F, I, A> |
| unfoldM | Create a schedule that unfolds effectfully using a seed value c and an unfold function f. This keeps the current state (the current seed) as State and runs the unfold function on every call to update. This schedule always continues without delay and returns the current state. | fun <F, I, A> unfoldM(M: Monad<F>, c: Kind<F, A>, f: (A) -> Kind<F, A>): Schedule<F, I, A> |
| unit | Creates a schedule that continues without delay and always returns Unit. | fun <F, A> unit(M: Monad<F>): Schedule<F, A, Unit> |
| withMonad | Build a schedule with functions that have the Monad already partially applied. Prefer this to the general combinators as soon as you create more than one schedule and combine it somehow. | fun <M, Input, Output> withMonad(MM: Monad<M>, f: ScheduleFor<M>.() -> Schedule<M, Input, Output>): Schedule<M, Input, Output> |

Extension Functions

| Name | Signature |
|---|---|
| maybeCombine | fun <F, Input, Output> Schedule<F, Input, Output>.~~maybeCombine~~(OI: Semigroup<Output>, arg1: Schedule<F, Input, Output>): Schedule<F, Input, Output> |
| plus | fun <F, Input, Output> Schedule<F, Input, Output>.~~plus~~(OI: Semigroup<Output>, arg1: Schedule<F, Input, Output>): Schedule<F, Input, Output> |

Companion Object Extension Functions

| Name | Signature |
|---|---|
| applicative | fun <F, Input> Schedule.Companion.~~applicative~~(MF: Monad<F>): ScheduleApplicative<F, Input> |
| apply | fun <F, Input> Schedule.Companion.~~apply~~(): ScheduleAppy<F, Input> |
| category | fun <F> Schedule.Companion.~~category~~(MM: Monad<F>): ScheduleCategory<F> |
| contravariant | fun <F, Output> Schedule.Companion.~~contravariant~~(): ScheduleContravariant<F, Output> |
| functor | fun <F, Input> Schedule.Companion.~~functor~~(): ScheduleFunctor<F, Input> |
| monoid | fun <F, Input, Output> Schedule.Companion.~~monoid~~(OI: Monoid<Output>, MF: Monad<F>): ScheduleMonoid<F, Input, Output> |
| profunctor | fun <F> Schedule.Companion.~~profunctor~~(): ScheduleProfunctor<F> |
| semigroup | fun <F, Input, Output> Schedule.Companion.~~semigroup~~(OI: Semigroup<Output>): ScheduleSemigroup<F, Input, Output> |
| semigroupK | fun <F, Input> Schedule.Companion.~~semigroupK~~(): ScheduleSemigroupK<F, Input> |
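
The entries for and, or and combineWith above describe how two decisions are merged: and continues only when both continue and takes the maximum delay, or continues if either continues and takes the minimum delay, and combineWith takes the two merging functions f and g explicitly. The following plain-Kotlin sketch models just that rule. `SimpleDecision` is a hypothetical stand-in that keeps only the continue flag and a delay in milliseconds; it is not Arrow's Decision type, and the functions below are not Arrow's implementations.

```kotlin
// Simplified, hypothetical stand-in for a decision: continue flag plus delay.
data class SimpleDecision(val cont: Boolean, val delayMillis: Long)

// Merge two decisions with f for the continue flags and g for the delays.
fun combineWith(
  a: SimpleDecision,
  b: SimpleDecision,
  f: (Boolean, Boolean) -> Boolean,
  g: (Long, Long) -> Long
): SimpleDecision = SimpleDecision(f(a.cont, b.cont), g(a.delayMillis, b.delayMillis))

// and: continue only when both continue, use the maximum delay.
infix fun SimpleDecision.and(other: SimpleDecision): SimpleDecision =
  combineWith(this, other, { x, y -> x && y }, { x, y -> maxOf(x, y) })

// or: continue if either continues, use the minimum delay.
infix fun SimpleDecision.or(other: SimpleDecision): SimpleDecision =
  combineWith(this, other, { x, y -> x || y }, { x, y -> minOf(x, y) })
```

For example, combining a decision that continues after 100 ms with one that stops after 200 ms yields a stop with a 200 ms delay under and, and a continue with a 100 ms delay under or.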