arrow-fx-coroutines / arrow.fx.coroutines.stream / cancellable

cancellable

fun <A> Stream.Companion.cancellable(f: suspend EmitterSyntax<A>.() -> CancelToken): Stream<A>

Creates a cancellable Stream from the given suspended block that will evaluate the passed CancelToken if cancelled.

The suspending f runs in an uncancellable manner, acquiring CancelToken as a resource. If cancellation signal is received while cb is running, then the CancelToken will be triggered as soon as it’s returned.

import arrow.fx.coroutines.*
import arrow.fx.coroutines.stream.*
import java.lang.RuntimeException
import java.util.concurrent.Executors
import java.util.concurrent.Future

typealias Callback = (List<String>?, Throwable?) -> Unit

class GithubId
object GithubService {
  private val listeners: MutableMap<GithubId, Future<*>> = mutableMapOf()
  fun getUsernames(callback: Callback): GithubId {
    val id = GithubId()
    val future = Executors.newSingleThreadExecutor().run {
      submit {
        Thread.sleep(300)
        callback(listOf("Arrow - 1"), null)
        Thread.sleep(300)
        callback(listOf("Arrow - 2"), null)
        Thread.sleep(300)
        callback(listOf("Arrow - 3"), null)
        shutdown()
      }
    }
    listeners[id] = future
    return id
  }

  fun unregisterCallback(id: GithubId): Unit {
    listeners[id]?.cancel(false)
    listeners.remove(id)
  }
}

suspend fun main(): Unit {
  //sampleStart
  fun getUsernames(): Stream<String> =
    Stream.cancellable<String> {
      val id = GithubService.getUsernames { names, throwable ->
        when {
          names != null -> emit(names)
          throwable != null -> throw throwable
          else -> throw RuntimeException("Null result and no exception")
        }
      }
      CancelToken { GithubService.unregisterCallback(id) }
    }.take(3)

  val result = getUsernames()
    .effectTap { println(it) }
    .toList()
  //sampleEnd
  println(result)
}

If neither end() nor other limit operators such as take(N) are called, then the Stream will never end.

Do you like Arrow?

Arrow Org
<