arrow-fx-coroutines / arrow.fx.coroutines / Semaphore
interface ~~Semaphore~~
Deprecated: use KotlinX Semaphore
A counting Semaphore has a non-negative number of permits available. It is used to track how many permits are in-use, and to automatically await a number of permits to become available.
Acquiring permits decreases the available permits, and releasing increases the available permits.
Acquiring permits when there aren’t enough available will suspend the acquire call until the requested becomes available. Note that acquires are satisfied in strict FIFO order. The suspending acquire calls are cancellable, and will release any already acquired permits.
Let’s say we want to guarantee mutually exclusiveness, we can use a Semaphore
with a single permit.
Having a Semaphore
with a single permit, we can track that only a single context can access something.
//sampleStart
import arrow.fx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger
/* Only allows a single accesor */
class PreciousFile(private val accesors: AtomicInteger = AtomicInteger(0)) {
fun use(): Unit {
check(accesors.incrementAndGet() == 1) { "File accessed before released" }
check(accesors.decrementAndGet() == 0) { "File accessed before released" }
}
}
suspend fun main() {
val file = PreciousFile()
val mutex = Semaphore(1)
(0 until 100).parTraverse(IOPool) { i ->
mutex.withPermit {
val res = file.use()
println("$i accessed PreciousFile on ${Thread.currentThread().name}")
}
}
//sampleEnd
}
By wrapping our operation in withPermit
we ensure that our var count: Int
is only updated by a single thread at the same time.
If we wouldn’t protect our PreciousFile
from being access by only a single thread at the same time, then our program will crash.
This is a common use-case when you need to write to a single File
from different threads, since concurrent writes could result in inconsistent state.
Semaphore
is more powerful besides just modelling mutally exlusiveness,
since it’s allows to track any amount of permits.
You can also use it to limit amount of parallel tasks, for example when using parTraverse
we might want to limit how many tasks are running effectively in parallel.
import arrow.fx.coroutines.*
suspend fun heavyProcess(i: Int): Unit {
println("Started job $i")
sleep(250.milliseconds)
println("Finished job $i")
}
suspend fun main(): Unit {
val limit = 3
val semaphore = Semaphore(limit)
(0..50).parTraverse { i ->
semaphore.withPermit { heavyProcess(i) }
}
}
Here we set a limit of 3
to ensure that only 3 heavyProcess
are running at the same time.
This can ensure we don’t stress the JVM too hard, OOM or worse.
acquire | Acquires 1 permit, suspending until the requested permit is available.open suspend fun acquire(): Unit |
acquireN | Acquires n permits, suspends until the required permits are available. When it gets cancelled while suspending, it will release its already acquired permits.abstract suspend fun acquireN(n: Long ): Unit |
available | Gets a snapshot of the currently available permits, always non negative.abstract suspend fun available(): Long |
count | Gets a snapshot of the number of permits callers are waiting for, when there are no permits available.abstract suspend fun count(): Long |
release | Releases 1 permit, potentially unblocking an outstanding acquire for 1 permit.open suspend fun release(): Unit |
releaseN | Releases n permits, potentially unblocking outstanding acquires.abstract suspend fun releaseN(n: Long ): Unit |
tryAcquire | Acquires 1 permit and signals success with a Boolean immediately.open suspend fun tryAcquire(): Boolean |
tryAcquireN | Acquires n permits and signals success with a Boolean immediately.abstract suspend fun tryAcquireN(n: Long ): Boolean |
withPermit | Returns an effect that acquires a permit, runs the supplied effect, and then releases the permit.open suspend fun <A> withPermit(fa: suspend () -> A): A |
withPermitN | Runs the supplied effect with an acquired permit, and releases the permit on ExitCase.abstract suspend fun <A> withPermitN(n: Long , fa: suspend () -> A): A |
invoke | Constructs a Semaphore initialized with n available permits.suspend operator fun ~~invoke~~(n: Long ): Semaphore suspend operator fun ~~invoke~~(n: Int ): Semaphore |
unsafe | fun unsafe(n: Long ): Semaphore |
Do you like Arrow?
✖