Skip to content

Commit 5f27280

Browse files
committed
Port and adapt code from virtual-threads
1 parent a581001 commit 5f27280

29 files changed

+1291
-335
lines changed

src/main/asciidoc/index.adoc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Vert.x core provides functionality for things like:
1616
* Datagram Sockets
1717
* DNS client
1818
* File system access
19+
* Virtual threads
1920
* High availability
2021
* Native transports
2122
* Clustering
@@ -411,6 +412,21 @@ If you want to deploy a verticle as a worker verticle you do that with {@link io
411412
Worker verticle instances are never executed concurrently by Vert.x by more than one thread, but can executed by
412413
different threads at different times.
413414

415+
=== Virtual thread verticles
416+
417+
A virtual thread verticle is just like a standard verticle but it's executed using virtual threads, rather than using an event loop.
418+
419+
Virtual thread verticles are designed to use an async/await model with Vert.x futures.
420+
421+
If you want to deploy a verticle as a <<virtual_threads, virtual thread verticle>> you do that with {@link io.vertx.core.DeploymentOptions#setThreadingModel}.
422+
423+
[source,$lang]
424+
----
425+
{@link examples.CoreExamples#example7_1}
426+
----
427+
428+
NOTE: this feature requires Java 21
429+
414430
=== Deploying verticles programmatically
415431

416432
You can deploy a verticle using one of the {@link io.vertx.core.Vertx#deployVerticle} method, specifying a verticle
@@ -679,6 +695,9 @@ include::datagrams.adoc[]
679695

680696
include::dns.adoc[]
681697

698+
[[virtual_threads]]
699+
include::virtualthreads.adoc[]
700+
682701
[[streams]]
683702
include::streams.adoc[]
684703

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
== Vert.x Virtual Threads
2+
3+
Use virtual threads to write Vert.x code that looks like it is synchronous.
4+
5+
You still write the traditional Vert.x code processing events, but you have the opportunity to write synchronous code for complex workflows and use thread locals in such workflows.
6+
7+
=== Introduction
8+
9+
The non-blocking nature of Vert.x leads to asynchronous APIs.
10+
Asynchronous APIs can take various forms including callback style, promises and reactive extensions.
11+
12+
In some cases, programming using asynchronous APIs can be more challenging than using a direct synchronous style, in particular if you have several operations that you want to do sequentially.
13+
Also, error propagation is often more complex when using asynchronous APIs.
14+
15+
Virtual thread support allows you to work with asynchronous APIs, but using a direct synchronous style that you're already familiar with.
16+
17+
It does this by using Java 21 https://openjdk.org/jeps/444[virtual threads]. Virtual threads are very lightweight threads that do not correspond to underlying kernel threads. When they are blocked they do not block a kernel thread.
18+
19+
=== Using virtual threads
20+
21+
You can deploy virtual thread verticles.
22+
23+
A virtual thread verticle is capable of awaiting Vert.x futures and gets the result synchronously.
24+
25+
When the verticle *awaits* a result, the verticle can still process events like an event-loop verticle.
26+
27+
[source,java]
28+
----
29+
{@link examples.VirtualThreadExamples#gettingStarted}
30+
----
31+
32+
NOTE: Using virtual threads requires Java 21 or above.
33+
34+
==== Blocking within a virtual thread verticle
35+
36+
You can use {@link io.vertx.core.Future#await} to suspend the current virtual thread until the awaited result is available.
37+
38+
The virtual thread is effectively blocked, but the application can still process events.
39+
40+
When a virtual thread awaits for a result and the verticle needs to process a task, a new virtual thread is created to handle this task.
41+
42+
When the result is available, the virtual thread execution is resumed and scheduled after the current task is suspended or finished.
43+
44+
IMPORTANT: Like any verticle at most one task is executed at the same time.
45+
46+
You can await on a Vert.x `Future`
47+
48+
[source,java]
49+
----
50+
{@link examples.VirtualThreadExamples#awaitingFutures1}
51+
----
52+
53+
or on a JDK `CompletionStage`
54+
55+
[source,java]
56+
----
57+
{@link examples.VirtualThreadExamples#awaitingFutures2}
58+
----
59+
60+
==== Field visibility
61+
62+
A virtual thread verticle can interact safely with fields before an `await` call. However, if you are reading a field before an `await` call and reusing the value after the call, you should keep in mind that this value might have changed.
63+
64+
[source,java]
65+
----
66+
{@link examples.VirtualThreadExamples#fieldVisibility1}
67+
----
68+
69+
You should read/write fields before calling `await` to avoid this.
70+
71+
[source,java]
72+
----
73+
{@link examples.VirtualThreadExamples#fieldVisibility2}
74+
----
75+
76+
NOTE: this is the same behavior with an event-loop verticle
77+
78+
==== Awaiting multiple futures
79+
80+
When you need to await multiple futures, you can use Vert.x {@link io.vertx.core.CompositeFuture}:
81+
82+
[source,java]
83+
----
84+
{@link examples.VirtualThreadExamples#awaitingMultipleFutures}
85+
----
86+
87+
==== Blocking without await
88+
89+
When your application blocks without using `await`, e.g. using `ReentrantLock#lock`, the Vert.x scheduler is not aware of it and cannot schedule events on the verticle: it behaves like a worker verticle, yet using virtual threads.
90+
91+
This use case is not encouraged yet not forbidden, however the verticle should be deployed with several instances to deliver the desired concurrency, like a worker verticle.
92+
93+
==== Thread local support
94+
95+
Thread locals are only reliable within the execution of a context task.
96+
97+
[source,java]
98+
----
99+
{@link examples.VirtualThreadExamples#threadLocalSupport1}
100+
----

src/main/generated/io/vertx/core/DeploymentOptionsConverter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, Deploym
2525
obj.setConfig(((JsonObject)member.getValue()).copy());
2626
}
2727
break;
28+
case "threadingModel":
29+
if (member.getValue() instanceof String) {
30+
obj.setThreadingModel(io.vertx.core.ThreadingModel.valueOf((String)member.getValue()));
31+
}
32+
break;
2833
case "worker":
2934
if (member.getValue() instanceof Boolean) {
3035
obj.setWorker((Boolean)member.getValue());
@@ -72,6 +77,9 @@ static void toJson(DeploymentOptions obj, java.util.Map<String, Object> json) {
7277
if (obj.getConfig() != null) {
7378
json.put("config", obj.getConfig());
7479
}
80+
if (obj.getThreadingModel() != null) {
81+
json.put("threadingModel", obj.getThreadingModel().name());
82+
}
7583
json.put("worker", obj.isWorker());
7684
json.put("ha", obj.isHa());
7785
json.put("instances", obj.getInstances());

src/main/generated/io/vertx/core/WorkerPoolOptionsConverter.java

Lines changed: 0 additions & 61 deletions
This file was deleted.

src/main/java/examples/CoreExamples.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ public void example7_1(Vertx vertx) {
222222
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
223223
}
224224

225+
public void example7_2(Vertx vertx) {
226+
DeploymentOptions options = new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD);
227+
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
228+
}
229+
225230
public void example8(Vertx vertx) {
226231

227232
Verticle myVerticle = new MyVerticle();
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
package examples;
12+
13+
import io.vertx.core.*;
14+
import io.vertx.core.buffer.Buffer;
15+
import io.vertx.core.http.*;
16+
import io.vertx.docgen.Source;
17+
18+
import java.util.concurrent.CompletionStage;
19+
20+
@Source
21+
public class VirtualThreadExamples {
22+
23+
public void gettingStarted(Vertx vertx) {
24+
25+
AbstractVerticle verticle = new AbstractVerticle() {
26+
@Override
27+
public void start() {
28+
HttpClient client = vertx.createHttpClient();
29+
HttpClientRequest req = Future.await(client.request(
30+
HttpMethod.GET,
31+
8080,
32+
"localhost",
33+
"/"));
34+
HttpClientResponse resp = Future.await(req.send());
35+
int status = resp.statusCode();
36+
Buffer body = Future.await(resp.body());
37+
}
38+
};
39+
40+
// Run the verticle a on virtual thread
41+
vertx.deployVerticle(verticle, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD));
42+
}
43+
44+
private int counter;
45+
46+
public void fieldVisibility1() {
47+
int value = counter;
48+
value += Future.await(getRemoteValue());
49+
// the counter value might have changed
50+
counter = value;
51+
}
52+
53+
public void fieldVisibility2() {
54+
counter += Future.await(getRemoteValue());
55+
}
56+
57+
private Future<Buffer> callRemoteService() {
58+
return null;
59+
}
60+
61+
private Future<Integer> getRemoteValue() {
62+
return null;
63+
}
64+
65+
public void deployVerticle(Vertx vertx, int port) {
66+
vertx.deployVerticle(() -> new AbstractVerticle() {
67+
HttpServer server;
68+
@Override
69+
public void start() {
70+
server = vertx
71+
.createHttpServer()
72+
.requestHandler(req -> {
73+
Buffer res;
74+
try {
75+
res = Future.await(callRemoteService());
76+
} catch (Exception e) {
77+
req.response().setStatusCode(500).end();
78+
return;
79+
}
80+
req.response().end(res);
81+
});
82+
Future.await(server.listen(port));
83+
}
84+
}, new DeploymentOptions()
85+
.setThreadingModel(ThreadingModel.VIRTUAL_THREAD));
86+
}
87+
88+
public void awaitingFutures1(HttpClientResponse response) {
89+
Buffer body = Future.await(response.body());
90+
}
91+
92+
public void awaitingFutures2(HttpClientResponse response, CompletionStage<Buffer> completionStage) {
93+
Buffer body = Future.await(Future.fromCompletionStage(completionStage));
94+
}
95+
96+
private Future<String> getRemoteString() {
97+
return null;
98+
}
99+
100+
public void awaitingMultipleFutures() {
101+
Future<String> f1 = getRemoteString();
102+
Future<Integer> f2 = getRemoteValue();
103+
CompositeFuture res = Future.await(Future.all(f1, f2));
104+
String v1 = res.resultAt(0);
105+
Integer v2 = res.resultAt(1);
106+
}
107+
108+
public void threadLocalSupport1(String userId, HttpClient client) {
109+
ThreadLocal<String> local = new ThreadLocal();
110+
local.set(userId);
111+
HttpClientRequest req = Future.await(client.request(HttpMethod.GET, 8080, "localhost", "/"));
112+
HttpClientResponse resp = Future.await(req.send());
113+
// Thread local remains the same since it's the same virtual thread
114+
}
115+
}

src/main/java/io/vertx/core/Context.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ default List<String> processArgs() {
185185
*/
186186
boolean isWorkerContext();
187187

188+
/**
189+
* @return the context threading model
190+
*/
191+
ThreadingModel threadingModel();
192+
188193
/**
189194
* Get some data from the context.
190195
*

0 commit comments

Comments
 (0)