ConcurrentVar

interface ~~ConcurrentVar~~<A>

Deprecated: ConcurrentVar is deprecated in favor of kotlinx.coroutines.channels.Channel(1).

ConcurrentVar is a mutable, concurrency-safe variable that is either empty or contains a single value of type A. It behaves like a single-element arrow.fx.coroutines.stream.concurrent.Queue: put suspends while the variable is full (isNotEmpty), and take suspends while it is empty (isEmpty).

There are also operators that return immediately, tryTake and tryPut, since the result of isEmpty can be outdated as soon as it is returned.
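As a minimal sketch of the non-suspending operators, the snippet below follows the documented contracts of tryPut and tryTake on an initially empty variable. It assumes a version of arrow-fx-coroutines that still ships the deprecated ConcurrentVar; the printed values are what the contracts listed under Functions imply.

```kotlin
import arrow.fx.coroutines.*

suspend fun main(): Unit {
  val mvar = ConcurrentVar.empty<Int>()

  println(mvar.tryTake()) // null: there is nothing to take yet
  println(mvar.tryPut(1)) // true: the variable was empty, so 1 is stored
  println(mvar.tryPut(2)) // false: the variable already holds a value
  println(mvar.tryTake()) // 1: the stored value is removed and returned
}
```

Because each call reports its own outcome, there is no window between a check and the operation as there would be with isEmpty followed by put.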

ConcurrentVar is appropriate for building synchronization primitives and performing simple inter-thread communication, for example when you want to suspend until the ConcurrentVar is initialised with a value of type A.

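import arrow.fx.coroutines.*

suspend fun main(): Unit {
  // Start with an empty variable; take() will suspend until a value arrives.
  val mvar = ConcurrentVar.empty<Int>()

  // Fill the variable from a concurrently running task after 3 seconds.
  ForkConnected {
    sleep(3.seconds)
    mvar.put(5)
  }

  val r = mvar.take() // suspends until the forked task puts its result in the ConcurrentVar
  println(r)
}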
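Since ConcurrentVar is deprecated in favor of kotlinx.coroutines.channels.Channel(1), the same hand-off can be sketched with a channel of capacity 1. This is an illustrative translation rather than an official migration guide; handOff and slot are names chosen for this example, and the 100 ms delay is arbitrary.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper name; mirrors the ConcurrentVar example above.
fun handOff(): Int = runBlocking {
    val slot = Channel<Int>(1) // capacity 1: either empty or holding one value

    launch {
        delay(100)
        slot.send(5)           // counterpart of put: suspends while the slot is full
    }

    slot.receive()             // counterpart of take: suspends while the slot is empty
}

fun main() {
    println(handOff())                     // 5

    val slot = Channel<Int>(1)
    println(slot.tryReceive().getOrNull()) // null, like tryTake on an empty variable
    println(slot.trySend(1).isSuccess)     // true, like tryPut on an empty variable
    println(slot.trySend(2).isSuccess)     // false: the single slot is occupied
}
```

Channel offers no operation that reads the buffered value without removing it, so code relying on read needs a different approach after migrating.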

Using ConcurrentVar as a lock safely

ConcurrentVar can also be used as a lock if every operation calls take, does its work, and then puts the value back. However, this is unsafe if an operation can be cancelled or can throw an exception while it holds the lock, because the value is then never put back. The best way to overcome this is to use bracketCase; since this is a rather common pattern, it is also made available through withConcurrentVar, modify and modify_.

Note that this only works if all operations on the ConcurrentVar first take and then put back, each exactly once and in that order. Alternatively, use the helpers, which are also safe in case of exceptions and cancellation.
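The take, work, put-back pattern can be illustrated with the recommended replacement, a Channel of capacity 1. The sketch below is not Arrow's withConcurrentVar or modify; modifyGuarded and countTo are hypothetical names introduced here. It restores the previous value when the update throws, but it does not cover cancellation that arrives while the caller is suspended in receive.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of Arrow or kotlinx.coroutines):
// take the value, apply f, and always put a value back.
suspend fun <A> Channel<A>.modifyGuarded(f: suspend (A) -> A) {
    val current = receive()   // "take": suspends until the value is available
    var next = current
    try {
        next = f(current)
    } finally {
        // "put": trySend never suspends; the slot is free because this
        // caller holds the only value. If f threw, the old value is restored.
        trySend(next)
    }
}

fun countTo(n: Int): Int = runBlocking {
    val counter = Channel<Int>(1)
    counter.send(0)           // initialise the variable with 0
    coroutineScope {
        repeat(n) { launch { counter.modifyGuarded { it + 1 } } }
    }
    // A failing update must leave the previous value in place.
    runCatching { counter.modifyGuarded { error("boom") } }
    counter.receive()
}

fun main() {
    println(countTo(100)) // 100
}
```

Every caller takes before it puts, exactly once each, which is the invariant the note above requires. For a plain lock without a guarded value, kotlinx.coroutines.sync.Mutex with withLock is the more direct tool.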

Functions

isEmpty
Returns true if there are no elements, otherwise false. This may be outdated immediately; use tryPut or tryTake to put and take without suspending.
abstract suspend fun isEmpty(): Boolean

isNotEmpty
Returns true if there is an element, otherwise false. This may be outdated immediately; use tryPut or tryTake to put and take without suspending.
abstract suspend fun isNotEmpty(): Boolean

put
Puts A in the ConcurrentVar if it is empty, or suspends if it is full until the given value is next in line to be consumed by take.
abstract suspend fun put(a: A): Unit

read
Reads the current value without emptying the ConcurrentVar if there is one; otherwise suspends until a value is available.
abstract suspend fun read(): A

take
Empties the ConcurrentVar if it is full and returns the value, or suspends until a value is available.
abstract suspend fun take(): A

tryPut
Tries to put A in the ConcurrentVar. Returns immediately: true if the ConcurrentVar was empty and the value was put, false otherwise.
abstract suspend fun tryPut(a: A): Boolean

tryTake
Tries to take the value of the ConcurrentVar. Returns immediately: the value if the ConcurrentVar is not empty, null otherwise.
abstract suspend fun tryTake(): A?

Companion Object Functions

empty
Returns an empty ConcurrentVar instance.
suspend fun <A> empty(): ConcurrentVar<A>

invoke
Builds a ConcurrentVar instance with an initial value.
suspend operator fun <A> invoke(initial: A): ConcurrentVar<A>

unsafe
fun <A> unsafe(initial: A): ConcurrentVar<A>

unsafeEmpty
fun <A> unsafeEmpty(): ConcurrentVar<A>
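The following sketch combines the companion constructors with read and take to show the difference between the two. It assumes a version of arrow-fx-coroutines that still ships the deprecated ConcurrentVar; the commented results follow from the documented contracts, and the variable names are illustrative only.

```kotlin
import arrow.fx.coroutines.*

// unsafe is a plain (non-suspend) function, so it can be called outside a suspend context.
val shared: ConcurrentVar<String> = ConcurrentVar.unsafe("config")

suspend fun main(): Unit {
  val full = ConcurrentVar(1)  // companion invoke: starts out holding 1

  println(full.read())         // 1: read leaves the value in place
  println(full.isNotEmpty())   // true: the value is still there
  println(full.take())         // 1: take removes the value
  println(full.isEmpty())      // true: the variable is now empty

  println(shared.read())       // config
}
```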
