arrow-fx-coroutines / arrow.fx.coroutines.stream / Stream / concurrently
fun <O2> concurrently(other:
Stream
<O2>, ctx:
CoroutineContext
= Dispatchers.Default):
Stream
<O>
Runs the supplied stream in the background as elements from this stream are pulled.
The resulting stream terminates upon termination of this stream. The background stream will be interrupted at that point. Early termination of other does not terminate the resulting stream.
Any errors that occur in either this
or other stream result in the overall stream terminating
with an error.
Upon finalization, the resulting stream will interrupt the background stream and wait for it to be finalized.
import arrow.fx.coroutines.stream.concurrent.SignallingAtomic
import arrow.fx.coroutines.stream.*
//sampleStart
suspend fun main(): Unit {
val data = Stream.range(1..10)
val signalling = SignallingAtomic(0)
signalling.discrete()
.concurrently(data.effectMap { signalling.set(it) })
.takeWhile { it < 9 }
.toList()
.let(::println) //[0, 1, 2, 3, 4, 5, 6, 7, 8]
}
//sampleEnd
Do you like Arrow?
✖