Skip to content

Commit 8429dc7

Browse files
committedNov 28, 2016
2.x: fix firstOrError back conversions not signalling NSE
1 parent b238cc8 commit 8429dc7

File tree

10 files changed

+154
-14
lines changed

10 files changed

+154
-14
lines changed
 

‎src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java

+15-4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import java.util.NoSuchElementException;
17+
1618
import org.reactivestreams.*;
1719

1820
import io.reactivex.internal.subscriptions.*;
@@ -21,15 +23,18 @@
2123
public final class FlowableElementAt<T> extends AbstractFlowableWithUpstream<T, T> {
2224
final long index;
2325
final T defaultValue;
24-
public FlowableElementAt(Publisher<T> source, long index, T defaultValue) {
26+
final boolean errorOnFewer;
27+
28+
public FlowableElementAt(Publisher<T> source, long index, T defaultValue, boolean errorOnFewer) {
2529
super(source);
2630
this.index = index;
2731
this.defaultValue = defaultValue;
32+
this.errorOnFewer = errorOnFewer;
2833
}
2934

3035
@Override
3136
protected void subscribeActual(Subscriber<? super T> s) {
32-
source.subscribe(new ElementAtSubscriber<T>(s, index, defaultValue));
37+
source.subscribe(new ElementAtSubscriber<T>(s, index, defaultValue, errorOnFewer));
3338
}
3439

3540
static final class ElementAtSubscriber<T> extends DeferredScalarSubscription<T> implements Subscriber<T> {
@@ -38,17 +43,19 @@ static final class ElementAtSubscriber<T> extends DeferredScalarSubscription<T>
3843

3944
final long index;
4045
final T defaultValue;
46+
final boolean errorOnFewer;
4147

4248
Subscription s;
4349

4450
long count;
4551

4652
boolean done;
4753

48-
ElementAtSubscriber(Subscriber<? super T> actual, long index, T defaultValue) {
54+
ElementAtSubscriber(Subscriber<? super T> actual, long index, T defaultValue, boolean errorOnFewer) {
4955
super(actual);
5056
this.index = index;
5157
this.defaultValue = defaultValue;
58+
this.errorOnFewer = errorOnFewer;
5259
}
5360

5461
@Override
@@ -91,7 +98,11 @@ public void onComplete() {
9198
done = true;
9299
T v = defaultValue;
93100
if (v == null) {
94-
actual.onComplete();
101+
if (errorOnFewer) {
102+
actual.onError(new NoSuchElementException());
103+
} else {
104+
actual.onComplete();
105+
}
95106
} else {
96107
complete(v);
97108
}

‎src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ protected void subscribeActual(MaybeObserver<? super T> s) {
3838

3939
@Override
4040
public Flowable<T> fuseToFlowable() {
41-
return RxJavaPlugins.onAssembly(new FlowableElementAt<T>(source, index, null));
41+
return RxJavaPlugins.onAssembly(new FlowableElementAt<T>(source, index, null, false));
4242
}
4343

4444
static final class ElementAtSubscriber<T> implements Subscriber<T>, Disposable {

‎src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ protected void subscribeActual(SingleObserver<? super T> s) {
4242

4343
@Override
4444
public Flowable<T> fuseToFlowable() {
45-
return RxJavaPlugins.onAssembly(new FlowableElementAt<T>(source, index, defaultValue));
45+
return RxJavaPlugins.onAssembly(new FlowableElementAt<T>(source, index, defaultValue, true));
4646
}
4747

4848
static final class ElementAtSubscriber<T> implements Subscriber<T>, Disposable {

‎src/main/java/io/reactivex/internal/operators/observable/ObservableElementAt.java

+17-6
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16+
import java.util.NoSuchElementException;
17+
1618
import io.reactivex.*;
1719
import io.reactivex.disposables.Disposable;
1820
import io.reactivex.internal.disposables.DisposableHelper;
@@ -21,31 +23,36 @@
2123
public final class ObservableElementAt<T> extends AbstractObservableWithUpstream<T, T> {
2224
final long index;
2325
final T defaultValue;
24-
public ObservableElementAt(ObservableSource<T> source, long index, T defaultValue) {
26+
final boolean errorOnFewer;
27+
28+
public ObservableElementAt(ObservableSource<T> source, long index, T defaultValue, boolean errorOnFewer) {
2529
super(source);
2630
this.index = index;
2731
this.defaultValue = defaultValue;
32+
this.errorOnFewer = errorOnFewer;
2833
}
2934
@Override
3035
public void subscribeActual(Observer<? super T> t) {
31-
source.subscribe(new ElementAtObserver<T>(t, index, defaultValue));
36+
source.subscribe(new ElementAtObserver<T>(t, index, defaultValue, errorOnFewer));
3237
}
3338

3439
static final class ElementAtObserver<T> implements Observer<T>, Disposable {
3540
final Observer<? super T> actual;
3641
final long index;
3742
final T defaultValue;
43+
final boolean errorOnFewer;
3844

3945
Disposable s;
4046

4147
long count;
4248

4349
boolean done;
4450

45-
ElementAtObserver(Observer<? super T> actual, long index, T defaultValue) {
51+
ElementAtObserver(Observer<? super T> actual, long index, T defaultValue, boolean errorOnFewer) {
4652
this.actual = actual;
4753
this.index = index;
4854
this.defaultValue = defaultValue;
55+
this.errorOnFewer = errorOnFewer;
4956
}
5057

5158
@Override
@@ -99,10 +106,14 @@ public void onComplete() {
99106
if (!done) {
100107
done = true;
101108
T v = defaultValue;
102-
if (v != null) {
103-
actual.onNext(v);
109+
if (v == null && errorOnFewer) {
110+
actual.onError(new NoSuchElementException());
111+
} else {
112+
if (v != null) {
113+
actual.onNext(v);
114+
}
115+
actual.onComplete();
104116
}
105-
actual.onComplete();
106117
}
107118
}
108119
}

‎src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtMaybe.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public void subscribeActual(MaybeObserver<? super T> t) {
3333

3434
@Override
3535
public Observable<T> fuseToObservable() {
36-
return RxJavaPlugins.onAssembly(new ObservableElementAt<T>(source, index, null));
36+
return RxJavaPlugins.onAssembly(new ObservableElementAt<T>(source, index, null, false));
3737
}
3838

3939
static final class ElementAtObserver<T> implements Observer<T>, Disposable {

‎src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void subscribeActual(SingleObserver<? super T> t) {
3939

4040
@Override
4141
public Observable<T> fuseToObservable() {
42-
return RxJavaPlugins.onAssembly(new ObservableElementAt<T>(source, index, defaultValue));
42+
return RxJavaPlugins.onAssembly(new ObservableElementAt<T>(source, index, defaultValue, true));
4343
}
4444

4545
static final class ElementAtObserver<T> implements Observer<T>, Disposable {

‎src/test/java/io/reactivex/internal/operators/flowable/FlowableFirstTest.java

+41
Original file line numberDiff line numberDiff line change
@@ -544,4 +544,45 @@ public void firstOrErrorError() {
544544
.assertErrorMessage("error")
545545
.assertError(RuntimeException.class);
546546
}
547+
548+
@Test
549+
public void firstOrErrorNoElementFlowable() {
550+
Flowable.empty()
551+
.firstOrError()
552+
.toFlowable()
553+
.test()
554+
.assertNoValues()
555+
.assertError(NoSuchElementException.class);
556+
}
557+
558+
@Test
559+
public void firstOrErrorOneElementFlowable() {
560+
Flowable.just(1)
561+
.firstOrError()
562+
.toFlowable()
563+
.test()
564+
.assertNoErrors()
565+
.assertValue(1);
566+
}
567+
568+
@Test
569+
public void firstOrErrorMultipleElementsFlowable() {
570+
Flowable.just(1, 2, 3)
571+
.firstOrError()
572+
.toFlowable()
573+
.test()
574+
.assertNoErrors()
575+
.assertValue(1);
576+
}
577+
578+
@Test
579+
public void firstOrErrorErrorFlowable() {
580+
Flowable.error(new RuntimeException("error"))
581+
.firstOrError()
582+
.toFlowable()
583+
.test()
584+
.assertNoValues()
585+
.assertErrorMessage("error")
586+
.assertError(RuntimeException.class);
587+
}
547588
}

‎src/test/java/io/reactivex/internal/operators/flowable/FlowableLastTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -357,4 +357,22 @@ public void error() {
357357
.test()
358358
.assertFailure(TestException.class);
359359
}
360+
361+
@Test
362+
public void errorLastOrErrorFlowable() {
363+
Flowable.error(new TestException())
364+
.lastOrError()
365+
.toFlowable()
366+
.test()
367+
.assertFailure(TestException.class);
368+
}
369+
370+
@Test
371+
public void emptyLastOrErrorFlowable() {
372+
Flowable.empty()
373+
.lastOrError()
374+
.toFlowable()
375+
.test()
376+
.assertFailure(NoSuchElementException.class);
377+
}
360378
}

‎src/test/java/io/reactivex/internal/operators/observable/ObservableFirstTest.java

+41
Original file line numberDiff line numberDiff line change
@@ -541,4 +541,45 @@ public void firstOrErrorError() {
541541
.assertErrorMessage("error")
542542
.assertError(RuntimeException.class);
543543
}
544+
545+
@Test
546+
public void firstOrErrorNoElementObservable() {
547+
Observable.empty()
548+
.firstOrError()
549+
.toObservable()
550+
.test()
551+
.assertNoValues()
552+
.assertError(NoSuchElementException.class);
553+
}
554+
555+
@Test
556+
public void firstOrErrorOneElementObservable() {
557+
Observable.just(1)
558+
.firstOrError()
559+
.toObservable()
560+
.test()
561+
.assertNoErrors()
562+
.assertValue(1);
563+
}
564+
565+
@Test
566+
public void firstOrErrorMultipleElementsObservable() {
567+
Observable.just(1, 2, 3)
568+
.firstOrError()
569+
.toObservable()
570+
.test()
571+
.assertNoErrors()
572+
.assertValue(1);
573+
}
574+
575+
@Test
576+
public void firstOrErrorErrorObservable() {
577+
Observable.error(new RuntimeException("error"))
578+
.firstOrError()
579+
.toObservable()
580+
.test()
581+
.assertNoValues()
582+
.assertErrorMessage("error")
583+
.assertError(RuntimeException.class);
584+
}
544585
}

‎src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -358,4 +358,22 @@ public void error() {
358358
.test()
359359
.assertFailure(TestException.class);
360360
}
361+
362+
@Test
363+
public void errorLastOrErrorObservable() {
364+
Observable.error(new TestException())
365+
.lastOrError()
366+
.toObservable()
367+
.test()
368+
.assertFailure(TestException.class);
369+
}
370+
371+
@Test
372+
public void emptyLastOrErrorObservable() {
373+
Observable.empty()
374+
.lastOrError()
375+
.toObservable()
376+
.test()
377+
.assertFailure(NoSuchElementException.class);
378+
}
361379
}

0 commit comments

Comments
 (0)