Skip to content

Commit f294938

Browse files
authored
2.x: dedicated reduce() op implementations (ReactiveX#4885)
* 2.x: dedicated reduce() op implementations * Fix unnecessary duplicate assignment, remove trailing spaces * Check for more terminal state
1 parent baa00f7 commit f294938

12 files changed

+616
-7
lines changed

build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ dependencies {
2929
testCompile 'junit:junit:4.12'
3030
testCompile 'org.mockito:mockito-core:2.1.0'
3131

32-
perfCompile 'org.openjdk.jmh:jmh-core:1.13'
33-
perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.13'
32+
perfCompile 'org.openjdk.jmh:jmh-core:1.16'
33+
perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.16'
3434

3535
testCompile 'org.reactivestreams:reactive-streams-tck:1.0.0'
3636
testCompile group: 'org.testng', name: 'testng', version: '6.9.10'

src/main/java/io/reactivex/Flowable.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -10516,7 +10516,9 @@ public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {
1051610516
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1051710517
@SchedulerSupport(SchedulerSupport.NONE)
1051810518
public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer) {
10519-
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<R>(scan(seed, reducer).takeLast(1), null)); // TODO dedicated operator
10519+
ObjectHelper.requireNonNull(seed, "seed is null");
10520+
ObjectHelper.requireNonNull(reducer, "reducer is null");
10521+
return RxJavaPlugins.onAssembly(new FlowableReduceSeedSingle<T, R>(this, seed, reducer));
1052010522
}
1052110523

1052210524
/**
@@ -10567,7 +10569,9 @@ public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer) {
1056710569
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1056810570
@SchedulerSupport(SchedulerSupport.NONE)
1056910571
public final <R> Single<R> reduceWith(Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
10570-
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<R>(scanWith(seedSupplier, reducer).takeLast(1), null)); // TODO dedicated operator
10572+
ObjectHelper.requireNonNull(seedSupplier, "seedSupplier is null");
10573+
ObjectHelper.requireNonNull(reducer, "reducer is null");
10574+
return RxJavaPlugins.onAssembly(new FlowableReduceWithSingle<T, R>(this, seedSupplier, reducer));
1057110575
}
1057210576

1057310577
/**

src/main/java/io/reactivex/Observable.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -8685,7 +8685,8 @@ public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends
86858685
@CheckReturnValue
86868686
@SchedulerSupport(SchedulerSupport.NONE)
86878687
public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {
8688-
return scan(reducer).takeLast(1).singleElement();
8688+
ObjectHelper.requireNonNull(reducer, "reducer is null");
8689+
return RxJavaPlugins.onAssembly(new ObservableReduceMaybe<T>(this, reducer));
86898690
}
86908691

86918692
/**
@@ -8732,7 +8733,9 @@ public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {
87328733
@CheckReturnValue
87338734
@SchedulerSupport(SchedulerSupport.NONE)
87348735
public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer) {
8735-
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<R>(scan(seed, reducer).takeLast(1), null));
8736+
ObjectHelper.requireNonNull(seed, "seed is null");
8737+
ObjectHelper.requireNonNull(reducer, "reducer is null");
8738+
return RxJavaPlugins.onAssembly(new ObservableReduceSeedSingle<T, R>(this, seed, reducer));
87368739
}
87378740

87388741
/**
@@ -8779,7 +8782,9 @@ public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer) {
87798782
@CheckReturnValue
87808783
@SchedulerSupport(SchedulerSupport.NONE)
87818784
public final <R> Single<R> reduceWith(Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
8782-
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<R>(scanWith(seedSupplier, reducer).takeLast(1), null));
8785+
ObjectHelper.requireNonNull(seedSupplier, "seedSupplier is null");
8786+
ObjectHelper.requireNonNull(reducer, "reducer is null");
8787+
return RxJavaPlugins.onAssembly(new ObservableReduceWithSingle<T, R>(this, seedSupplier, reducer));
87838788
}
87848789

87858790
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import org.reactivestreams.*;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.functions.BiFunction;
22+
import io.reactivex.internal.functions.ObjectHelper;
23+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
24+
import io.reactivex.plugins.RxJavaPlugins;
25+
26+
/**
27+
* Reduce a sequence of values, starting from a seed value and by using
28+
* an accumulator function and return the last accumulated value.
29+
*
30+
* @param <T> the source value type
31+
* @param <R> the accumulated result type
32+
*/
33+
public final class FlowableReduceSeedSingle<T, R> extends Single<R> {
34+
35+
final Publisher<T> source;
36+
37+
final R seed;
38+
39+
final BiFunction<R, ? super T, R> reducer;
40+
41+
public FlowableReduceSeedSingle(Publisher<T> source, R seed, BiFunction<R, ? super T, R> reducer) {
42+
this.source = source;
43+
this.seed = seed;
44+
this.reducer = reducer;
45+
}
46+
47+
@Override
48+
protected void subscribeActual(SingleObserver<? super R> observer) {
49+
source.subscribe(new ReduceSeedObserver<T, R>(observer, reducer, seed));
50+
}
51+
52+
static final class ReduceSeedObserver<T, R> implements Subscriber<T>, Disposable {
53+
54+
final SingleObserver<? super R> actual;
55+
56+
final BiFunction<R, ? super T, R> reducer;
57+
58+
R value;
59+
60+
Subscription s;
61+
62+
public ReduceSeedObserver(SingleObserver<? super R> actual, BiFunction<R, ? super T, R> reducer, R value) {
63+
this.actual = actual;
64+
this.value = value;
65+
this.reducer = reducer;
66+
}
67+
68+
@Override
69+
public void onSubscribe(Subscription s) {
70+
if (SubscriptionHelper.validate(this.s, s)) {
71+
this.s = s;
72+
73+
actual.onSubscribe(this);
74+
75+
s.request(Long.MAX_VALUE);
76+
}
77+
}
78+
79+
@Override
80+
public void onNext(T value) {
81+
R v = this.value;
82+
if (v != null) {
83+
try {
84+
this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value");
85+
} catch (Throwable ex) {
86+
Exceptions.throwIfFatal(ex);
87+
s.cancel();
88+
onError(ex);
89+
}
90+
}
91+
}
92+
93+
@Override
94+
public void onError(Throwable e) {
95+
R v = value;
96+
value = null;
97+
if (v != null) {
98+
s = SubscriptionHelper.CANCELLED;
99+
actual.onError(e);
100+
} else {
101+
RxJavaPlugins.onError(e);
102+
}
103+
}
104+
105+
@Override
106+
public void onComplete() {
107+
R v = value;
108+
value = null;
109+
if (v != null) {
110+
s = SubscriptionHelper.CANCELLED;
111+
actual.onSuccess(v);
112+
}
113+
}
114+
115+
@Override
116+
public void dispose() {
117+
s.cancel();
118+
s = SubscriptionHelper.CANCELLED;
119+
}
120+
121+
@Override
122+
public boolean isDisposed() {
123+
return s == SubscriptionHelper.CANCELLED;
124+
}
125+
}
126+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.concurrent.Callable;
17+
18+
import org.reactivestreams.Publisher;
19+
20+
import io.reactivex.*;
21+
import io.reactivex.exceptions.Exceptions;
22+
import io.reactivex.functions.BiFunction;
23+
import io.reactivex.internal.disposables.EmptyDisposable;
24+
import io.reactivex.internal.functions.ObjectHelper;
25+
import io.reactivex.internal.operators.flowable.FlowableReduceSeedSingle.ReduceSeedObserver;
26+
27+
/**
28+
* Reduce a sequence of values, starting from a generated seed value and by using
29+
* an accumulator function and return the last accumulated value.
30+
*
31+
* @param <T> the source value type
32+
* @param <R> the accumulated result type
33+
*/
34+
public final class FlowableReduceWithSingle<T, R> extends Single<R> {
35+
36+
final Publisher<T> source;
37+
38+
final Callable<R> seedSupplier;
39+
40+
final BiFunction<R, ? super T, R> reducer;
41+
42+
public FlowableReduceWithSingle(Publisher<T> source, Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
43+
this.source = source;
44+
this.seedSupplier = seedSupplier;
45+
this.reducer = reducer;
46+
}
47+
48+
@Override
49+
protected void subscribeActual(SingleObserver<? super R> observer) {
50+
R seed;
51+
52+
try {
53+
seed = ObjectHelper.requireNonNull(seedSupplier.call(), "The seedSupplier returned a null value");
54+
} catch (Throwable ex) {
55+
Exceptions.throwIfFatal(ex);
56+
EmptyDisposable.error(ex, observer);
57+
return;
58+
}
59+
source.subscribe(new ReduceSeedObserver<T, R>(observer, reducer, seed));
60+
}
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.observable;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.exceptions.Exceptions;
19+
import io.reactivex.functions.BiFunction;
20+
import io.reactivex.internal.disposables.DisposableHelper;
21+
import io.reactivex.internal.functions.ObjectHelper;
22+
import io.reactivex.plugins.RxJavaPlugins;
23+
24+
/**
25+
* Reduce a sequence of values into a single value via an aggregator function and emit the final value or complete
26+
* if the source is empty.
27+
*
28+
* @param <T> the source and result value type
29+
*/
30+
public final class ObservableReduceMaybe<T> extends Maybe<T> {
31+
32+
final ObservableSource<T> source;
33+
34+
final BiFunction<T, T, T> reducer;
35+
36+
public ObservableReduceMaybe(ObservableSource<T> source, BiFunction<T, T, T> reducer) {
37+
this.source = source;
38+
this.reducer = reducer;
39+
}
40+
41+
@Override
42+
protected void subscribeActual(MaybeObserver<? super T> observer) {
43+
source.subscribe(new ReduceObserver<T>(observer, reducer));
44+
}
45+
46+
static final class ReduceObserver<T> implements Observer<T>, Disposable {
47+
48+
final MaybeObserver<? super T> actual;
49+
50+
final BiFunction<T, T, T> reducer;
51+
52+
boolean done;
53+
54+
T value;
55+
56+
Disposable d;
57+
58+
public ReduceObserver(MaybeObserver<? super T> observer, BiFunction<T, T, T> reducer) {
59+
this.actual = observer;
60+
this.reducer = reducer;
61+
}
62+
63+
@Override
64+
public void onSubscribe(Disposable d) {
65+
if (DisposableHelper.validate(this.d, d)) {
66+
this.d = d;
67+
68+
actual.onSubscribe(this);
69+
}
70+
}
71+
72+
@Override
73+
public void onNext(T value) {
74+
if (!done) {
75+
T v = this.value;
76+
77+
if (v == null) {
78+
this.value = value;
79+
} else {
80+
try {
81+
this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value");
82+
} catch (Throwable ex) {
83+
Exceptions.throwIfFatal(ex);
84+
d.dispose();
85+
onError(ex);
86+
}
87+
}
88+
}
89+
}
90+
91+
@Override
92+
public void onError(Throwable e) {
93+
if (done) {
94+
RxJavaPlugins.onError(e);
95+
return;
96+
}
97+
done = true;
98+
value = null;
99+
actual.onError(e);
100+
}
101+
102+
@Override
103+
public void onComplete() {
104+
if (done) {
105+
return;
106+
}
107+
done = true;
108+
T v = value;
109+
value = null;
110+
if (v != null) {
111+
actual.onSuccess(v);
112+
} else {
113+
actual.onComplete();
114+
}
115+
}
116+
117+
@Override
118+
public void dispose() {
119+
d.dispose();
120+
}
121+
122+
@Override
123+
public boolean isDisposed() {
124+
return d.isDisposed();
125+
}
126+
}
127+
}

0 commit comments

Comments
 (0)