arrow-fx-coroutines / arrow.fx.coroutines / ConcurrentVar
interface ~~ConcurrentVar~~<A>
Deprecated: ConcurrentVar is deprecated in favor of kotlinx.coroutines.channels.Channel(1).
ConcurrentVar is a mutable concurrent safe variable which is either empty
or contains a single value
of type A.
It behaves as a single element arrow.fx.coroutines.stream.concurrent.Queue.
When trying to put or take, it will suspend when it is respectively isEmpty or isNotEmpty.
There are also operators that return immediately, tryTake & tryPut, since checking isEmpty could be outdated immediately.
ConcurrentVar is appropriate for building synchronization primitives and performing simple inter-thread communications.
i.e. in situations where you want to suspend
until the ConcurrentVar is initialised with a value A.
import arrow.fx.coroutines.*
suspend fun main(): Unit {
val mvar = ConcurrentVar.empty<Int>()
ForkConnected {
sleep(3.seconds)
mvar.put(5)
}
val r = mvar.take() // suspend until Fork puts result in ConcurrentVar
println(r)
}
ConcurrentVar can also be used as a lock if every operation calls take, does work and then put’s the value back. However this is quite unsafe if operations can be cancelled or can throw exception while they hold a lock. The best approach to overcome this is to use bracketCase however since this is a rather common pattern, it is made available with withConcurrentVar, modify and modify_.
Note that this only works if all operations over the ConcurrentVar follow the pattern of first taking and then putting back both exactly once and in order. Or use the helpers to also be safe in case of exceptions and cancellation.
isEmpty | Returns true if there are no elements. Otherwise false. This may be outdated immediately; use tryPut or tryTake to put & take without suspending.abstract suspend fun isEmpty(): Boolean |
isNotEmpty | Returns true if there no elements. Otherwise false. This may be outdated immediately; use tryPut or tryTake to put & take without suspending.abstract suspend fun isNotEmpty(): Boolean |
put | Puts A in the ConcurrentVar if it is empty, or suspends if full until the given value is next in line to be consumed by take.abstract suspend fun put(a: A): Unit |
read | Reads the current value without emptying the ConcurrentVar, assuming there is one, or otherwise it suspends until there is a value available.abstract suspend fun read(): A |
take | Empties the ConcurrentVar if full, returning the value, or suspend until a value is available.abstract suspend fun take(): A |
tryPut | Tries to put A in the ConcurrentVar if it is empty, returns immediately with true if successfully put the value in the ConcurrentVar or false otherwise.abstract suspend fun tryPut(a: A): Boolean |
tryTake | Tries to take the value of ConcurrentVar, returns a value immediately if the ConcurrentVar is not empty, or null otherwise.abstract suspend fun tryTake(): A? |
empty | Returns an empty ConcurrentVar instance.suspend fun <A> empty(): ConcurrentVar <A> |
invoke | Builds a ConcurrentVar instance with an initial value.suspend operator fun <A> invoke(initial: A): ConcurrentVar <A> |
unsafe | fun <A> unsafe(initial: A): ConcurrentVar <A> |
unsafeEmpty | fun <A> unsafeEmpty(): ConcurrentVar <A> |
Do you like Arrow?
✖