arrow-fx-coroutines / arrow.fx.coroutines.stream / cancellable
fun <A> Stream.Companion.cancellable(f: suspend EmitterSyntax<A>.() -> CancelToken): Stream<A>
Creates a cancellable Stream from the given suspended block that will evaluate the passed CancelToken if cancelled.
The suspending f runs in an uncancellable manner, acquiring the CancelToken as a resource. If a cancellation signal is received while f is running, then the CancelToken will be triggered as soon as it’s returned.
import arrow.fx.coroutines.*
import arrow.fx.coroutines.stream.*
import java.lang.RuntimeException
import java.util.concurrent.Executors
import java.util.concurrent.Future
/**
 * Callback shape used by [GithubService.getUsernames]: it receives either a list of names
 * or a [Throwable]. Both parameters are nullable, so a receiver has to handle the case
 * where neither is supplied.
 */
typealias Callback = (List<String>?, Throwable?) -> Unit
/**
 * Opaque handle for one registered callback. It carries no data; each instance is only
 * distinguished by identity and is used as the key under which [GithubService] stores
 * the corresponding background task.
 */
class GithubId
/**
 * Toy callback-based service used by the sample below.
 *
 * Each call to [getUsernames] starts a background task that reports three batches of
 * names through the supplied [Callback]; [unregisterCallback] cancels that task.
 */
object GithubService {
    // Background tasks keyed by the id handed out from getUsernames.
    private val listeners: MutableMap<GithubId, Future<*>> = mutableMapOf()

    /**
     * Starts a background task that invokes [callback] three times, roughly 300 ms apart,
     * with `"Arrow - 1"`, `"Arrow - 2"` and `"Arrow - 3"` (each wrapped in a single-element
     * list and a `null` error).
     *
     * @param callback receiver of the emitted name lists.
     * @return the id to pass to [unregisterCallback] to cancel the task.
     */
    fun getUsernames(callback: Callback): GithubId {
        val id = GithubId()
        val executor = Executors.newSingleThreadExecutor()
        val future: Future<*> = try {
            // Explicit Runnable keeps the submit(Runnable)/submit(Callable) choice unambiguous.
            executor.submit(Runnable {
                Thread.sleep(300)
                callback(listOf("Arrow - 1"), null)
                Thread.sleep(300)
                callback(listOf("Arrow - 2"), null)
                Thread.sleep(300)
                callback(listOf("Arrow - 3"), null)
            })
        } finally {
            // Orderly shutdown: the already-submitted task still runs, but the worker thread
            // is released afterwards even if the callback throws or the task is cancelled
            // before it starts. Shutting down from inside the task would be skipped in
            // both of those cases and leak the thread.
            executor.shutdown()
        }
        listeners[id] = future
        return id
    }

    /**
     * Cancels the task registered under [id], if any, and forgets it.
     *
     * The task is cancelled without interruption (`cancel(false)`), so a task that is
     * already running is not interrupted. Unknown ids are ignored.
     *
     * @param id the handle previously returned by [getUsernames].
     */
    fun unregisterCallback(id: GithubId) {
        listeners.remove(id)?.cancel(false)
    }
}
/**
 * Sample entry point: wraps the callback-based [GithubService] in a cancellable [Stream],
 * prints every emitted name, collects the names into a list and prints that list.
 */
suspend fun main(): Unit {
    //sampleStart
    fun getUsernames(): Stream<String> {
        val usernames = Stream.cancellable<String> {
            // Register the callback; every non-null batch of names is pushed into the stream.
            val registration = GithubService.getUsernames { names, failure ->
                if (names != null) {
                    emit(names)
                } else if (failure != null) {
                    throw failure
                } else {
                    throw RuntimeException("Null result and no exception")
                }
            }
            // The returned token unregisters the callback again.
            CancelToken { GithubService.unregisterCallback(registration) }
        }
        // Limit the stream to three elements so that it terminates.
        return usernames.take(3)
    }

    val printed = getUsernames().effectTap { name -> println(name) }
    val result = printed.toList()
    //sampleEnd
    println(result)
}
If neither end() nor another limiting operator such as take(N) is called, then the Stream will never end.
Do you like Arrow?
✖