Semaphore

interface ~~Semaphore~~ Deprecated: use KotlinX Semaphore

A counting Semaphore has a non-negative number of permits available. It is used to track how many permits are in use, and to suspend callers until the number of permits they request becomes available.

Acquiring permits decreases the available permits, and releasing increases the available permits.

Acquiring permits when there aren’t enough available suspends the acquire call until the requested permits become available. Acquires are satisfied in strict FIFO order. Suspended acquire calls are cancellable, and a cancelled call releases any permits it has already acquired.
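
The permit bookkeeping described above can be illustrated with a small, simplified model. This is only a sketch: `PermitCounter` is a hypothetical class invented for illustration, it is not Arrow's implementation, and it deliberately leaves out suspension, FIFO ordering and cancellation. It only shows that acquiring decreases the available permits, releasing increases them, and the count never goes negative.

```kotlin
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified model of permit counting (not Arrow's implementation).
// It never suspends: an acquire that cannot be satisfied simply reports false.
class PermitCounter(initial: Long) {
  init { require(initial >= 0) { "permits must be non-negative" } }

  private val permits = AtomicLong(initial)

  // Snapshot of the currently available permits.
  fun available(): Long = permits.get()

  // Takes n permits only if all n are available right now.
  fun tryAcquireN(n: Long): Boolean {
    require(n >= 0) { "n must be non-negative" }
    while (true) {
      val current = permits.get()
      if (current < n) return false
      // Retry if another thread changed the count in between.
      if (permits.compareAndSet(current, current - n)) return true
    }
  }

  // Returns n permits to the pool.
  fun releaseN(n: Long) {
    require(n >= 0) { "n must be non-negative" }
    permits.addAndGet(n)
  }
}

fun main() {
  val counter = PermitCounter(2)
  println(counter.tryAcquireN(1)) // true: 1 permit left
  println(counter.tryAcquireN(2)) // false: only 1 permit is available
  counter.releaseN(1)
  println(counter.available())    // 2
}
```

A real Semaphore differs in the failing case: instead of returning false, acquire suspends the caller and queues it until enough permits are released.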

To guarantee mutual exclusion, we can use a Semaphore with a single permit. With only one permit available, only a single context can access the protected resource at a time.

import arrow.fx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

/* Only allows a single accessor at a time */
class PreciousFile(private val accessors: AtomicInteger = AtomicInteger(0)) {
  fun use() {
    // Fails if another caller is inside use() at the same time
    check(accessors.incrementAndGet() == 1) { "File accessed before released" }
    check(accessors.decrementAndGet() == 0) { "File accessed before released" }
  }
}

suspend fun main() {
  val file = PreciousFile()
  val mutex = Semaphore(1)

  (0 until 100).parTraverse(IOPool) { i ->
    // Only one task at a time holds the single permit
    mutex.withPermit {
      file.use()
      println("$i accessed PreciousFile on ${Thread.currentThread().name}")
    }
  }
}

By wrapping our operation in withPermit we ensure that our PreciousFile is only accessed by a single thread at a time. If we didn’t protect PreciousFile this way, concurrent calls to use could fail one of its checks and crash the program.

This is a common use-case when you need to write to a single File from different threads, since concurrent writes could result in inconsistent state.

A Semaphore can do more than model mutual exclusion, since it can track any number of permits. You can also use it to limit the number of parallel tasks. For example, when using parTraverse we might want to limit how many tasks effectively run in parallel.

import arrow.fx.coroutines.*

suspend fun heavyProcess(i: Int): Unit {
  println("Started job $i")
  sleep(250.milliseconds)
  println("Finished job $i")
}

suspend fun main(): Unit {
  val limit = 3
  val semaphore = Semaphore(limit)
  (0..50).parTraverse { i ->
    semaphore.withPermit { heavyProcess(i) }
  }
}

Here we set a limit of 3 to ensure that at most 3 heavyProcess calls run at the same time. This helps us avoid putting too much stress on the JVM, running out of memory, or worse.
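
Since this interface is deprecated in favor of the KotlinX Semaphore, the same limiting pattern could be sketched with `kotlinx.coroutines.sync.Semaphore` and its `withPermit` extension. This is a minimal sketch rather than an official migration path: `maxObservedConcurrency` is a hypothetical helper invented for illustration, `delay(10)` stands in for heavyProcess, and `async`/`awaitAll` are used here as an assumed substitute for parTraverse.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext

// Hypothetical helper: runs `jobs` simulated tasks with at most `limit`
// of them holding a permit, and returns the highest number of tasks
// that were observed inside withPermit at the same time.
suspend fun maxObservedConcurrency(jobs: Int, limit: Int): Int =
  withContext(Dispatchers.Default) {
    val semaphore = Semaphore(limit)
    val running = AtomicInteger(0)
    val maxSeen = AtomicInteger(0)

    (0 until jobs).map {
      async {
        semaphore.withPermit {
          val now = running.incrementAndGet()
          maxSeen.updateAndGet { prev -> maxOf(prev, now) }
          delay(10) // stand-in for the heavy work
          running.decrementAndGet()
        }
      }
    }.awaitAll()

    maxSeen.get()
  }

fun main(): Unit = runBlocking {
  // The printed value should never exceed the limit of 3.
  println(maxObservedConcurrency(jobs = 20, limit = 3))
}
```

Note that the KotlinX Semaphore takes an Int number of permits and hands out one permit per acquire, so it has no direct counterpart to acquireN or withPermitN.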

Functions

acquire
  Acquires 1 permit, suspending until the requested permit is available.
  open suspend fun acquire(): Unit

acquireN
  Acquires n permits, suspending until the required permits are available. If it is cancelled while suspended, it releases the permits it has already acquired.
  abstract suspend fun acquireN(n: Long): Unit

available
  Gets a snapshot of the currently available permits, always non-negative.
  abstract suspend fun available(): Long

count
  Gets a snapshot of the number of permits callers are waiting for, when there are no permits available.
  abstract suspend fun count(): Long

release
  Releases 1 permit, potentially unblocking an outstanding acquire for 1 permit.
  open suspend fun release(): Unit

releaseN
  Releases n permits, potentially unblocking outstanding acquires.
  abstract suspend fun releaseN(n: Long): Unit

tryAcquire
  Tries to acquire 1 permit and immediately signals success with a Boolean.
  open suspend fun tryAcquire(): Boolean

tryAcquireN
  Tries to acquire n permits and immediately signals success with a Boolean.
  abstract suspend fun tryAcquireN(n: Long): Boolean

withPermit
  Acquires a permit, runs the supplied effect, and then releases the permit.
  open suspend fun <A> withPermit(fa: suspend () -> A): A

withPermitN
  Runs the supplied effect with n acquired permits, and releases the permits on ExitCase.
  abstract suspend fun <A> withPermitN(n: Long, fa: suspend () -> A): A
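
As a rough illustration of how these functions interact, the sketch below walks a 3-permit Semaphore through acquire, tryAcquireN, tryAcquire and releaseN, recording the available() snapshot after each step. It assumes an Arrow version that still ships this deprecated interface. `permitSnapshots` is a hypothetical helper name, and the comments describe the behavior expected from the descriptions above rather than verified output.

```kotlin
import arrow.fx.coroutines.Semaphore

// Hypothetical helper: records available() after each step.
suspend fun permitSnapshots(): List<Long> {
  val semaphore = Semaphore(3)
  val snapshots = mutableListOf<Long>()

  snapshots += semaphore.available()     // nothing acquired yet

  semaphore.acquire()                    // takes 1 permit, does not need to wait
  snapshots += semaphore.available()

  val gotTwo = semaphore.tryAcquireN(2)  // the remaining 2 permits are free
  snapshots += semaphore.available()

  val gotOne = semaphore.tryAcquire()    // no permits left, so this should fail
  println("tryAcquireN(2) = $gotTwo, tryAcquire() = $gotOne")

  semaphore.releaseN(3)                  // hand back everything we hold
  snapshots += semaphore.available()

  return snapshots
}

suspend fun main() {
  println(permitSnapshots())
}
```

The tryAcquire variants are useful when the caller would rather skip or defer work than suspend until permits are released.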

Companion Object Functions

invoke
  Constructs a Semaphore initialized with n available permits.
  suspend operator fun ~~invoke~~(n: Long): Semaphore
  suspend operator fun ~~invoke~~(n: Int): Semaphore

unsafe
  fun unsafe(n: Long): Semaphore
