Stream

class ~~Stream~~<out O> : StreamOf<O>
Deprecated in favor of Flow.

Functions

asPull Returns a Pull that writes all output to the Pull. fun asPull(): Pull<O, Unit>
attempt Returns a stream of O values wrapped in Either.Right until the first error, which is emitted wrapped in Either.Left. fun attempt(): Stream<Either<Throwable, O>>
attempts Retries on failure, returning a stream of attempts that can be manipulated with standard stream operations such as take, collectFirst and interruptWhen. fun attempts(delays: Stream<Duration>): Stream<Either<Throwable, O>>
buffer Behaves like the identity function, but requests n elements at a time from the input. fun buffer(n: Int): Stream<O>
bufferAll Behaves like the identity stream, but emits no output until the source is exhausted. fun bufferAll(): Stream<O>
bufferBy Behaves like the identity stream, but requests elements from its input in blocks that end whenever the predicate switches from true to false. fun bufferBy(f: (O) -> Boolean): Stream<O>
chunkLimit Outputs chunks with a limited maximum size, splitting as necessary. fun chunkLimit(n: Int): Stream<Chunk<O>>
chunkMin Outputs chunks of size larger than n. fun chunkMin(n: Int, allowFewerTotal: Boolean = true): Stream<Chunk<O>>
chunkN Outputs chunks of size n. fun chunkN(n: Int, allowFewer: Boolean = true): Stream<Chunk<O>>
chunks Outputs all chunks from the source stream. fun chunks(): Stream<Chunk<O>>
concurrently Runs the supplied stream in the background as elements from this stream are pulled. fun <O2> concurrently(other: Stream<O2>, ctx: CoroutineContext = Dispatchers.Default): Stream<O>
delayBy Returns a stream that, when run, sleeps for duration d and then pulls from this stream. fun delayBy(d: Duration): Stream<O>
delete Skips the first element that matches the predicate. fun delete(p: (O) -> Boolean): Stream<O>
drop Drops n elements of the input, then echoes the rest. Alias for the Long overload of drop. fun drop(n: Int): Stream<O>
Drops n elements of the input, then echoes the rest. fun drop(n: Long): Stream<O>
dropLast fun dropLast(n: Int): Stream<O>
Outputs all but the last n elements of the input. fun dropLast(n: Long): Stream<O>
dropThrough Like dropWhile, but drops the first value which tests false. fun dropThrough(p: (O) -> Boolean): Stream<O>
dropWhile Drops elements from the head of this stream until the supplied predicate returns false. fun dropWhile(p: (O) -> Boolean): Stream<O>
effectMap Alias for flatMap { o -> Stream.effect { f(o) } }. fun <B> effectMap(f: suspend (O) -> B): Stream<B>
effectMapAccumulate Like mapAccumulate, but accepts a suspending function. fun <S, O2> effectMapAccumulate(s: S, f: suspend (S, O) -> Pair<S, O2>): Stream<Pair<S, O2>>
effectScan Like scan, but accepts a suspending function. fun <O2> effectScan(z: O2, f: suspend (O2, O) -> O2): Stream<O2>
effectTap Alias for effectMap { o -> f(o); o }. Useful for attaching a logging function. fun effectTap(f: suspend (O) -> Unit): Stream<O>
either Merges both Streams into a Stream of O and B represented by Either<O, B>. This operation is equivalent to a normal merge but for different types. fun <B> either(ctx: CoroutineContext = Dispatchers.Default, other: Stream<B>): Stream<Either<O, B>>
exists Emits true as soon as a matching element is received, else false if no input matches. fun exists(f: (O) -> Boolean): Stream<Boolean>
filter Emits only inputs which match the supplied predicate. fun filter(p: (O) -> Boolean): Stream<O>
filterWithPrevious Like filter, but the predicate f depends on the previously emitted and current elements. fun filterWithPrevious(f: (O, O) -> Boolean): Stream<O>
find Alias for first. fun find(f: (O) -> Boolean): Stream<O>
first Emits the first element of the stream for which the predicate returns true, or is empty if no element matches. fun first(f: (O) -> Boolean): Stream<O>
flatMap Creates a stream whose elements are generated by applying f to each output of the source stream and concatenating all of the results. fun <B> flatMap(f: (O) -> Stream<B>): Stream<B>
fold Folds all inputs using the initial value and the supplied binary operator, and emits a single-element stream. fun <B> fold(initial: B, f: (B, O) -> B): Stream<B>
foldMap Alias for map(f).foldMonoid. fun <O2> foldMap(MO2: Monoid<O2>, f: (O) -> O2): Stream<O2>
forall Emits false and halts as soon as a non-matching element is received; or emits a single true value if it reaches the stream end and every input before that matches the predicate; or hangs without emitting values if the input is infinite and all inputs match the predicate. fun forall(p: (O) -> Boolean): Stream<Boolean>
interruptAfter Interrupts this stream after the specified duration has passed. fun interruptAfter(duration: Duration): Stream<O>
interruptScope Creates a scope that may be interrupted by calling scope#interrupt. fun interruptScope(): Stream<O>
interruptWhen Interrupts the stream when haltOnSignal finishes its evaluation. fun interruptWhen(haltOnSignal: suspend () -> Either<Throwable, Unit>): Stream<O>
Alias for interruptWhen(haltWhenTrue.discrete). fun interruptWhen(haltWhenTrue: Signal<Boolean>): Stream<O>
Lets this stream through as long as haltWhenTrue emits false, listening asynchronously for haltWhenTrue to emit true. This halts as soon as either stream halts. fun interruptWhen(haltWhenTrue: Stream<Boolean>): Stream<O>
map Applies the specified pure function to each input and emits the result. fun <B> map(f: (O) -> B): Stream<B>
mapAccumulate Maps a running total according to S and the input with the function f. fun <S, O2> mapAccumulate(init: S, f: (S, O) -> Pair<S, O2>): Stream<Pair<S, O2>>
mapChunks Applies the specified pure function to each chunk in this stream. fun <B> mapChunks(f: (Chunk<O>) -> Chunk<B>): Stream<B>
mapFilter fun <O2> mapFilter(p: (O) -> O2?): Stream<O2>
mapNotNull fun <B> mapNotNull(p: (O) -> B?): Stream<B>
mask Behaves like the identity function but halts the stream on an error and does not return the error. fun mask(): Stream<O>
onFinalize Runs the supplied effectful action at the end of this stream, regardless of how the stream terminates. fun onFinalize(f: suspend () -> Unit): Stream<O>
onFinalizeCase Like onFinalize but provides the reason for finalization as an ExitCase. fun onFinalizeCase(f: suspend (ExitCase) -> Unit): Stream<O>
repeat Repeats this stream an infinite number of times. s.repeat() == s.append { s.append { s.append { ... } } } fun repeat(): Stream<O>
repeatN Repeats this stream a given number of times. fun repeatN(n: Long): Stream<O>
scope Introduces an explicit scope. fun scope(): Stream<O>
spawn Starts this stream and cancels it as finalization of the returned stream. fun spawn(ctx: CoroutineContext = Dispatchers.Default): Stream<Fiber<Unit>>
tail Emits all elements of the input except the first one. fun tail(): Stream<O>
take Emits the first n elements of this stream. Alias for the Long overload of take. fun take(n: Int): Stream<O>
Emits the first n elements of this stream. fun take(n: Long): Stream<O>
takeLast Emits the last n elements of the input. fun takeLast(n: Int): Stream<O>
takeLastOrNull Emits the last n elements of the input. fun takeLastOrNull(n: Int): Stream<O?>
takeThrough Like takeWhile, but emits the first value which tests false. fun takeThrough(p: (O) -> Boolean): Stream<O>
takeWhile Emits the longest prefix of the input for which all elements test true according to p. fun takeWhile(p: (O) -> Boolean): Stream<O>
through Transforms this stream using the given Pipe. fun <B> through(pipe: Pipe<O, B>): Stream<B>
Transforms this stream and s2 using the given Pipe2. fun <O2, B> through(s2: Stream<O2>, pipe2: Pipe2<O, O2, B>): Stream<B>
timeout Fails this stream with a TimeoutException if it does not complete within the given timeout. fun timeout(timeout: Duration): Stream<O>
toString fun toString(): String
unchunk Converts the input to a stream of 1-element chunks. fun unchunk(): Stream<O>
void Removes all output values from this stream. fun void(): Stream<Nothing>
zip Deterministically zips elements, terminating when the end of either branch is reached naturally. fun <B> zip(that: Stream<B>): Stream<Pair<O, B>>
zipLeft Like zip, but selects the left values only. Useful with timed streams, for example to emit a number every 100 milliseconds. fun <B> zipLeft(that: Stream<B>): Stream<O>
zipRight Like zip, but selects the right values only. Useful with timed streams, for example to emit a number every 100 milliseconds. fun <B> zipRight(that: Stream<B>): Stream<B>
zipWith Deterministically zips elements using the specified function, terminating when the end of either branch is reached naturally. fun <B, C> zipWith(other: Stream<B>, f: (O, B) -> C): Stream<C>
zipWithIndex Zips the elements of the input stream with its indices, and returns the new stream. fun zipWithIndex(): Stream<Pair<O, Long>>
zipWithNext Zips each element of this stream with the next element. The last element is zipped with null. fun zipWithNext(): Stream<Pair<O, O?>>
zipWithPrevious Zips each element of this stream with the previous element. The first element is zipped with null. fun zipWithPrevious(): Stream<Pair<O?, O>>
zipWithPreviousAndNext Zips each element of this stream with its previous and next elements. The first element is zipped with null as the previous element, and the last element is zipped with null as the next element. fun zipWithPreviousAndNext(): Stream<Triple<O?, O, O?>>
zipWithScan Zips the input with a running total computed by f, up to but not including the current element. Thus the initial z value is the first total emitted to the output. fun <B> zipWithScan(z: B, f: (B, O) -> B): Stream<Pair<O, B>>
zipWithScan1 Zips the input with a running total computed by f, including the current element. Thus the first total emitted is f(z, o) for the first element o. fun <B> zipWithScan1(z: B, f: (B, O) -> B): Stream<Pair<O, B>>
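
The difference between zipWithScan and zipWithScan1, and the null padding used by zipWithPrevious and zipWithNext, may be easier to see on concrete data. The following is a minimal sketch that models the described behavior on plain Kotlin lists using only the standard library. It does not call Arrow's Stream, and the `...Model` helper names are hypothetical, introduced only for this illustration.

```kotlin
// Plain-Kotlin model (stdlib only, NOT Arrow's Stream) of the zip* semantics described above.
// All helper names ending in "Model" are hypothetical stand-ins defined on List.

// Pairs each element with the running total *before* the element is folded in.
fun <O, B> List<O>.zipWithScanModel(z: B, f: (B, O) -> B): List<Pair<O, B>> {
    var acc = z
    return map { o ->
        val before = acc
        acc = f(acc, o)
        o to before
    }
}

// Pairs each element with the running total *after* the element is folded in.
fun <O, B> List<O>.zipWithScan1Model(z: B, f: (B, O) -> B): List<Pair<O, B>> {
    var acc = z
    return map { o ->
        acc = f(acc, o)
        o to acc
    }
}

// Pairs each element with its predecessor, or null for the first element.
fun <O> List<O>.zipWithPreviousModel(): List<Pair<O?, O>> =
    mapIndexed { i, o -> getOrNull(i - 1) to o }

// Pairs each element with its successor, or null for the last element.
fun <O> List<O>.zipWithNextModel(): List<Pair<O, O?>> =
    mapIndexed { i, o -> o to getOrNull(i + 1) }

fun main() {
    val words = listOf("uno", "dos", "tres", "cuatro")
    println(words.zipWithScanModel(0) { total, w -> total + w.length })
    // [(uno, 0), (dos, 3), (tres, 6), (cuatro, 10)]
    println(words.zipWithScan1Model(0) { total, w -> total + w.length })
    // [(uno, 3), (dos, 6), (tres, 10), (cuatro, 16)]
    println(listOf(1, 2, 3).zipWithPreviousModel()) // [(null, 1), (1, 2), (2, 3)]
    println(listOf(1, 2, 3).zipWithNextModel())     // [(1, 2), (2, 3), (3, null)]
}
```

In the zipWithScan model the last total (16) is never emitted, because each element is paired with the total accumulated before it.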

Companion Object Properties

getScope Gets the current scope, allowing manual leasing or interruption. This is a low-level method and generally should not be used by user code. val getScope: Stream<Scope>
unit val unit: Stream<Unit>

Companion Object Functions

bracket Creates a stream that emits a resource allocated by an effect, ensuring the resource is eventually released regardless of how the stream is used. fun <R> bracket(acquire: suspend () -> R, release: suspend (R) -> Unit): Stream<R>
bracketCase Like bracket but the release action is passed an ExitCase. fun <R> bracketCase(acquire: suspend () -> R, release: suspend (R, ExitCase) -> Unit): Stream<R>
chunk Lifts a Chunk into a Stream. fun <A> chunk(ch: Chunk<A>): Stream<A>
constant Creates an infinite pure stream that always returns the supplied value. fun <O> constant(o: O, chunkSize: Int = 256): Stream<O>
defer Returns a stream that evaluates s each time the stream is used, allowing use of a mutable value in stream computations. fun <O> defer(s: () -> Stream<O>): Stream<O>
effect Creates a single element stream that gets its value by evaluating the supplied effect. If the effect fails, the returned stream fails. fun <O> effect(fo: suspend () -> O): Stream<O>
effect_ Creates a stream that evaluates the supplied fa for its effect, discarding the output value. As a result, the returned stream emits no elements and hence has output type Nothing. fun effect_(fa: suspend () -> Unit): Stream<Nothing>
effectUnChunk Like effect, but the resulting chunk is flattened efficiently. fun <O> effectUnChunk(fo: suspend () -> Chunk<O>): Stream<O>
emits Creates a pure stream that emits the supplied values. fun <A> emits(vararg aas: A): Stream<A>
empty Empty Stream. fun <O> empty(): Stream<O>
force Lifts an effect that generates a stream into a stream. Alias for effect(f).flatMap { it }. fun <A> force(f: suspend () -> Stream<A>): Stream<A>
invoke Creates a pure stream that emits the supplied values. Alias for emits. operator fun <A> invoke(vararg aas: A): Stream<A>
iterable Like emits, but works for any class that extends Iterable. fun <O> iterable(os: Iterable<O>): Stream<O>
iterate An infinite Stream that repeatedly applies a given function to a start value. initial is the first value emitted, followed by f(initial), then f(f(initial)), and so on. fun <A> iterate(initial: A, f: (A) -> A): Stream<A>
iterateEffect Like iterate, but takes a suspending function. fun <A> iterateEffect(start: A, f: suspend (A) -> A): Stream<A>
just Creates a singleton stream that emits the supplied value. fun <O> just(o: O): Stream<O>
never Creates a Stream that never emits any values. fun <A> never(): Stream<A>
raiseError Creates a Stream that, when run, fails with the supplied exception err. fun <O> raiseError(err: Throwable): Stream<O>
random Creates a stream of random integers using the given seed. fun random(seed: Int): Stream<Int>
range Lazily produces the range [start, stopExclusive). If you want to produce the sequence in one chunk, instead of lazily, use emits(start until stopExclusive). fun range(range: IntProgression): Stream<Int>
fun range(range: LongProgression): Stream<Long>
fun range(range: CharProgression): Stream<Char>
resource Converts the supplied resource into a singleton stream. Gives the same guarantees as using bracketCase. fun <O> resource(r: Resource<O>): Stream<O>
sleep A single-element Stream that waits for the duration d before emitting unit. fun sleep(d: Duration): Stream<Unit>
sleep_ Alias for sleep(d).void(). Often used in conjunction with append (i.e., sleep_(..).append { s }) as a more performant version of sleep(..).flatMap { s }. fun sleep_(d: Duration): Stream<Nothing>
supervise Starts the supplied task and cancels it as finalization of the returned stream. fun <A> supervise(ctx: CoroutineContext = Dispatchers.Default, fa: suspend () -> A): Stream<Fiber<A>>
unfold Creates a stream by successively applying f until a null is returned, emitting each output O and using each output S as input to the next invocation of f. fun <S, O> unfold(s: S, f: (S) -> Pair<O, S>?): Stream<O>
unfoldChunk Like unfold but each invocation of f provides a chunk of output. fun <S, O> unfoldChunk(s: S, f: (S) -> Pair<Chunk<O>, S>?): Stream<O>
unfoldChunkEffect Like unfoldChunk, but takes an effectful function. fun <S, O> unfoldChunkEffect(s: S, f: suspend (S) -> Pair<Chunk<O>, S>?): Stream<O>
unfoldEffect Like unfold, but takes a suspending function. fun <S, O> unfoldEffect(s: S, f: suspend (S) -> Pair<O, S>?): Stream<O>
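
The state-threading behavior described for unfold and iterate can be sketched with the standard library's lazy `sequence` builder. This is an illustrative model only, assuming the semantics stated above. It does not use Arrow's Stream, and `unfoldModel` and `iterateModel` are hypothetical names.

```kotlin
// Plain-Kotlin sketch (stdlib only, NOT Arrow's Stream) of the unfold and iterate semantics.
// unfoldModel and iterateModel are hypothetical names used only for illustration.

// Applies f to the current state until it returns null; each non-null result
// yields its first component and continues with its second component as the new state.
fun <S, O> unfoldModel(s: S, f: (S) -> Pair<O, S>?): Sequence<O> = sequence {
    var state = s
    while (true) {
        val (out, next) = f(state) ?: break
        yield(out)
        state = next
    }
}

// Yields initial, f(initial), f(f(initial)), ... without end; the caller must bound it.
fun <A> iterateModel(initial: A, f: (A) -> A): Sequence<A> = sequence {
    var current = initial
    while (true) {
        yield(current)
        current = f(current)
    }
}

fun main() {
    // Count down from 3; returning null ends the sequence.
    println(unfoldModel(3) { n -> if (n > 0) n to n - 1 else null }.toList()) // [3, 2, 1]
    // Powers of two; take(5) bounds the otherwise infinite sequence.
    println(iterateModel(1) { it * 2 }.take(5).toList()) // [1, 2, 4, 8, 16]
}
```

Both models are lazy: no element is computed until the sequence is consumed, which is why the infinite iterate model is safe as long as the consumer takes a finite prefix.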

Extension Functions

append Lazily appends s2 to the end of this stream. fun <O> Stream<O>.append(s2: () -> Stream<O>): Stream<O>
asResource Opens DSL to consume Stream as a Resource. fun <O> Stream<O>.asResource(): ResourceOps<O>
cons Prepends a Chunk onto the front of this stream. fun <O> Stream<O>.cons(c: Chunk<O>): Stream<O>
cons1 Prepends a value onto the front of this stream. fun <O> Stream<O>.cons1(o: O): Stream<O>
drain Runs all the effects of this Stream and ignores all emitted values. suspend fun <O> Stream<O>.drain(): Unit
filterNotNull Filters out any null. fun <O : Any> Stream<O?>.filterNotNull(): Stream<O>
filterOption Filters out any arrow.core.None. fun <O> Stream<Option<O>>.filterOption(): Stream<O>
firstOrError Runs the first effect of this Stream, raising a NoSuchElementException if the stream emitted no values, and returning the emitted value otherwise. suspend fun <O> Stream<O>.firstOrError(): O
firstOrNull Runs the first effect of this Stream, returning null if the stream emitted no values, and returning the emitted value otherwise. suspend fun <O> Stream<O>.firstOrNull(): O?
flatten Flattens a stream of streams into a single stream by concatenating each stream. fun <O> Stream<Stream<O>>.flatten(): Stream<O>
fold1 Folds all inputs using the supplied operator f, and emits a single-element stream, or the empty stream if the input is empty, or the never stream if the input is non-terminating. fun <O> Stream<O>.fold1(f: (O, O) -> O): Stream<O>
foldChunks Folds all the effects of this stream into a value by folding the output chunks together, starting with the provided init and combining the current value with each output chunk using f. suspend fun <O, B> Stream<O>.foldChunks(init: B, f: (B, Chunk<O>) -> B): B
foldMonoid Folds this stream with the monoid for O. fun <O> Stream<O>.foldMonoid(MO: Monoid<O>): Stream<O>
handleErrorWith If this Stream terminates with Stream.raiseError, invokes h and continues with the result. fun <O> Stream<O>.handleErrorWith(h: (Throwable) -> Stream<O>): Stream<O>
interleave Deterministically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally. fun <O> Stream<O>.interleave(that: Stream<O>): Stream<O>
interleaveAll Deterministically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally. fun <O> Stream<O>.interleaveAll(that: Stream<O>): Stream<O>
intersperse Emits the specified separator between every pair of elements in the source stream. fun <O> Stream<O>.intersperse(separator: O): Stream<O>
lastOrError Runs all the effects of this Stream, raising a NoSuchElementException if the stream emitted no values, and returning the last value emitted otherwise. suspend fun <O> Stream<O>.lastOrError(): O
lastOrNull Runs all the effects of this Stream, returning null if the stream emitted no values, and returning the last value emitted otherwise. suspend fun <O> Stream<O>.lastOrNull(): O?
noneTerminate Wraps the inner values in Option as Some and adds a None at the end of the Stream to signal completion. Note that this doesn’t actually limit an infinite stream. fun <O> Stream<O>.noneTerminate(): Stream<Option<O>>
onComplete Runs s2 after this, regardless of errors during this, then reraises any errors encountered during this. fun <O> Stream<O>.onComplete(s2: () -> Stream<O>): Stream<O>
parJoin Non-deterministically merges a stream of streams (outer) into a single stream, opening at most maxOpen streams at any point in time. fun <O> Stream<Stream<O>>.~~parJoin~~(maxOpen: Int, ctx: CoroutineContext = Dispatchers.Default): Stream<O>
parJoinUnbounded Like parJoin but races all inner streams simultaneously without limit. fun <O> Stream<Stream<O>>.parJoinUnbounded(ctx: CoroutineContext = Dispatchers.Default): Stream<O>
reduce fun <O> Stream<O>.reduce(f: (O, O) -> O): Stream<O>
reduceSemigroup Reduces this stream with the Semigroup for O. fun <O> Stream<O>.reduceSemigroup(S: Semigroup<O>): Stream<O>
repartition Repartitions the input with the function f. On each step f is applied to the input and all elements but the last of the resulting sequence are emitted. The last element is then appended to the next input using the Semigroup S. fun <O> Stream<O>.repartition(S: Semigroup<O>, f: (O) -> Chunk<O>): Stream<O>
scan Left fold which outputs all intermediate results. fun <O, O2> Stream<O>.scan(init: O2, f: (O2, O) -> O2): Stream<O2>
scan1 Like scan, but uses the first element of the stream as the seed. fun <O> Stream<O>.scan1(f: (O, O) -> O): Stream<O>
scanChunks Like scan but f is applied to each chunk of the source stream. The resulting chunk is emitted and the result of the chunk is used in the next invocation of f. fun <O, S, O2> Stream<O>.scanChunks(init: S, f: (S, Chunk<O>) -> Pair<S, Chunk<O2>>): Stream<O2>
scanChunksOpt More general version of scanChunks where the current state (i.e., S) can be inspected to determine if another chunk should be pulled or if the stream should terminate. Termination is signaled by returning null from f. Otherwise, f returns a function which consumes the next chunk. fun <O, S, O2> Stream<O>.scanChunksOpt(init: S, f: (S) -> ((Chunk<O>) -> Pair<S, Chunk<O2>>)?): Stream<O2>
scanMap Alias for map(f).scanMonoid. fun <O, O2> Stream<O>.scanMap(MO2: Monoid<O2>, f: (O) -> O2): Stream<O2>
scanMonoid Folds this stream with the monoid for O while emitting all intermediate results. fun <O> Stream<O>.scanMonoid(MO: Monoid<O>): Stream<O>
terminateOn Halts the input stream when the condition is true. fun <O> Stream<O>.terminateOn(terminator: (O) -> Boolean): Stream<O>
terminateOnNone Halts the input stream at the first arrow.core.None. fun <O> Stream<Option<O>>.terminateOnNone(): Stream<O>
terminateOnNull Halts the input stream at the first null. fun <O> Stream<O>.terminateOnNull(): Stream<O>
toList Runs all the effects of this Stream and collects all emitted values into a List. If the Stream doesn’t emit any values it returns emptyList. suspend fun <O> Stream<O>.toList(): List<O>
toSet Runs all the effects of this Stream and collects all emitted values into a Set. If the Stream doesn’t emit any values it returns emptySet. suspend fun <O> Stream<O>.toSet(): Set<O>
zipAll Deterministically zips elements, terminating when the ends of both branches are reached naturally, padding the left branch with pad1 and padding the right branch with pad2 as necessary. fun <O, B> Stream<O>.zipAll(that: Stream<B>, pad1: O, pad2: B): Stream<Pair<O, B>>
zipAllWith Deterministically zips elements with the specified function, terminating when the ends of both branches are reached naturally, padding the left branch with pad1 and padding the right branch with pad2 as necessary. fun <A, B, C> Stream<A>.zipAllWith(that: Stream<B>, pad1: A, pad2: B, f: (A, B) -> C): Stream<C>
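
The termination and padding rules for interleave, interleaveAll, intersperse and zipAll differ only in how they treat branches of unequal length. The sketch below models those rules on plain Kotlin lists with the standard library, assuming the behavior described above. It is not Arrow's implementation, and the `...Model` names are hypothetical.

```kotlin
// Plain-Kotlin model (stdlib only, NOT Arrow's Stream) of the combinators described above.
// All helper names ending in "Model" are hypothetical stand-ins defined on List.

// Alternates left and right elements, stopping when either side runs out.
fun <O> List<O>.interleaveModel(that: List<O>): List<O> =
    zip(that).flatMap { (l, r) -> listOf(l, r) }

// Alternates left and right elements, then appends whatever remains of the longer side.
fun <O> List<O>.interleaveAllModel(that: List<O>): List<O> {
    val n = minOf(size, that.size)
    return interleaveModel(that) + drop(n) + that.drop(n)
}

// Places the separator between every pair of adjacent elements.
fun <O> List<O>.intersperseModel(separator: O): List<O> =
    flatMapIndexed { i, o -> if (i == 0) listOf(o) else listOf(separator, o) }

// Zips until both sides are exhausted, padding the shorter side.
fun <O, B> List<O>.zipAllModel(that: List<B>, pad1: O, pad2: B): List<Pair<O, B>> =
    List(maxOf(size, that.size)) { i ->
        val left = if (i < size) this[i] else pad1
        val right = if (i < that.size) that[i] else pad2
        left to right
    }

fun main() {
    val left = listOf(1, 2, 3)
    val right = listOf(4, 5, 6, 7)
    println(left.interleaveModel(right))    // [1, 4, 2, 5, 3, 6]
    println(left.interleaveAllModel(right)) // [1, 4, 2, 5, 3, 6, 7]
    println(left.intersperseModel(0))       // [1, 0, 2, 0, 3]
    println(left.zipAllModel(right, 0, 0))  // [(1, 4), (2, 5), (3, 6), (0, 7)]
}
```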

Companion Object Extension Functions

callback Creates a Stream from the given suspended block callback, allowing it to emit values, set cancel effects, and end the emission. fun <A> Stream.Companion.callback(f: suspend EmitterSyntax<A>.() -> Unit): Stream<A>
cancellable Creates a cancellable Stream from the given suspended block that will evaluate the passed CancelToken if cancelled. fun <A> Stream.Companion.cancellable(f: suspend EmitterSyntax<A>.() -> CancelToken): Stream<A>
monoid Monoid instance for Stream. fun <O> Stream.Companion.monoid(): Monoid<Stream<O>>
