
bounded

suspend fun <A> bounded(maxSize: Int): Queue<A>

Creates a FIFO queue with the specified size bound.

import arrow.fx.coroutines.*
import arrow.fx.coroutines.stream.*
import arrow.fx.coroutines.stream.concurrent.*

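suspend fun main(): Unit {
  // Bounded queue with a size bound of 10 elements.
  val q = Queue.bounded<Int>(10)
  Stream(
    // Producer: push the range into q and discard the enqueue outputs.
    Stream.range(0..100)
      .through(q.enqueue())
      .void(),
    // Consumer: emit elements as they are dequeued.
    q.dequeue()
  ).parJoinUnbounded() // run producer and consumer concurrently
    .take(100)
    .toList().let(::println) // [0, 1, 2, .., 99]

  // With maxSize 0 no element can pass through, so tryOffer1 returns false
  // even while a dequeue1 is waiting.
  val alwaysEmpty = Queue.bounded<Int>(0)
  ForkConnected { alwaysEmpty.dequeue1() }
  alwaysEmpty.tryOffer1(1).let(::println) // false
}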

A maxSize <= 0 does not allow any elements to pass through the Queue; in that case tryOffer1 always returns false.
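
The size-bound rule can be illustrated with a small stand-alone model. This is only a sketch using the Kotlin standard library, not Arrow's implementation: the class BoundedQueueModel and its methods tryOffer and tryTake are hypothetical names invented for the example, and the model is neither suspending nor thread-safe. It shows an offer being rejected once the buffer holds maxSize elements, and every offer being rejected when maxSize <= 0.

```kotlin
// Hypothetical, simplified model of a size-bounded FIFO buffer.
// Not part of Arrow; single-threaded and non-suspending by design.
class BoundedQueueModel<A>(private val maxSize: Int) {
  private val buffer = ArrayDeque<A>()

  // Accepts the element only while fewer than maxSize elements are buffered.
  // For maxSize <= 0 the condition is never true, so this always returns false.
  fun tryOffer(a: A): Boolean =
    if (buffer.size < maxSize) {
      buffer.addLast(a)
      true
    } else {
      false
    }

  // Removes and returns the oldest element, or null if the buffer is empty.
  fun tryTake(): A? = buffer.removeFirstOrNull()
}

fun main() {
  val q = BoundedQueueModel<Int>(2)
  println(q.tryOffer(1)) // true
  println(q.tryOffer(2)) // true
  println(q.tryOffer(3)) // false, the bound of 2 is reached
  println(q.tryTake())   // 1, elements leave in FIFO order
  println(q.tryOffer(3)) // true, one slot is free again

  val alwaysEmpty = BoundedQueueModel<Int>(0)
  println(alwaysEmpty.tryOffer(1)) // false
}
```

The model only covers the non-blocking offer path; in the real Queue a suspending enqueue would wait for space instead of returning false.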

See Also

Queue.synchronous
