Skip to content

Add asynchronous version of Flux::scan #4026

@ummels

Description

@ummels

Motivation

We have tried implementing a reactive state machine, i.e. a function that takes a Flux<Input> and returns a Flux<State>. The state transition function is naturally reactive, i.e. it is of type (State, Input) -> Mono<State>.

Our first try has looked like this:

Flux<State> process(Flux<Input> inputs, State initialState, BiFunction<State, Input, Mono<State>> f) {
    return inputs.scan(Mono.just(initialState), (mono, input) -> mono.map(s -> f.apply(s, input)))
        .concatMap(Function::identity)
}

However, this leads to a stack overflow.

Desired solution

Add a method scanAsync to Flux that takes a Supplier<A> and a BiFunction<A, ? super T, Publisher<A>> and returns a Flux<A>.

Considered alternatives

In a similar issue , a solution using AtomicReference has been proposed, but this doesn't feel idiomatic and might have issues with concurrency and resubscription. In another related issue, using Flux.expand has been proposed, but it is not clear whether this would help in our scenario.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions