Skip to content

Commit

Permalink
3.x: Fix Schedulers.from to honor interruptibleWorker across methods (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Mar 5, 2021
1 parent 2591862 commit 26dddf5
Show file tree
Hide file tree
Showing 14 changed files with 499 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@ abstract class AbstractDirectTask

protected final Runnable runnable;

protected final boolean interruptOnCancel;

protected Thread runner;

protected static final FutureTask<Void> FINISHED = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);

protected static final FutureTask<Void> DISPOSED = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);

AbstractDirectTask(Runnable runnable) {
AbstractDirectTask(Runnable runnable, boolean interruptOnCancel) {
this.runnable = runnable;
this.interruptOnCancel = interruptOnCancel;
}

@Override
Expand All @@ -48,7 +51,7 @@ public final void dispose() {
if (f != FINISHED && f != DISPOSED) {
if (compareAndSet(f, DISPOSED)) {
if (f != null) {
f.cancel(runner != Thread.currentThread());
cancelFuture(f);
}
}
}
Expand All @@ -67,7 +70,7 @@ public final void setFuture(Future<?> future) {
break;
}
if (f == DISPOSED) {
future.cancel(runner != Thread.currentThread());
cancelFuture(future);
break;
}
if (compareAndSet(f, future)) {
Expand All @@ -76,6 +79,14 @@ public final void setFuture(Future<?> future) {
}
}

private void cancelFuture(Future<?> future) {
if (runner == Thread.currentThread()) {
future.cancel(false);
} else {
future.cancel(interruptOnCancel);
}
}

@Override
public Runnable getWrappedRunnable() {
return runnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Disposable scheduleDirect(@NonNull Runnable run) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
if (executor instanceof ExecutorService) {
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun);
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun, interruptibleWorker);
Future<?> f = ((ExecutorService)executor).submit(task);
task.setFuture(f);
return task;
Expand All @@ -85,7 +85,7 @@ public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (executor instanceof ScheduledExecutorService) {
try {
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun);
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun, interruptibleWorker);
Future<?> f = ((ScheduledExecutorService)executor).schedule(task, delay, unit);
task.setFuture(f);
return task;
Expand All @@ -110,7 +110,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
if (executor instanceof ScheduledExecutorService) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun, interruptibleWorker);
Future<?> f = ((ScheduledExecutorService)executor).scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
return task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonN
* @return the ScheduledRunnable instance
*/
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run), true);
try {
Future<?> f;
if (delayTime <= 0L) {
Expand Down Expand Up @@ -104,7 +104,7 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo

return periodicWrapper;
}
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun, true);
try {
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public final class ScheduledDirectPeriodicTask extends AbstractDirectTask implem

private static final long serialVersionUID = 1811839108042568751L;

public ScheduledDirectPeriodicTask(Runnable runnable) {
super(runnable);
public ScheduledDirectPeriodicTask(Runnable runnable, boolean interruptOnCancel) {
super(runnable, interruptOnCancel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public final class ScheduledDirectTask extends AbstractDirectTask implements Cal

private static final long serialVersionUID = 1811839108042568751L;

public ScheduledDirectTask(Runnable runnable) {
super(runnable);
public ScheduledDirectTask(Runnable runnable, boolean interruptOnCancel) {
super(runnable, interruptOnCancel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Worker createWorker() {
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run), true);
try {
Future<?> f;
if (delay <= 0L) {
Expand Down Expand Up @@ -146,7 +146,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial

return periodicWrapper;
}
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun, true);
try {
Future<?> f = executor.get().scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void dispose() {
public void timerInterruptible() throws Exception {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) {
final AtomicBoolean interrupted = new AtomicBoolean();
TestObserver<Void> to = Completable.timer(1, TimeUnit.MILLISECONDS, s)
.doOnComplete(new Action() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void timerDelayZero() {
public void timerInterruptible() throws Exception {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) {
final AtomicBoolean interrupted = new AtomicBoolean();
TestSubscriber<Long> ts = Flowable.timer(1, TimeUnit.MILLISECONDS, s)
.map(new Function<Long, Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void dispose() {
public void timerInterruptible() throws Exception {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) {
final AtomicBoolean interrupted = new AtomicBoolean();
TestObserver<Long> to = Maybe.timer(1, TimeUnit.MILLISECONDS, s)
.map(new Function<Long, Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public void timerDelayZero() {
public void timerInterruptible() throws Exception {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) {
final AtomicBoolean interrupted = new AtomicBoolean();
TestObserver<Long> to = Observable.timer(1, TimeUnit.MILLISECONDS, s)
.map(new Function<Long, Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void disposed() {
public void timerInterruptible() throws Exception {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) {
final AtomicBoolean interrupted = new AtomicBoolean();
TestObserver<Long> to = Single.timer(1, TimeUnit.MILLISECONDS, s)
.map(new Function<Long, Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class AbstractDirectTaskTest extends RxJavaTest {

@Test
public void cancelSetFuture() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -58,7 +58,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Test
public void cancelSetFutureCurrentThread() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -91,7 +91,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Test
public void setFutureCancel() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -119,7 +119,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Test
public void setFutureCancelSameThread() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -148,7 +148,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Test
public void finished() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -177,7 +177,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Test
public void finishedCancel() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -211,7 +211,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
@Test
public void disposeSetFutureRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
final AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
final AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};

Expand Down Expand Up @@ -246,7 +246,7 @@ static class TestDirectTask extends AbstractDirectTask {
private static final long serialVersionUID = 587679821055711738L;

TestDirectTask() {
super(Functions.EMPTY_RUNNABLE);
super(Functions.EMPTY_RUNNABLE, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void runnableThrows() {
public void run() {
throw new TestException();
}
});
}, true);

try {
task.run();
Expand Down
Loading

0 comments on commit 26dddf5

Please sign in to comment.