Skip to content

Commit fd3d42b

Browse files
authored
Merge pull request #4915 from eclipse-vertx/backport-worker-improvements
Backport worker improvements
2 parents aaa87aa + 3af79e3 commit fd3d42b

File tree

6 files changed

+192
-63
lines changed

6 files changed

+192
-63
lines changed

src/main/java/io/vertx/core/impl/TaskQueue.java

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
import io.vertx.core.impl.logging.LoggerFactory;
1616

1717
import java.util.LinkedList;
18+
import java.util.concurrent.CountDownLatch;
1819
import java.util.concurrent.Executor;
1920
import java.util.concurrent.RejectedExecutionException;
20-
import java.util.function.Consumer;
21+
import java.util.concurrent.TimeUnit;
2122

2223
/**
2324
* A task queue that always run all tasks in order. The executor to run the tasks is passed
@@ -82,17 +83,12 @@ private void run() {
8283
}
8384

8485
/**
85-
* Unschedule the current task from execution, the next task in the queue will be executed
86-
* when there is one.
86+
* Return a controller for the current task.
8787
*
88-
* <p>When the current task wants to be resumed, it should call the returned consumer with a command
89-
* to unpark the thread (e.g most likely yielding a latch), this task will be executed immediately if there
90-
* is no tasks being executed, otherwise it will be added first in the queue.
91-
*
92-
* @return a mean to signal to resume the thread when it shall be resumed
88+
* @return the controller
9389
* @throws IllegalStateException if the current thread is not currently being executed by the queue
9490
*/
95-
public Consumer<Runnable> unschedule() {
91+
public WorkerExecutor.TaskController current() {
9692
Thread thread;
9793
Executor executor;
9894
synchronized (tasks) {
@@ -101,20 +97,42 @@ public Consumer<Runnable> unschedule() {
10197
}
10298
thread = currentThread;
10399
executor = currentExecutor;
104-
currentThread = null;
105100
}
106-
executor.execute(runner);
107-
return r -> {
108-
synchronized (tasks) {
109-
if (currentExecutor != null) {
110-
tasks.addFirst(new ResumeTask(r, executor, thread));
111-
return;
112-
} else {
101+
return new WorkerExecutor.TaskController() {
102+
103+
final CountDownLatch latch = new CountDownLatch(1);
104+
105+
@Override
106+
public void resume(Runnable callback) {
107+
Runnable task = () -> {
108+
callback.run();
109+
latch.countDown();
110+
};
111+
synchronized (tasks) {
112+
if (currentExecutor != null) {
113+
tasks.addFirst(new ResumeTask(task, executor, thread));
114+
return;
115+
}
113116
currentExecutor = executor;
114117
currentThread = thread;
115118
}
119+
task.run();
120+
}
121+
122+
@Override
123+
public CountDownLatch suspend() {
124+
if (Thread.currentThread() != thread) {
125+
throw new IllegalStateException();
126+
}
127+
synchronized (tasks) {
128+
if (currentThread == null || currentThread != Thread.currentThread()) {
129+
throw new IllegalStateException();
130+
}
131+
currentThread = null;
132+
}
133+
executor.execute(runner);
134+
return latch;
116135
}
117-
r.run();
118136
};
119137
}
120138

src/main/java/io/vertx/core/impl/VertxImpl.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -535,11 +535,20 @@ public ContextImpl createEventLoopContext() {
535535
return createEventLoopContext(null, closeFuture, null, Thread.currentThread().getContextClassLoader());
536536
}
537537

538-
@Override
539-
public ContextImpl createWorkerContext(Deployment deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) {
538+
private ContextImpl createWorkerContext(EventLoop eventLoop, CloseFuture closeFuture, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) {
540539
TaskQueue orderedTasks = new TaskQueue();
541540
WorkerPool wp = workerPool != null ? workerPool : this.workerPool;
542-
return new ContextImpl(this, false, eventLoopGroup.next(), new WorkerExecutor(wp, orderedTasks), internalWorkerPool, wp, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
541+
return new ContextImpl(this, false, eventLoop, new WorkerExecutor(wp, orderedTasks), internalWorkerPool, wp, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
542+
}
543+
544+
@Override
545+
public ContextInternal createWorkerContext(EventLoop eventLoop, WorkerPool workerPool, ClassLoader tccl) {
546+
return createWorkerContext(eventLoop, closeFuture, workerPool, null, tccl);
547+
}
548+
549+
@Override
550+
public ContextImpl createWorkerContext(Deployment deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) {
551+
return createWorkerContext(eventLoopGroup.next(), closeFuture, workerPool, deployment, tccl);
543552
}
544553

545554
@Override

src/main/java/io/vertx/core/impl/VertxInternal.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,29 @@ default <C> C createSharedClient(String clientKey, String clientName, CloseFutur
118118
*/
119119
ContextInternal createEventLoopContext(Deployment deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl);
120120

121+
/**
122+
* @return event loop context
123+
*/
121124
ContextInternal createEventLoopContext(EventLoop eventLoop, WorkerPool workerPool, ClassLoader tccl);
122125

126+
/**
127+
* @return event loop context
128+
*/
123129
ContextInternal createEventLoopContext();
124130

125131
/**
126-
* @return worker loop context
132+
* @return worker context
127133
*/
128-
ContextInternal createWorkerContext(Deployment deployment, CloseFuture closeFuture, WorkerPool pool, ClassLoader tccl);
134+
ContextInternal createWorkerContext(Deployment deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl);
129135

136+
/**
137+
* @return worker context
138+
*/
139+
ContextInternal createWorkerContext(EventLoop eventLoop, WorkerPool workerPool, ClassLoader tccl);
140+
141+
/**
142+
* @return worker context
143+
*/
130144
ContextInternal createWorkerContext();
131145

132146
@Override

src/main/java/io/vertx/core/impl/VertxWrapper.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -454,8 +454,13 @@ public ContextInternal createEventLoopContext() {
454454
}
455455

456456
@Override
457-
public ContextInternal createWorkerContext(Deployment deployment, CloseFuture closeFuture, WorkerPool pool, ClassLoader tccl) {
458-
return delegate.createWorkerContext(deployment, closeFuture, pool, tccl);
457+
public ContextInternal createWorkerContext(EventLoop eventLoop, WorkerPool workerPool, ClassLoader tccl) {
458+
return delegate.createWorkerContext(eventLoop, workerPool, tccl);
459+
}
460+
461+
@Override
462+
public ContextInternal createWorkerContext(Deployment deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) {
463+
return delegate.createWorkerContext(deployment, closeFuture, workerPool, tccl);
459464
}
460465

461466
@Override

src/main/java/io/vertx/core/impl/WorkerExecutor.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212

1313
import io.vertx.core.spi.metrics.PoolMetrics;
1414

15-
import java.util.function.Consumer;
15+
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.TimeUnit;
1617

1718
/**
1819
* Execute events on a worker pool.
@@ -60,9 +61,46 @@ public void execute(Runnable command) {
6061
}
6162

6263
/**
63-
* See {@link TaskQueue#unschedule()}.
64+
* See {@link TaskQueue#current()}.
6465
*/
65-
public Consumer<Runnable> unschedule() {
66-
return orderedTasks.unschedule();
66+
public TaskController current() {
67+
return orderedTasks.current();
68+
}
69+
70+
public interface TaskController {
71+
72+
/**
73+
* Resume the task, the {@code callback} will be executed when the task is resumed, before the task thread
74+
* is unparked.
75+
*
76+
* @param callback called when the task is resumed
77+
*/
78+
void resume(Runnable callback);
79+
80+
/**
81+
* Like {@link #resume(Runnable)}.
82+
*/
83+
default void resume() {
84+
resume(() -> {});
85+
}
86+
87+
/**
88+
* Suspend the task execution and park the current thread until the task is resumed.
89+
* The next task in the queue will be executed, when there is one.
90+
*
91+
* <p>When the task wants to be resumed, it should call {@link #resume}, this will be executed immediately if there
92+
* is no other tasks being executed, otherwise it will be added first in the queue.
93+
*/
94+
default void suspendAndAwaitResume() throws InterruptedException {
95+
suspend().await();
96+
}
97+
98+
/**
99+
* Like {@link #suspendAndAwaitResume()} but does not await the task to be resumed.
100+
*
101+
* @return the latch to await
102+
*/
103+
CountDownLatch suspend();
104+
67105
}
68106
}

0 commit comments

Comments
 (0)