arrow-fx-coroutines / arrow.fx.coroutines.stream / scanChunksOpt

scanChunksOpt

fun <O, S, O2> Pull<O, Unit>.scanChunksOpt(init: S, f: (S) -> ((Chunk<O>) -> Pair<S, Chunk<O2>>)?): Pull<O2, S>

More general version of scanChunks where the current state (i.e., S) can be inspected to determine if another chunk should be pulled or if the pull should terminate. Termination is signaled by returning None from f. Otherwise, a function which consumes the next chunk is returned wrapped in Some. The final state value is returned as the result of the pull.

fun <O, S, O2> Stream<O>.scanChunksOpt(init: S, f: (S) -> ((Chunk<O>) -> Pair<S, Chunk<O2>>)?): Stream<O2>

More general version of scanChunks where the current state (i.e., S) can be inspected to determine if another chunk should be pulled or if the stream should terminate. Termination is signaled by returning null from f. Otherwise, a function which consumes the next chunk.

import arrow.fx.coroutines.stream.*

//sampleStart
fun <O> Stream<O>.take(n: Int): Stream<O> =
  scanChunksOpt<O, Int, O>(n) { n ->
    if (n <= 0) null
    else { chunk: Chunk<O> ->
      if (chunk.size() < n) Pair(n - chunk.size(), chunk)
      else Pair(0, chunk.take(n))
    }
  }

suspend fun main(): Unit =
  Stream.range(0..100).take(5)
    .toList().let(::println) // [0, 1, 2, 3, 4]
//sampleEnd

Do you like Arrow?

Arrow Org
<