Skip to content

Commit

Permalink
Port and adapt code from virtual-threads
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Oct 25, 2023
1 parent 998ef49 commit f150fb1
Show file tree
Hide file tree
Showing 30 changed files with 1,342 additions and 368 deletions.
19 changes: 19 additions & 0 deletions src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Vert.x core provides functionality for things like:
* Datagram Sockets
* DNS client
* File system access
* Virtual threads
* High availability
* Native transports
* Clustering
Expand Down Expand Up @@ -268,6 +269,21 @@ If you want to deploy a verticle as a worker verticle you do that with {@link io
Worker verticle instances are never executed concurrently by Vert.x by more than one thread, but can executed by
different threads at different times.

=== Virtual thread verticles

A virtual thread verticle is just like a standard verticle but it's executed using virtual threads, rather than using an event loop.

Virtual thread verticles are designed to use an async/await model with Vert.x futures.

If you want to deploy a verticle as a <<virtual_threads, virtual thread verticle>> you do that with {@link io.vertx.core.DeploymentOptions#setThreadingModel}.

[source,$lang]
----
{@link examples.CoreExamples#example7_1}
----

NOTE: this feature requires Java 21

=== Deploying verticles programmatically

You can deploy a verticle using one of the {@link io.vertx.core.Vertx#deployVerticle} method, specifying a verticle
Expand Down Expand Up @@ -568,6 +584,9 @@ include::datagrams.adoc[]

include::dns.adoc[]

[[virtual_threads]]
include::virtualthreads.adoc[]

[[streams]]
include::streams.adoc[]

Expand Down
100 changes: 100 additions & 0 deletions src/main/asciidoc/virtualthreads.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
== Vert.x Virtual Threads

Use virtual threads to write Vert.x code that looks like it is synchronous.

You still write the traditional Vert.x code processing events, but you have the opportunity to write synchronous code for complex workflows and use thread locals in such workflows.

=== Introduction

The non-blocking nature of Vert.x leads to asynchronous APIs.
Asynchronous APIs can take various forms including callback style, promises and reactive extensions.

In some cases, programming using asynchronous APIs can be more challenging than using a direct synchronous style, in particular if you have several operations that you want to do sequentially.
Also, error propagation is often more complex when using asynchronous APIs.

Virtual thread support allows you to work with asynchronous APIs, but using a direct synchronous style that you're already familiar with.

It does this by using Java 21 https://openjdk.org/jeps/444[virtual threads]. Virtual threads are very lightweight threads that do not correspond to underlying kernel threads. When they are blocked they do not block a kernel thread.

=== Using virtual threads

You can deploy virtual thread verticles.

A virtual thread verticle is capable of awaiting Vert.x futures and gets the result synchronously.

When the verticle *awaits* a result, the verticle can still process events like an event-loop verticle.

[source,java]
----
{@link examples.VirtualThreadExamples#gettingStarted}
----

NOTE: Using virtual threads requires Java 21 or above.

==== Blocking within a virtual thread verticle

You can use {@link io.vertx.core.Future#await} to suspend the current virtual thread until the awaited result is available.

The virtual thread is effectively blocked, but the application can still process events.

When a virtual thread awaits for a result and the verticle needs to process a task, a new virtual thread is created to handle this task.

When the result is available, the virtual thread execution is resumed and scheduled after the current task is suspended or finished.

IMPORTANT: Like any verticle at most one task is executed at the same time.

You can await on a Vert.x `Future`

[source,java]
----
{@link examples.VirtualThreadExamples#awaitingFutures1}
----

or on a JDK `CompletionStage`

[source,java]
----
{@link examples.VirtualThreadExamples#awaitingFutures2}
----

==== Field visibility

A virtual thread verticle can interact safely with fields before an `await` call. However, if you are reading a field before an `await` call and reusing the value after the call, you should keep in mind that this value might have changed.

[source,java]
----
{@link examples.VirtualThreadExamples#fieldVisibility1}
----

You should read/write fields before calling `await` to avoid this.

[source,java]
----
{@link examples.VirtualThreadExamples#fieldVisibility2}
----

NOTE: this is the same behavior with an event-loop verticle

==== Awaiting multiple futures

When you need to await multiple futures, you can use Vert.x {@link io.vertx.core.CompositeFuture}:

[source,java]
----
{@link examples.VirtualThreadExamples#awaitingMultipleFutures}
----

==== Blocking without await

When your application blocks without using `await`, e.g. using `ReentrantLock#lock`, the Vert.x scheduler is not aware of it and cannot schedule events on the verticle: it behaves like a worker verticle, yet using virtual threads.

This use case is not encouraged yet not forbidden, however the verticle should be deployed with several instances to deliver the desired concurrency, like a worker verticle.

==== Thread local support

Thread locals are only reliable within the execution of a context task.

[source,java]
----
{@link examples.VirtualThreadExamples#threadLocalSupport1}
----
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, Deploym
obj.setMaxWorkerExecuteTimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "threadingModel":
if (member.getValue() instanceof String) {
obj.setThreadingModel(io.vertx.core.ThreadingModel.valueOf((String)member.getValue()));
}
break;
case "worker":
if (member.getValue() instanceof Boolean) {
obj.setWorker((Boolean)member.getValue());
Expand Down Expand Up @@ -78,6 +83,9 @@ static void toJson(DeploymentOptions obj, java.util.Map<String, Object> json) {
if (obj.getMaxWorkerExecuteTimeUnit() != null) {
json.put("maxWorkerExecuteTimeUnit", obj.getMaxWorkerExecuteTimeUnit().name());
}
if (obj.getThreadingModel() != null) {
json.put("threadingModel", obj.getThreadingModel().name());
}
json.put("worker", obj.isWorker());
if (obj.getWorkerPoolName() != null) {
json.put("workerPoolName", obj.getWorkerPoolName());
Expand Down
61 changes: 0 additions & 61 deletions src/main/generated/io/vertx/core/WorkerPoolOptionsConverter.java

This file was deleted.

5 changes: 5 additions & 0 deletions src/main/java/examples/CoreExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ public void example7_1(Vertx vertx) {
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
}

public void example7_2(Vertx vertx) {
DeploymentOptions options = new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
}

public void example8(Vertx vertx) {

Verticle myVerticle = new MyVerticle();
Expand Down
115 changes: 115 additions & 0 deletions src/main/java/examples/VirtualThreadExamples.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package examples;

import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.docgen.Source;

import java.util.concurrent.CompletionStage;

@Source
public class VirtualThreadExamples {

public void gettingStarted(Vertx vertx) {

AbstractVerticle verticle = new AbstractVerticle() {
@Override
public void start() {
HttpClient client = vertx.createHttpClient();
HttpClientRequest req = Future.await(client.request(
HttpMethod.GET,
8080,
"localhost",
"/"));
HttpClientResponse resp = Future.await(req.send());
int status = resp.statusCode();
Buffer body = Future.await(resp.body());
}
};

// Run the verticle a on virtual thread
vertx.deployVerticle(verticle, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD));
}

private int counter;

public void fieldVisibility1() {
int value = counter;
value += Future.await(getRemoteValue());
// the counter value might have changed
counter = value;
}

public void fieldVisibility2() {
counter += Future.await(getRemoteValue());
}

private Future<Buffer> callRemoteService() {
return null;
}

private Future<Integer> getRemoteValue() {
return null;
}

public void deployVerticle(Vertx vertx, int port) {
vertx.deployVerticle(() -> new AbstractVerticle() {
HttpServer server;
@Override
public void start() {
server = vertx
.createHttpServer()
.requestHandler(req -> {
Buffer res;
try {
res = Future.await(callRemoteService());
} catch (Exception e) {
req.response().setStatusCode(500).end();
return;
}
req.response().end(res);
});
Future.await(server.listen(port));
}
}, new DeploymentOptions()
.setThreadingModel(ThreadingModel.VIRTUAL_THREAD));
}

public void awaitingFutures1(HttpClientResponse response) {
Buffer body = Future.await(response.body());
}

public void awaitingFutures2(HttpClientResponse response, CompletionStage<Buffer> completionStage) {
Buffer body = Future.await(Future.fromCompletionStage(completionStage));
}

private Future<String> getRemoteString() {
return null;
}

public void awaitingMultipleFutures() {
Future<String> f1 = getRemoteString();
Future<Integer> f2 = getRemoteValue();
CompositeFuture res = Future.await(Future.all(f1, f2));
String v1 = res.resultAt(0);
Integer v2 = res.resultAt(1);
}

public void threadLocalSupport1(String userId, HttpClient client) {
ThreadLocal<String> local = new ThreadLocal();
local.set(userId);
HttpClientRequest req = Future.await(client.request(HttpMethod.GET, 8080, "localhost", "/"));
HttpClientResponse resp = Future.await(req.send());
// Thread local remains the same since it's the same virtual thread
}
}
5 changes: 5 additions & 0 deletions src/main/java/io/vertx/core/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ default List<String> processArgs() {
*/
boolean isWorkerContext();

/**
* @return the context threading model
*/
ThreadingModel threadingModel();

/**
* Get some data from the context.
*
Expand Down
Loading

0 comments on commit f150fb1

Please sign in to comment.