arrow-fx-coroutines / arrow.fx.coroutines.stream / Stream
class ~~Stream~~<out O> : StreamOf<O>
Deprecated: Deprecated in favor of Flow
asPull | Returns a Pull that writes all output to the Pull. fun asPull(): Pull<O, Unit> |
attempt | Returns a stream of O values wrapped in Either.Right until the first error, which is emitted wrapped in Either.Left. fun attempt(): Stream<Either<Throwable, O>> |
attempts | Retries on failure, returning a stream of attempts that can be manipulated with standard stream operations such as take, collectFirst and interruptWhen. fun attempts(delays: Stream<Duration>): Stream<Either<Throwable, O>> |
buffer | Behaves like the identity function, but requests n elements at a time from the input. fun buffer(n: Int): Stream<O> |
bufferAll | Behaves like the identity stream, but emits no output until the source is exhausted. fun bufferAll(): Stream<O> |
bufferBy | Behaves like the identity stream, but requests elements from its input in blocks that end whenever the predicate switches from true to false. fun bufferBy(f: (O) -> Boolean): Stream<O> |
chunkLimit | Outputs chunks with a limited maximum size, splitting as necessary. fun chunkLimit(n: Int): Stream<Chunk<O>> |
chunkMin | Outputs chunks of size larger than n. fun chunkMin(n: Int, allowFewerTotal: Boolean = true): Stream<Chunk<O>> |
chunkN | Outputs chunks of size n. fun chunkN(n: Int, allowFewer: Boolean = true): Stream<Chunk<O>> |
chunks | Outputs all chunks from the source stream. fun chunks(): Stream<Chunk<O>> |
concurrently | Runs the supplied stream in the background as elements from this stream are pulled. fun <O2> concurrently(other: Stream<O2>, ctx: CoroutineContext = Dispatchers.Default): Stream<O> |
delayBy | Returns a stream that, when run, sleeps for duration d and then pulls from this stream. fun delayBy(d: Duration): Stream<O> |
delete | Skips the first element that matches the predicate. fun delete(p: (O) -> Boolean): Stream<O> |
drop | Drops n elements of the input, then echoes the rest. Alias for the Long overload of drop. fun drop(n: Int): Stream<O> Drops n elements of the input, then echoes the rest. fun drop(n: Long): Stream<O> |
dropLast | fun dropLast(n: Int): Stream<O> Outputs all but the last n elements of the input. fun dropLast(n: Long): Stream<O> |
dropThrough | Like dropWhile, but also drops the first value which tests false. fun dropThrough(p: (O) -> Boolean): Stream<O> |
dropWhile | Drops elements from the head of this stream until the supplied predicate returns false. fun dropWhile(p: (O) -> Boolean): Stream<O> |
effectMap | Alias for flatMap { o -> Stream.effect { f(o) } }. fun <B> effectMap(f: suspend (O) -> B): Stream<B> |
effectMapAccumulate | Like mapAccumulate, but accepts a suspending function. fun <S, O2> effectMapAccumulate(s: S, f: suspend (S, O) -> Pair<S, O2>): Stream<Pair<S, O2>> |
effectScan | Like scan, but accepts a suspending function. fun <O2> effectScan(z: O2, f: suspend (O2, O) -> O2): Stream<O2> |
effectTap | Alias for effectMap { o -> f(o); o }. Useful if you want to attach a logging function. fun effectTap(f: suspend (O) -> Unit): Stream<O> |
either | Merges both Streams into a Stream of A and B represented by Either<A, B>. This operation is equivalent to a normal merge but for different types. fun <B> either(ctx: CoroutineContext = Dispatchers.Default, other: Stream<B>): Stream<Either<O, B>> |
exists | Emits true as soon as a matching element is received, else false if no input matches. fun exists(f: (O) -> Boolean): Stream<Boolean> |
filter | Emits only inputs which match the supplied predicate. fun filter(p: (O) -> Boolean): Stream<O> |
filterWithPrevious | Like filter, but the predicate f depends on the previously emitted and current elements. fun filterWithPrevious(f: (O, O) -> Boolean): Stream<O> |
find | Alias for first. fun find(f: (O) -> Boolean): Stream<O> |
first | Emits the first element of the stream for which the predicate returns true, or is empty if no element matches. fun first(f: (O) -> Boolean): Stream<O> |
flatMap | Creates a stream whose elements are generated by applying f to each output of the source stream and concatenating all of the results. fun <B> flatMap(f: (O) -> Stream<B>): Stream<B> |
fold | Folds all inputs using an initial value and the supplied binary operator, and emits a single-element stream. fun <B> fold(initial: B, f: (B, O) -> B): Stream<B> |
foldMap | Alias for map(f).foldMonoid. fun <O2> foldMap(MO2: Monoid<O2>, f: (O) -> O2): Stream<O2> |
forall | Emits false and halts as soon as a non-matching element is received; or emits a single true value if it reaches the stream end and every input before that matches the predicate; or hangs without emitting values if the input is infinite and all inputs match the predicate. fun forall(p: (O) -> Boolean): Stream<Boolean> |
interruptAfter | Interrupts this stream after the specified duration has passed. fun interruptAfter(duration: Duration): Stream<O> |
interruptScope | Creates a scope that may be interrupted by calling scope#interrupt. fun interruptScope(): Stream<O> |
interruptWhen | Interrupts the stream when haltOnSignal finishes its evaluation. fun interruptWhen(haltOnSignal: suspend () -> Either<Throwable, Unit>): Stream<O> Alias for interruptWhen(haltWhenTrue.discrete). fun interruptWhen(haltWhenTrue: Signal<Boolean>): Stream<O> Lets through the s2 branch as long as the s1 branch is false, listening asynchronously for the left branch to become true. This halts as soon as either branch halts. fun interruptWhen(haltWhenTrue: Stream<Boolean>): Stream<O> |
map | Applies the specified pure function to each input and emits the result. fun <B> map(f: (O) -> B): Stream<B> |
mapAccumulate | Maps a running total according to S and the input with the function f. fun <S, O2> mapAccumulate(init: S, f: (S, O) -> Pair<S, O2>): Stream<Pair<S, O2>> |
mapChunks | Applies the specified pure function to each chunk in this stream. fun <B> mapChunks(f: (Chunk<O>) -> Chunk<B>): Stream<B> |
mapFilter | fun <O2> mapFilter(p: (O) -> O2?): Stream<O2> |
mapNotNull | fun <B> mapNotNull(p: (O) -> B?): Stream<B> |
mask | Behaves like the identity function but halts the stream on an error and does not return the error. fun mask(): Stream<O> |
onFinalize | Runs the supplied effectful action at the end of this stream, regardless of how the stream terminates. fun onFinalize(f: suspend () -> Unit): Stream<O> |
onFinalizeCase | Like onFinalize, but provides the reason for finalization as an ExitCase. fun onFinalizeCase(f: suspend (ExitCase) -> Unit): Stream<O> |
repeat | Repeats this stream an infinite number of times. s.repeat() == s.append { s.append { s.append { ... } } } fun repeat(): Stream<O> |
repeatN | Repeats this stream a given number of times. fun repeatN(n: Long): Stream<O> |
scope | Introduces an explicit scope. fun scope(): Stream<O> |
spawn | Starts this stream and cancels it as finalization of the returned stream. fun spawn(ctx: CoroutineContext = Dispatchers.Default): Stream<Fiber<Unit>> |
tail | Emits all elements of the input except the first one. fun tail(): Stream<O> |
take | Emits the first n elements of this stream. Alias for the Long overload of take. fun take(n: Int): Stream<O> Emits the first n elements of this stream. fun take(n: Long): Stream<O> |
takeLast | Emits the last n elements of the input. fun takeLast(n: Int): Stream<O> |
takeLastOrNull | Emits the last n elements of the input. fun takeLastOrNull(n: Int): Stream<O?> |
takeThrough | Like takeWhile, but also emits the first value which tests false. fun takeThrough(p: (O) -> Boolean): Stream<O> |
takeWhile | Emits the longest prefix of the input for which all elements test true according to p. fun takeWhile(p: (O) -> Boolean): Stream<O> |
through | Transforms this stream using the given Pipe. fun <B> through(pipe: Pipe<O, B>): Stream<B> Transforms this stream and s2 using the given Pipe2. fun <O2, B> through(s2: Stream<O2>, pipe2: Pipe2<O, O2, B>): Stream<B> |
timeout | Fails this stream with a TimeoutException if it does not complete within the given timeout. fun timeout(timeout: Duration): Stream<O> |
toString | fun toString(): String |
unchunk | Converts the input to a stream of 1-element chunks. fun unchunk(): Stream<O> |
void | Removes all output values from this stream. fun void(): Stream<Nothing> |
zip | Deterministically zips elements, terminating when the end of either branch is reached naturally. fun <B> zip(that: Stream<B>): Stream<Pair<O, B>> |
zipLeft | Like zip, but selects the left values only. Useful with timed streams. fun <B> zipLeft(that: Stream<B>): Stream<O> |
zipRight | Like zip, but selects the right values only. Useful with timed streams. fun <B> zipRight(that: Stream<B>): Stream<B> |
zipWith | Deterministically zips elements using the specified function, terminating when the end of either branch is reached naturally. fun <B, C> zipWith(other: Stream<B>, f: (O, B) -> C): Stream<C> |
zipWithIndex | Zips the elements of the input stream with their indices, and returns the new stream. fun zipWithIndex(): Stream<Pair<O, Long>> |
zipWithNext | Zips each element of this stream with the next element. The last element is zipped with null. fun zipWithNext(): Stream<Pair<O, O?>> |
zipWithPrevious | Zips each element of this stream with the previous element. The first element is zipped with null. fun zipWithPrevious(): Stream<Pair<O?, O>> |
zipWithPreviousAndNext | Zips each element of this stream with its previous and next elements. The first element is zipped with null as the previous element, and the last element is zipped with null as the next element. fun zipWithPreviousAndNext(): Stream<Triple<O?, O, O?>> |
zipWithScan | Zips the input with a running total computed by f, up to but not including the current element. Thus the initial z value is the first emitted to the output. fun <B> zipWithScan(z: B, f: (B, O) -> B): Stream<Pair<O, B>> |
zipWithScan1 | Zips the input with a running total computed by f, including the current element. Thus the first total emitted already includes the first element. fun <B> zipWithScan1(z: B, f: (B, O) -> B): Stream<Pair<O, B>> |
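The difference between zipWithScan and zipWithScan1, and the null padding used by zipWithPrevious and zipWithNext, is easier to see on concrete data. The sketch below models those four descriptions on plain Kotlin lists. It is not Arrow's Stream implementation: the `*Model` function names are hypothetical, and it assumes zipWithScan1 pairs each element with the total that already includes it, as its "including the current element" wording suggests.

```kotlin
// Stand-in model on kotlin.collections.List; NOT Arrow's Stream implementation.
// All *Model names are hypothetical and exist only for this illustration.

// Pairs each element with the running total *before* that element is folded in.
fun <O, B> List<O>.zipWithScanModel(z: B, f: (B, O) -> B): List<Pair<O, B>> {
    var acc = z
    return map { o ->
        val before = acc
        acc = f(acc, o)
        o to before
    }
}

// Pairs each element with the running total *after* that element is folded in.
fun <O, B> List<O>.zipWithScan1Model(z: B, f: (B, O) -> B): List<Pair<O, B>> {
    var acc = z
    return map { o ->
        acc = f(acc, o)
        o to acc
    }
}

// Pairs each element with its predecessor, using null for the first element.
fun <O> List<O>.zipWithPreviousModel(): List<Pair<O?, O>> =
    mapIndexed { i, o -> getOrNull(i - 1) to o }

// Pairs each element with its successor, using null for the last element.
fun <O> List<O>.zipWithNextModel(): List<Pair<O, O?>> =
    mapIndexed { i, o -> o to getOrNull(i + 1) }

fun main() {
    val words = listOf("uno", "dos", "tres", "cuatro")
    println(words.zipWithScanModel(0) { acc, w -> acc + w.length })
    // [(uno, 0), (dos, 3), (tres, 6), (cuatro, 10)]
    println(words.zipWithScan1Model(0) { acc, w -> acc + w.length })
    // [(uno, 3), (dos, 6), (tres, 10), (cuatro, 16)]
    println(listOf(1, 2, 3).zipWithPreviousModel()) // [(null, 1), (1, 2), (2, 3)]
    println(listOf(1, 2, 3).zipWithNextModel())     // [(1, 2), (2, 3), (3, null)]
}
```

Since Stream is deprecated in favor of Flow, the same shapes can usually be rebuilt from a running fold plus a zip on whichever collection or flow type replaces it.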
getScope | Gets the current scope, allowing manual leasing or interruption. This is a low-level method and generally should not be used by user code. val getScope: Stream<Scope> |
unit | val unit: Stream<Unit> |
bracket | Creates a stream that emits a resource allocated by an effect, ensuring the resource is eventually released regardless of how the stream is used. fun <R> bracket(acquire: suspend () -> R, release: suspend (R) -> Unit): Stream<R> |
bracketCase | Like bracket, but the release action is passed an ExitCase. fun <R> bracketCase(acquire: suspend () -> R, release: suspend (R, ExitCase) -> Unit): Stream<R> |
chunk | Lifts a Chunk into a Stream. fun <A> chunk(ch: Chunk<A>): Stream<A> |
constant | Creates an infinite pure stream that always returns the supplied value. fun <O> constant(o: O, chunkSize: Int = 256): Stream<O> |
defer | Returns a stream that evaluates s each time the stream is used, allowing use of a mutable value in stream computations. fun <O> defer(s: () -> Stream<O>): Stream<O> |
effect | Creates a single-element stream that gets its value by evaluating the supplied effect. If the effect fails, the returned stream fails. fun <O> effect(fo: suspend () -> O): Stream<O> |
effect_ | Creates a stream that evaluates the supplied fa for its effect, discarding the output value. As a result, the returned stream emits no elements and hence has output type Nothing. fun effect_(fa: suspend () -> Unit): Stream<Nothing> |
effectUnChunk | Like effect, but the resulting chunk is flattened efficiently. fun <O> effectUnChunk(fo: suspend () -> Chunk<O>): Stream<O> |
emits | Creates a pure stream that emits the supplied values. fun <A> emits(vararg aas: A): Stream<A> |
empty | Empty Stream. fun <O> empty(): Stream<O> |
force | Lifts an effect that generates a stream into a stream. Alias for effect(f).flatMap { it }. fun <A> force(f: suspend () -> Stream<A>): Stream<A> |
invoke | Creates a pure stream that emits the supplied values. Alias for emits. operator fun <A> invoke(vararg aas: A): Stream<A> |
iterable | Like emits, but works for any class that implements Iterable. fun <O> iterable(os: Iterable<O>): Stream<O> |
iterate | An infinite Stream that repeatedly applies a given function to a start value. initial is the first value emitted, followed by f(initial), then f(f(initial)), and so on. fun <A> iterate(initial: A, f: (A) -> A): Stream<A> |
iterateEffect | Like iterate, but takes a suspending function. fun <A> iterateEffect(start: A, f: suspend (A) -> A): Stream<A> |
just | Creates a singleton stream that emits the supplied value. fun <O> just(o: O): Stream<O> |
never | Creates a Stream that never emits any values. fun <A> never(): Stream<A> |
raiseError | Creates a Stream that, when run, fails with the supplied exception err. fun <O> raiseError(err: Throwable): Stream<O> |
random | Creates a random stream of integers using the supplied seed. fun random(seed: Int): Stream<Int> |
range | Lazily produces the range [start, stopExclusive). If you want to produce the sequence in one chunk, instead of lazily, use emits(start until stopExclusive). fun range(range: IntProgression): Stream<Int> fun range(range: LongProgression): Stream<Long> fun range(range: CharProgression): Stream<Char> |
resource | Converts the supplied resource into a singleton stream. Gives the same guarantees as using bracketCase. fun <O> resource(r: Resource<O>): Stream<O> |
sleep | A single-element Stream that waits for the duration d before emitting unit. fun sleep(d: Duration): Stream<Unit> |
sleep_ | Alias for sleep(d).void(). Often used in conjunction with append (i.e., sleep_(..).append { s }) as a more performant version of sleep(..).flatMap { s }. fun sleep_(d: Duration): Stream<Nothing> |
supervise | Starts the supplied task and cancels it as finalization of the returned stream. fun <A> supervise(ctx: CoroutineContext = Dispatchers.Default, fa: suspend () -> A): Stream<Fiber<A>> |
unfold | Creates a stream by successively applying f until a null is returned, emitting each output O and using each output S as input to the next invocation of f. fun <S, O> unfold(s: S, f: (S) -> Pair<O, S>?): Stream<O> |
unfoldChunk | Like unfold, but each invocation of f provides a chunk of output. fun <S, O> unfoldChunk(s: S, f: (S) -> Pair<Chunk<O>, S>?): Stream<O> |
unfoldChunkEffect | Like unfoldChunk, but takes an effectful function. fun <S, O> unfoldChunkEffect(s: S, f: suspend (S) -> Pair<Chunk<O>, S>?): Stream<O> |
unfoldEffect | Like unfold, but takes a suspending function. fun <S, O> unfoldEffect(s: S, f: suspend (S) -> Pair<O, S>?): Stream<O> |
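The unfold family above shares one state-threading contract: f receives the current state and returns either a pair of (output, next state) or null to stop. A minimal sketch of that contract using the standard library's Sequence as a stand-in might look as follows. The `unfoldModel` and `unfoldChunkModel` names are hypothetical, a List stands in for Chunk, and none of this is Arrow's actual implementation.

```kotlin
// Stand-in model on kotlin.sequences.Sequence; NOT Arrow's Stream implementation.
// unfoldModel and unfoldChunkModel are hypothetical names for this illustration.

// Applies f to the state until it returns null, yielding each output O
// and feeding each new state S into the next call.
fun <S, O> unfoldModel(s: S, f: (S) -> Pair<O, S>?): Sequence<O> = sequence {
    var state = s
    while (true) {
        val (out, next) = f(state) ?: break
        yield(out)
        state = next
    }
}

// Like unfoldModel, but every step contributes a whole batch of outputs
// (a List is used here in place of Arrow's Chunk).
fun <S, O> unfoldChunkModel(s: S, f: (S) -> Pair<List<O>, S>?): Sequence<O> = sequence {
    var state = s
    while (true) {
        val (outs, next) = f(state) ?: break
        yieldAll(outs)
        state = next
    }
}

fun main() {
    // Count down from 3; returning null ends the sequence.
    val countdown = unfoldModel(3) { n -> if (n > 0) n to n - 1 else null }
    println(countdown.toList()) // [3, 2, 1]

    // Each step emits two values and advances the state by two.
    val batches = unfoldChunkModel(0) { n -> if (n < 6) listOf(n, n + 1) to n + 2 else null }
    println(batches.toList()) // [0, 1, 2, 3, 4, 5]

    // iterate(initial, f) corresponds to initial, f(initial), f(f(initial)), ...
    println(generateSequence(1) { it * 2 }.take(5).toList()) // [1, 2, 4, 8, 16]
}
```

The sequence is lazy, so f is only called as elements are demanded, which mirrors the pull-based description of the stream constructors.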
append | Lazily appends s2 to the end of this stream. fun <O> Stream<O>.append(s2: () -> Stream<O>): Stream<O> |
asResource | Opens DSL to consume Stream as a Resource. fun <O> Stream<O>.asResource(): ResourceOps<O> |
cons | Prepends a Chunk onto the front of this stream. fun <O> Stream<O>.cons(c: Chunk<O>): Stream<O> |
cons1 | Prepends a value onto the front of this stream. fun <O> Stream<O>.cons1(o: O): Stream<O> |
drain | Runs all the effects of this Stream and ignores all emitted values. suspend fun <O> Stream<O>.drain(): Unit |
filterNotNull | Filters out any null values. fun <O : Any> Stream<O?>.filterNotNull(): Stream<O> |
filterOption | Filters out any arrow.core.None. fun <O> Stream<Option<O>>.filterOption(): Stream<O> |
firstOrError | Runs the first effect of this Stream, raising a NoSuchElementException if the stream emitted no values, and returning the value if one was emitted. suspend fun <O> Stream<O>.firstOrError(): O |
firstOrNull | Runs the first effect of this Stream, returning null if the stream emitted no values, and returning the value if one was emitted. suspend fun <O> Stream<O>.firstOrNull(): O? |
flatten | Flattens a stream of streams into a single stream by concatenating each stream. fun <O> Stream<Stream<O>>.flatten(): Stream<O> |
fold1 | Folds all inputs using the supplied operator f, and emits a single-element stream, or the empty stream if the input is empty, or the never stream if the input is non-terminating. fun <O> Stream<O>.fold1(f: (O, O) -> O): Stream<O> |
foldChunks | Folds all the effects of this stream into a value by folding the output chunks together, starting with the provided init and combining the current value with each output chunk using f. suspend fun <O, B> Stream<O>.foldChunks(init: B, f: (B, Chunk<O>) -> B): B |
foldMonoid | Folds this stream with the monoid for O. fun <O> Stream<O>.foldMonoid(MO: Monoid<O>): Stream<O> |
handleErrorWith | If this Stream terminates with Stream.raiseError, invoke h and continue with the result. fun <O> Stream<O>.handleErrorWith(h: (Throwable) -> Stream<O>): Stream<O> |
interleave | Deterministically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally. fun <O> Stream<O>.interleave(that: Stream<O>): Stream<O> |
interleaveAll | Deterministically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally. fun <O> Stream<O>.interleaveAll(that: Stream<O>): Stream<O> |
intersperse | Emits the specified separator between every pair of elements in the source stream. fun <O> Stream<O>.intersperse(separator: O): Stream<O> |
lastOrError | Runs all the effects of this Stream, raising a NoSuchElementException if the stream emitted no values, and returning the last value emitted otherwise. suspend fun <O> Stream<O>.lastOrError(): O |
lastOrNull | Runs all the effects of this Stream, returning null if the stream emitted no values, and returning the last value emitted otherwise. suspend fun <O> Stream<O>.lastOrNull(): O? |
noneTerminate | Wraps the inner values in Option as Some and adds a None at the end of the Stream to signal completion. Note that this doesn’t actually limit an infinite stream. fun <O> Stream<O>.noneTerminate(): Stream<Option<O>> |
onComplete | Runs s2 after this, regardless of errors during this, then reraises any errors encountered during this. fun <O> Stream<O>.onComplete(s2: () -> Stream<O>): Stream<O> |
parJoin | Non-deterministically merges a stream of streams (outer) into a single stream, opening at most maxOpen streams at any point in time. fun <O> Stream<Stream<O>>.~~parJoin~~(maxOpen: Int, ctx: CoroutineContext = Dispatchers.Default): Stream<O> |
parJoinUnbounded | Like parJoin, but races all inner streams simultaneously without limit. fun <O> Stream<Stream<O>>.parJoinUnbounded(ctx: CoroutineContext = Dispatchers.Default): Stream<O> |
reduce | fun <O> Stream<O>.reduce(f: (O, O) -> O): Stream<O> |
reduceSemigroup | Reduces this stream with the Semigroup for O. fun <O> Stream<O>.reduceSemigroup(S: Semigroup<O>): Stream<O> |
repartition | Repartitions the input with the function f. On each step f is applied to the input and all elements but the last of the resulting sequence are emitted. The last element is then appended to the next input using the Semigroup S. fun <O> Stream<O>.repartition(S: Semigroup<O>, f: (O) -> Chunk<O>): Stream<O> |
scan | Left fold which outputs all intermediate results. fun <O, O2> Stream<O>.scan(init: O2, f: (O2, O) -> O2): Stream<O2> |
scan1 | Like scan, but uses the first element of the stream as the seed. fun <O> Stream<O>.scan1(f: (O, O) -> O): Stream<O> |
scanChunks | Like scan, but f is applied to each chunk of the source stream. The resulting chunk is emitted and the resulting state is used in the next invocation of f. fun <O, S, O2> Stream<O>.scanChunks(init: S, f: (S, Chunk<O>) -> Pair<S, Chunk<O2>>): Stream<O2> |
scanChunksOpt | More general version of scanChunks where the current state (i.e., S) can be inspected to determine if another chunk should be pulled or if the stream should terminate. Termination is signaled by returning null from f. Otherwise, f returns a function which consumes the next chunk. fun <O, S, O2> Stream<O>.scanChunksOpt(init: S, f: (S) -> ((Chunk<O>) -> Pair<S, Chunk<O2>>)?): Stream<O2> |
scanMap | Alias for map(f).scanMonoid. fun <O, O2> Stream<O>.scanMap(MO2: Monoid<O2>, f: (O) -> O2): Stream<O2> |
scanMonoid | Folds this stream with the monoid for O while emitting all intermediate results. fun <O> Stream<O>.scanMonoid(MO: Monoid<O>): Stream<O> |
terminateOn | Halts the input stream when the condition is true. fun <O> Stream<O>.terminateOn(terminator: (O) -> Boolean): Stream<O> |
terminateOnNone | Halts the input stream at the first arrow.core.None. fun <O> Stream<Option<O>>.terminateOnNone(): Stream<O> |
terminateOnNull | Halts the input stream at the first null. fun <O> Stream<O>.terminateOnNull(): Stream<O> |
toList | Runs all the effects of this Stream and collects all emitted values into a List. If the Stream doesn’t emit any values it returns emptyList. suspend fun <O> Stream<O>.toList(): List<O> |
toSet | Runs all the effects of this Stream and collects all emitted values into a Set. If the Stream doesn’t emit any values it returns emptySet. suspend fun <O> Stream<O>.toSet(): Set<O> |
zipAll | Deterministically zips elements, terminating when the ends of both branches are reached naturally, padding the left branch with pad1 and padding the right branch with pad2 as necessary. fun <O, B> Stream<O>.zipAll(that: Stream<B>, pad1: O, pad2: B): Stream<Pair<O, B>> |
zipAllWith | Deterministically zips elements with the specified function, terminating when the ends of both branches are reached naturally, padding the left branch with pad1 and padding the right branch with pad2 as necessary. fun <A, B, C> Stream<A>.zipAllWith(that: Stream<B>, pad1: A, pad2: B, f: (A, B) -> C): Stream<C> |
callback | Creates a Stream from the given suspended block callback, allowing it to emit values, set cancel effects, and end the emission. fun <A> Stream.Companion.callback(f: suspend EmitterSyntax<A>.() -> Unit): Stream<A> |
cancellable | Creates a cancellable Stream from the given suspended block that will evaluate the passed CancelToken if cancelled. fun <A> Stream.Companion.cancellable(f: suspend EmitterSyntax<A>.() -> CancelToken): Stream<A> |
monoid | Monoid instance for Stream. fun <O> Stream.Companion.monoid(): Monoid<Stream<O>> |
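The termination and padding rules of intersperse, interleave, interleaveAll and zipAll in the extension functions above can be modelled on plain Kotlin lists. The sketch below is only an illustration of those descriptions, not Arrow's Stream implementation, and every `*Model` name is hypothetical.

```kotlin
// Stand-in model on kotlin.collections.List; NOT Arrow's Stream implementation.
// All *Model names are hypothetical and exist only for this illustration.

// Places separator between every pair of neighbouring elements.
fun <O> List<O>.intersperseModel(separator: O): List<O> =
    flatMapIndexed { i, o -> if (i == 0) listOf(o) else listOf(separator, o) }

// Alternates left and right elements, stopping when either side runs out.
fun <O> List<O>.interleaveModel(that: List<O>): List<O> =
    zip(that).flatMap { (l, r) -> listOf(l, r) }

// Alternates left and right elements, then emits whatever remains of the longer side.
fun <O> List<O>.interleaveAllModel(that: List<O>): List<O> =
    interleaveModel(that) + drop(that.size) + that.drop(size)

// Zips to the length of the longer side, padding the left side with pad1
// and the right side with pad2.
fun <O, B> List<O>.zipAllModel(that: List<B>, pad1: O, pad2: B): List<Pair<O, B>> =
    List(maxOf(size, that.size)) { i ->
        (if (i < size) this[i] else pad1) to (if (i < that.size) that[i] else pad2)
    }

fun main() {
    println(listOf(1, 2, 3).intersperseModel(0))                // [1, 0, 2, 0, 3]
    println(listOf(1, 2, 3).interleaveModel(listOf(10, 20)))    // [1, 10, 2, 20]
    println(listOf(1, 2, 3).interleaveAllModel(listOf(10, 20))) // [1, 10, 2, 20, 3]
    println(listOf(1, 2, 3).zipAllModel(listOf("a"), 0, "-"))   // [(1, a), (2, -), (3, -)]
}
```

The only difference between the "either branch" and "both branches" variants is what happens to the leftover tail of the longer input: it is dropped in the first case and emitted (or padded) in the second.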