arrow-fx / arrow.fx / Queue / sliding
fun <F, A> sliding(capacity: Int, CF: Concurrent<F>): Kind<F, Queue<F, A>>
Create a Queue with BackpressureStrategy.Sliding.
Offering to a sliding queue at capacity will cause the value at the front of the queue to be discarded to make room for the offered value.
import arrow.fx.*
import arrow.fx.extensions.fx
import arrow.fx.extensions.io.concurrent.concurrent
/**
 * Sample showing how a sliding [Queue] behaves once it is full.
 *
 * A queue of capacity 2 receives three offers in a row; the values are then
 * taken back out, one more value is offered and taken, and the three taken
 * values are printed as a list. With sliding semantics the first offered value
 * (42) is the one that gets dropped, so the printed list is expected to be
 * `[43, 44, 45]`.
 */
suspend fun main(args: Array<String>): Unit = IO.fx {
    // Sliding queue that holds at most two elements.
    val queue = Queue.sliding<ForIO, Int>(2, IO.concurrent()).bind()

    queue.offer(42).bind()
    queue.offer(43).bind()
    // The queue is already full here: this third `offer` evicts the oldest value.
    queue.offer(44).bind()

    val first = queue.take().bind()
    val second = queue.take().bind()

    // Both elements were taken above, so this offer has room to land.
    queue.offer(45).bind()
    val third = queue.take().bind()

    effect { println(listOf(first, second, third)) }.bind()
}.suspended()
Do you like Arrow?
✖