parMap

@FlowPreview
@ExperimentalCoroutinesApi
inline fun <A, B> Flow<A>.parMap(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend CoroutineScope.(a: A) -> B): Flow<B>(source)

Like map, but will evaluate transform in parallel, emitting the results downstream in the same order as the input stream. The number of concurrent effects is limited by concurrency.

If concurrency is more than 1, then inner flows are be collected by this operator concurrently. With concurrency == 1 this operator is identical to map.

Applications of flowOn, buffer, and produceIn after this operator are fused with its concurrent merging so that only one properly configured channel is used for execution of merging logic.

See parMapUnordered if there is no requirement to retain the order of the original stream.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.collect
import arrow.fx.coroutines.parMap

//sampleStart
suspend fun main(): Unit {
flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.parMap { a ->
delay(100)
a
}.toList() // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}
//sampleEnd

The upstream source runs concurrently with downstream parMap, and thus the upstream concurrently runs, "prefetching", the next element. i.e.

 import arrow.fx.coroutines.*

suspend fun main(): Unit {
//sampleStart
val source = flowOf(1, 2, 3, 4)
source.parMap(concurrency= 2) {
println("Processing $it")
never<Unit>()
}.collect()
//sampleEnd
}

1, 2, 3 will be emitted from source but only "Processing 1" & "Processing 2" will get printed.