diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index 82d4241870e..c8e1031f001 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -16,6 +16,7 @@ Vert.x core provides functionality for things like: * Datagram Sockets * DNS client * File system access +* Virtual threads * High availability * Native transports * Clustering @@ -268,6 +269,21 @@ If you want to deploy a verticle as a worker verticle you do that with {@link io Worker verticle instances are never executed concurrently by Vert.x by more than one thread, but can executed by different threads at different times. +=== Virtual thread verticles + +A virtual thread verticle is just like a standard verticle but it's executed using virtual threads, rather than using an event loop. + +Virtual thread verticles are designed to use an async/await model with Vert.x futures. + +If you want to deploy a verticle as a <> you do that with {@link io.vertx.core.DeploymentOptions#setThreadingModel}. + +[source,$lang] +---- +{@link examples.CoreExamples#example7_1} +---- + +NOTE: this feature requires Java 21 + === Deploying verticles programmatically You can deploy a verticle using one of the {@link io.vertx.core.Vertx#deployVerticle} method, specifying a verticle @@ -568,6 +584,9 @@ include::datagrams.adoc[] include::dns.adoc[] +[[virtual_threads]] +include::virtualthreads.adoc[] + [[streams]] include::streams.adoc[] diff --git a/src/main/asciidoc/virtualthreads.adoc b/src/main/asciidoc/virtualthreads.adoc new file mode 100644 index 00000000000..a5a07137ed7 --- /dev/null +++ b/src/main/asciidoc/virtualthreads.adoc @@ -0,0 +1,100 @@ +== Vert.x Virtual Threads + +Use virtual threads to write Vert.x code that looks like it is synchronous. + +You still write the traditional Vert.x code processing events, but you have the opportunity to write synchronous code for complex workflows and use thread locals in such workflows. + +=== Introduction + +The non-blocking nature of Vert.x leads to asynchronous APIs. +Asynchronous APIs can take various forms including callback style, promises and reactive extensions. + +In some cases, programming using asynchronous APIs can be more challenging than using a direct synchronous style, in particular if you have several operations that you want to do sequentially. +Also, error propagation is often more complex when using asynchronous APIs. + +Virtual thread support allows you to work with asynchronous APIs, but using a direct synchronous style that you're already familiar with. + +It does this by using Java 21 https://openjdk.org/jeps/444[virtual threads]. Virtual threads are very lightweight threads that do not correspond to underlying kernel threads. When they are blocked they do not block a kernel thread. + +=== Using virtual threads + +You can deploy virtual thread verticles. + +A virtual thread verticle is capable of awaiting Vert.x futures and gets the result synchronously. + +When the verticle *awaits* a result, the verticle can still process events like an event-loop verticle. + +[source,java] +---- +{@link examples.VirtualThreadExamples#gettingStarted} +---- + +NOTE: Using virtual threads requires Java 21 or above. + +==== Blocking within a virtual thread verticle + +You can use {@link io.vertx.core.Future#await} to suspend the current virtual thread until the awaited result is available. + +The virtual thread is effectively blocked, but the application can still process events. + +When a virtual thread awaits for a result and the verticle needs to process a task, a new virtual thread is created to handle this task. + +When the result is available, the virtual thread execution is resumed and scheduled after the current task is suspended or finished. + +IMPORTANT: Like any verticle at most one task is executed at the same time. + +You can await on a Vert.x `Future` + +[source,java] +---- +{@link examples.VirtualThreadExamples#awaitingFutures1} +---- + +or on a JDK `CompletionStage` + +[source,java] +---- +{@link examples.VirtualThreadExamples#awaitingFutures2} +---- + +==== Field visibility + +A virtual thread verticle can interact safely with fields before an `await` call. However, if you are reading a field before an `await` call and reusing the value after the call, you should keep in mind that this value might have changed. + +[source,java] +---- +{@link examples.VirtualThreadExamples#fieldVisibility1} +---- + +You should read/write fields before calling `await` to avoid this. + +[source,java] +---- +{@link examples.VirtualThreadExamples#fieldVisibility2} +---- + +NOTE: this is the same behavior with an event-loop verticle + +==== Awaiting multiple futures + +When you need to await multiple futures, you can use Vert.x {@link io.vertx.core.CompositeFuture}: + +[source,java] +---- +{@link examples.VirtualThreadExamples#awaitingMultipleFutures} +---- + +==== Blocking without await + +When your application blocks without using `await`, e.g. using `ReentrantLock#lock`, the Vert.x scheduler is not aware of it and cannot schedule events on the verticle: it behaves like a worker verticle, yet using virtual threads. + +This use case is not encouraged yet not forbidden, however the verticle should be deployed with several instances to deliver the desired concurrency, like a worker verticle. + +==== Thread local support + +Thread locals are only reliable within the execution of a context task. + +[source,java] +---- +{@link examples.VirtualThreadExamples#threadLocalSupport1} +---- diff --git a/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java b/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java index 092b745f2c0..cffa6367263 100644 --- a/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java +++ b/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java @@ -45,6 +45,11 @@ static void fromJson(Iterable> json, Deploym obj.setMaxWorkerExecuteTimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); } break; + case "threadingModel": + if (member.getValue() instanceof String) { + obj.setThreadingModel(io.vertx.core.ThreadingModel.valueOf((String)member.getValue())); + } + break; case "worker": if (member.getValue() instanceof Boolean) { obj.setWorker((Boolean)member.getValue()); @@ -78,6 +83,9 @@ static void toJson(DeploymentOptions obj, java.util.Map json) { if (obj.getMaxWorkerExecuteTimeUnit() != null) { json.put("maxWorkerExecuteTimeUnit", obj.getMaxWorkerExecuteTimeUnit().name()); } + if (obj.getThreadingModel() != null) { + json.put("threadingModel", obj.getThreadingModel().name()); + } json.put("worker", obj.isWorker()); if (obj.getWorkerPoolName() != null) { json.put("workerPoolName", obj.getWorkerPoolName()); diff --git a/src/main/generated/io/vertx/core/WorkerPoolOptionsConverter.java b/src/main/generated/io/vertx/core/WorkerPoolOptionsConverter.java deleted file mode 100644 index 45de99c2c5c..00000000000 --- a/src/main/generated/io/vertx/core/WorkerPoolOptionsConverter.java +++ /dev/null @@ -1,61 +0,0 @@ -package io.vertx.core; - -import io.vertx.core.json.JsonObject; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.impl.JsonUtil; -import java.time.Instant; -import java.time.format.DateTimeFormatter; -import java.util.Base64; - -/** - * Converter and mapper for {@link io.vertx.core.WorkerPoolOptions}. - * NOTE: This class has been automatically generated from the {@link io.vertx.core.WorkerPoolOptions} original class using Vert.x codegen. - */ -public class WorkerPoolOptionsConverter { - - - private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER; - private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER; - - static void fromJson(Iterable> json, WorkerPoolOptions obj) { - for (java.util.Map.Entry member : json) { - switch (member.getKey()) { - case "maxExecuteTime": - if (member.getValue() instanceof Number) { - obj.setMaxExecuteTime(((Number)member.getValue()).longValue()); - } - break; - case "maxExecuteTimeUnit": - if (member.getValue() instanceof String) { - obj.setMaxExecuteTimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); - } - break; - case "name": - if (member.getValue() instanceof String) { - obj.setName((String)member.getValue()); - } - break; - case "size": - if (member.getValue() instanceof Number) { - obj.setSize(((Number)member.getValue()).intValue()); - } - break; - } - } - } - - static void toJson(WorkerPoolOptions obj, JsonObject json) { - toJson(obj, json.getMap()); - } - - static void toJson(WorkerPoolOptions obj, java.util.Map json) { - json.put("maxExecuteTime", obj.getMaxExecuteTime()); - if (obj.getMaxExecuteTimeUnit() != null) { - json.put("maxExecuteTimeUnit", obj.getMaxExecuteTimeUnit().name()); - } - if (obj.getName() != null) { - json.put("name", obj.getName()); - } - json.put("size", obj.getSize()); - } -} diff --git a/src/main/java/examples/CoreExamples.java b/src/main/java/examples/CoreExamples.java index 754d1b31f9b..23bd2643c8f 100644 --- a/src/main/java/examples/CoreExamples.java +++ b/src/main/java/examples/CoreExamples.java @@ -224,6 +224,11 @@ public void example7_1(Vertx vertx) { vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options); } + public void example7_2(Vertx vertx) { + DeploymentOptions options = new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD); + vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options); + } + public void example8(Vertx vertx) { Verticle myVerticle = new MyVerticle(); diff --git a/src/main/java/examples/VirtualThreadExamples.java b/src/main/java/examples/VirtualThreadExamples.java new file mode 100644 index 00000000000..8fc92849430 --- /dev/null +++ b/src/main/java/examples/VirtualThreadExamples.java @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package examples; + +import io.vertx.core.*; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.*; +import io.vertx.docgen.Source; + +import java.util.concurrent.CompletionStage; + +@Source +public class VirtualThreadExamples { + + public void gettingStarted(Vertx vertx) { + + AbstractVerticle verticle = new AbstractVerticle() { + @Override + public void start() { + HttpClient client = vertx.createHttpClient(); + HttpClientRequest req = Future.await(client.request( + HttpMethod.GET, + 8080, + "localhost", + "/")); + HttpClientResponse resp = Future.await(req.send()); + int status = resp.statusCode(); + Buffer body = Future.await(resp.body()); + } + }; + + // Run the verticle a on virtual thread + vertx.deployVerticle(verticle, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD)); + } + + private int counter; + + public void fieldVisibility1() { + int value = counter; + value += Future.await(getRemoteValue()); + // the counter value might have changed + counter = value; + } + + public void fieldVisibility2() { + counter += Future.await(getRemoteValue()); + } + + private Future callRemoteService() { + return null; + } + + private Future getRemoteValue() { + return null; + } + + public void deployVerticle(Vertx vertx, int port) { + vertx.deployVerticle(() -> new AbstractVerticle() { + HttpServer server; + @Override + public void start() { + server = vertx + .createHttpServer() + .requestHandler(req -> { + Buffer res; + try { + res = Future.await(callRemoteService()); + } catch (Exception e) { + req.response().setStatusCode(500).end(); + return; + } + req.response().end(res); + }); + Future.await(server.listen(port)); + } + }, new DeploymentOptions() + .setThreadingModel(ThreadingModel.VIRTUAL_THREAD)); + } + + public void awaitingFutures1(HttpClientResponse response) { + Buffer body = Future.await(response.body()); + } + + public void awaitingFutures2(HttpClientResponse response, CompletionStage completionStage) { + Buffer body = Future.await(Future.fromCompletionStage(completionStage)); + } + + private Future getRemoteString() { + return null; + } + + public void awaitingMultipleFutures() { + Future f1 = getRemoteString(); + Future f2 = getRemoteValue(); + CompositeFuture res = Future.await(Future.all(f1, f2)); + String v1 = res.resultAt(0); + Integer v2 = res.resultAt(1); + } + + public void threadLocalSupport1(String userId, HttpClient client) { + ThreadLocal local = new ThreadLocal(); + local.set(userId); + HttpClientRequest req = Future.await(client.request(HttpMethod.GET, 8080, "localhost", "/")); + HttpClientResponse resp = Future.await(req.send()); + // Thread local remains the same since it's the same virtual thread + } +} diff --git a/src/main/java/io/vertx/core/Context.java b/src/main/java/io/vertx/core/Context.java index 9116373d145..53e0551dd46 100644 --- a/src/main/java/io/vertx/core/Context.java +++ b/src/main/java/io/vertx/core/Context.java @@ -245,6 +245,11 @@ default List processArgs() { */ boolean isWorkerContext(); + /** + * @return the context threading model + */ + ThreadingModel threadingModel(); + /** * Get some data from the context. * diff --git a/src/main/java/io/vertx/core/DeploymentOptions.java b/src/main/java/io/vertx/core/DeploymentOptions.java index 08ef837fa88..2b15cc70aca 100644 --- a/src/main/java/io/vertx/core/DeploymentOptions.java +++ b/src/main/java/io/vertx/core/DeploymentOptions.java @@ -30,30 +30,36 @@ @DataObject(generateConverter = true, publicConverter = false) public class DeploymentOptions { + public static final ThreadingModel DEFAULT_MODE = ThreadingModel.EVENT_LOOP; public static final boolean DEFAULT_WORKER = false; public static final boolean DEFAULT_HA = false; public static final int DEFAULT_INSTANCES = 1; private JsonObject config; - private boolean worker; - private WorkerOptions workerOptions; + private ThreadingModel threadingModel; private String isolationGroup; private boolean ha; private List extraClasspath; private int instances; private List isolatedClasses; private ClassLoader classLoader; + private String workerPoolName; + private int workerPoolSize; + private long maxWorkerExecuteTime; + private TimeUnit maxWorkerExecuteTimeUnit; /** * Default constructor */ public DeploymentOptions() { - this.worker = DEFAULT_WORKER; + this.threadingModel = DEFAULT_MODE; this.config = null; this.isolationGroup = null; this.ha = DEFAULT_HA; this.instances = DEFAULT_INSTANCES; - this.workerOptions = new WorkerPoolOptions(); + this.workerPoolSize = VertxOptions.DEFAULT_WORKER_POOL_SIZE; + this.maxWorkerExecuteTime = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME; + this.maxWorkerExecuteTimeUnit = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT; } /** @@ -63,13 +69,16 @@ public DeploymentOptions() { */ public DeploymentOptions(DeploymentOptions other) { this.config = other.getConfig() == null ? null : other.getConfig().copy(); - this.worker = other.isWorker(); + this.threadingModel = other.getThreadingModel(); this.isolationGroup = other.getIsolationGroup(); this.ha = other.isHa(); this.extraClasspath = other.getExtraClasspath() == null ? null : new ArrayList<>(other.getExtraClasspath()); this.instances = other.instances; this.isolatedClasses = other.getIsolatedClasses() == null ? null : new ArrayList<>(other.getIsolatedClasses()); - this.workerOptions = other.workerOptions.copy(); + this.workerPoolName = other.workerPoolName; + this.workerPoolSize = other.workerPoolSize; + this.maxWorkerExecuteTime = other.maxWorkerExecuteTime; + this.maxWorkerExecuteTimeUnit = other.maxWorkerExecuteTimeUnit; } /** @@ -111,23 +120,43 @@ public DeploymentOptions setConfig(JsonObject config) { return this; } + /** + * Which threading model the verticle(s) should use? + * + * @return the verticle threading model + */ + public ThreadingModel getThreadingModel() { + return threadingModel; + } + + /** + * Set the verticle(s) verticle(s) threading model, e.g. a worker or a virtual thread verticle + * + * @param threadingModel the threading model + * @return a reference to this, so the API can be used fluently + */ + public DeploymentOptions setThreadingModel(ThreadingModel threadingModel) { + this.threadingModel = threadingModel; + return this; + } + /** * Should the verticle(s) be deployed as a worker verticle? * - * @return true if will be deployed as worker, false otherwise + * @return {@code true} if will be deployed as worker, {@code false} otherwise */ public boolean isWorker() { - return worker; + return threadingModel == ThreadingModel.WORKER; } /** * Set whether the verticle(s) should be deployed as a worker verticle * - * @param worker true for worker, false otherwise + * @param worker {@code true} for worker, {@code false} force event-loop * @return a reference to this, so the API can be used fluently */ public DeploymentOptions setWorker(boolean worker) { - this.worker = worker; + this.threadingModel = worker ? ThreadingModel.WORKER : ThreadingModel.EVENT_LOOP; return this; } @@ -259,33 +288,11 @@ public DeploymentOptions setIsolatedClasses(List isolatedClasses) { return this; } - /** - * @return the worker options - */ - public WorkerOptions getWorkerOptions() { - return workerOptions; - } - - /** - * Set the verticle worker options. - * - * @param workerOptions the worker options to use - * @return a reference to this, so the API can be used fluently - */ - public DeploymentOptions setWorkerOptions(WorkerOptions workerOptions) { - this.workerOptions = workerOptions; - return this; - } - - private WorkerPoolOptions assumeWorkerPool() { - return (WorkerPoolOptions) workerOptions; - } - /** * @return the worker pool name */ public String getWorkerPoolName() { - return assumeWorkerPool().getName(); + return workerPoolName; } /** @@ -296,7 +303,7 @@ public String getWorkerPoolName() { * @return a reference to this, so the API can be used fluently */ public DeploymentOptions setWorkerPoolName(String workerPoolName) { - assumeWorkerPool().setName(workerPoolName); + this.workerPoolName = workerPoolName; return this; } @@ -311,7 +318,7 @@ public DeploymentOptions setWorkerPoolName(String workerPoolName) { * @return the maximum number of worker threads */ public int getWorkerPoolSize() { - return assumeWorkerPool().getSize(); + return workerPoolSize; } /** @@ -323,7 +330,10 @@ public int getWorkerPoolSize() { * @return a reference to this, so the API can be used fluently */ public DeploymentOptions setWorkerPoolSize(int workerPoolSize) { - assumeWorkerPool().setSize(workerPoolSize); + if (workerPoolSize < 1) { + throw new IllegalArgumentException("size must be > 0"); + } + this.workerPoolSize = workerPoolSize; return this; } @@ -340,7 +350,7 @@ public DeploymentOptions setWorkerPoolSize(int workerPoolSize) { * @return The value of max worker execute time, the default value of {@link DeploymentOptions#setMaxWorkerExecuteTimeUnit} {@code maxWorkerExecuteTimeUnit} is {@link TimeUnit#NANOSECONDS} */ public long getMaxWorkerExecuteTime() { - return assumeWorkerPool().getMaxExecuteTime(); + return maxWorkerExecuteTime; } /** @@ -354,7 +364,10 @@ public long getMaxWorkerExecuteTime() { * @return a reference to this, so the API can be used fluently */ public DeploymentOptions setMaxWorkerExecuteTime(long maxWorkerExecuteTime) { - assumeWorkerPool().setMaxExecuteTime(maxWorkerExecuteTime); + if (maxWorkerExecuteTime < 1) { + throw new IllegalArgumentException("maxExecuteTime must be > 0"); + } + this.maxWorkerExecuteTime = maxWorkerExecuteTime; return this; } @@ -364,7 +377,7 @@ public DeploymentOptions setMaxWorkerExecuteTime(long maxWorkerExecuteTime) { * @return the time unit of {@code maxWorkerExecuteTime} */ public TimeUnit getMaxWorkerExecuteTimeUnit() { - return assumeWorkerPool().getMaxExecuteTimeUnit(); + return maxWorkerExecuteTimeUnit; } /** @@ -376,7 +389,7 @@ public TimeUnit getMaxWorkerExecuteTimeUnit() { * @return a reference to this, so the API can be used fluently */ public DeploymentOptions setMaxWorkerExecuteTimeUnit(TimeUnit maxWorkerExecuteTimeUnit) { - assumeWorkerPool().setMaxExecuteTimeUnit(maxWorkerExecuteTimeUnit); + this.maxWorkerExecuteTimeUnit = maxWorkerExecuteTimeUnit; return this; } diff --git a/src/main/java/io/vertx/core/Future.java b/src/main/java/io/vertx/core/Future.java index 7c0c409cc4b..b6a50f67582 100644 --- a/src/main/java/io/vertx/core/Future.java +++ b/src/main/java/io/vertx/core/Future.java @@ -14,6 +14,7 @@ import io.vertx.codegen.annotations.Fluent; import io.vertx.codegen.annotations.GenIgnore; import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.Utils; import io.vertx.core.impl.future.CompositeFutureImpl; import io.vertx.core.impl.future.FailedFuture; import io.vertx.core.impl.future.SucceededFuture; @@ -617,4 +618,38 @@ static Future fromCompletionStage(CompletionStage completionStage, Con }); return promise.future(); } + + /** + * Park the current thread until the {@code future} is completed, when the future + * is completed the thread is un-parked and + * + *
    + *
  • the result value is returned when the future was completed with a result
  • + *
  • otherwise, the failure is thrown
  • + *
+ * + * This method must be called from a virtual thread. + * + * @param future the future to await + * @return the result + * @throws IllegalStateException when called from an event-loop thread or a non Vert.x thread + */ + static T await(Future future) { + io.vertx.core.impl.WorkerExecutor executor = io.vertx.core.impl.WorkerExecutor.unwrapWorkerExecutor(); + io.vertx.core.impl.WorkerExecutor.TaskController cont = executor.current(); + future.onComplete(ar -> cont.resume()); + try { + cont.suspendAndAwaitResume(); + } catch (InterruptedException e) { + Utils.throwAsUnchecked(e.getCause()); + return null; + } + if (future.succeeded()) { + return future.result(); + } else { + Utils.throwAsUnchecked(future.cause()); + return null; + } + } + } diff --git a/src/main/java/io/vertx/core/ThreadingModel.java b/src/main/java/io/vertx/core/ThreadingModel.java new file mode 100644 index 00000000000..a5cc9122c57 --- /dev/null +++ b/src/main/java/io/vertx/core/ThreadingModel.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core; + +/** + * The threading model defines how user tasks should be executed. + * + * @author Julien Viet + */ +public enum ThreadingModel { + + /** + * Event-loop threading model. + */ + EVENT_LOOP, + + /** + * Worker threading model + */ + WORKER, + + /** + * Virtual thread threading model + */ + VIRTUAL_THREAD + +} + diff --git a/src/main/java/io/vertx/core/WorkerOptions.java b/src/main/java/io/vertx/core/WorkerOptions.java deleted file mode 100644 index e8327ad6418..00000000000 --- a/src/main/java/io/vertx/core/WorkerOptions.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 - * which is available at https://www.apache.org/licenses/LICENSE-2.0. - * - * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 - */ -package io.vertx.core; - -import io.vertx.codegen.annotations.DataObject; -import io.vertx.codegen.annotations.Unstable; - -import java.util.concurrent.ExecutorService; - -/** - * Generic worker options defining the characteristics of a worker service. - */ -@Unstable -@DataObject -public interface WorkerOptions { - - /** - * Create the executor service that implements this worker options. - * - * @param vertx the vertx instance - * @return the executor service implementing these worker options - */ - ExecutorService createExecutor(Vertx vertx); - - /** - * @return a copy of this object - */ - WorkerOptions copy(); - -} diff --git a/src/main/java/io/vertx/core/WorkerPoolOptions.java b/src/main/java/io/vertx/core/WorkerPoolOptions.java deleted file mode 100644 index 49923840dfc..00000000000 --- a/src/main/java/io/vertx/core/WorkerPoolOptions.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 - * which is available at https://www.apache.org/licenses/LICENSE-2.0. - * - * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 - */ -package io.vertx.core; - -import io.vertx.codegen.annotations.DataObject; -import io.vertx.core.json.JsonObject; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Worker pool options. - * - * @author Julien Viet - */ -@DataObject(generateConverter = true, publicConverter = false) -public class WorkerPoolOptions implements WorkerOptions { - - private String name; - private int size; - private long maxExecuteTime; - private TimeUnit maxExecuteTimeUnit; - - /** - * Default constructor - */ - public WorkerPoolOptions() { - this.size = VertxOptions.DEFAULT_WORKER_POOL_SIZE; - this.maxExecuteTime = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME; - this.maxExecuteTimeUnit = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT; - } - - public WorkerPoolOptions(JsonObject json) { - WorkerPoolOptionsConverter.fromJson(json, this); - } - - /** - * Copy constructor - */ - public WorkerPoolOptions(WorkerPoolOptions other) { - this.name = other.name; - this.size = other.size; - this.maxExecuteTime = other.maxExecuteTime; - this.maxExecuteTimeUnit = other.maxExecuteTimeUnit; - } - - @Override - public ExecutorService createExecutor(Vertx vertx) { - throw new UnsupportedOperationException(); - } - - @Override - public WorkerPoolOptions copy() { - return new WorkerPoolOptions(this); - } - - /** - * @return the worker pool name - */ - public String getName() { - return name; - } - - /** - * Set the worker pool name to use. - * - * @param name the worker pool name - * @return a reference to this, so the API can be used fluently - */ - public WorkerPoolOptions setName(String name) { - this.name = name; - return this; - } - - /** - * Get the maximum number of worker threads to be used by the worker pool. - * - * @return the maximum number of worker threads - */ - public int getSize() { - return size; - } - - /** - * Set the maximum number of worker threads to be used by the pool. - * - * @param size the number of threads - * @return a reference to this, so the API can be used fluently - */ - public WorkerPoolOptions setSize(int size) { - if (size < 1) { - throw new IllegalArgumentException("size must be > 0"); - } - this.size = size; - return this; - } - - /** - * Get the value of max worker execute time, in {@link #setMaxExecuteTimeUnit maxWorkerExecuteTimeUnit}. - *

- * Vert.x will automatically log a warning if it detects that worker threads haven't returned within this time. - *

- * This can be used to detect where the user is blocking a worker thread for too long. Although worker threads - * can be blocked longer than event loop threads, they shouldn't be blocked for long periods of time. - * - * @return The value of max execute time, the default value of {@link #setMaxExecuteTimeUnit} is {@link TimeUnit#NANOSECONDS} - */ - public long getMaxExecuteTime() { - return maxExecuteTime; - } - - /** - * Sets the value of max worker execute time, in {@link DeploymentOptions#setMaxWorkerExecuteTimeUnit maxWorkerExecuteTimeUnit}. - *

- * The default value of {@link DeploymentOptions#setMaxWorkerExecuteTimeUnit maxWorkerExecuteTimeUnit} is {@link TimeUnit#NANOSECONDS} - * - * @param maxExecuteTime the value of max worker execute time, in {@link DeploymentOptions#setMaxWorkerExecuteTimeUnit maxWorkerExecuteTimeUnit}. - * @return a reference to this, so the API can be used fluently - */ - public WorkerPoolOptions setMaxExecuteTime(long maxExecuteTime) { - if (maxExecuteTime < 1) { - throw new IllegalArgumentException("maxExecuteTime must be > 0"); - } - this.maxExecuteTime = maxExecuteTime; - return this; - } - - /** - * @return the time unit of {@code maxExecuteTime} - */ - public TimeUnit getMaxExecuteTimeUnit() { - return maxExecuteTimeUnit; - } - - /** - * Set the time unit of {@link #maxExecuteTime} - * - * @param maxExecuteTimeUnit the time unit of {@link #maxExecuteTime} - * @return a reference to this, so the API can be used fluently - */ - public WorkerPoolOptions setMaxExecuteTimeUnit(TimeUnit maxExecuteTimeUnit) { - this.maxExecuteTimeUnit = maxExecuteTimeUnit; - return this; - } - - /** - * Convert this to JSON - * - * @return the JSON - */ - public JsonObject toJson() { - JsonObject json = new JsonObject(); - WorkerPoolOptionsConverter.toJson(this, json); - return json; - } -} diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index 9e374331f0e..74d283611b7 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -43,7 +43,7 @@ static void setResultHandler(ContextInternal ctx, Future fut, Handler void setResultHandler(ContextInternal ctx, Future fut, Handler doDeploy(String identifier, AtomicInteger deployCount = new AtomicInteger(); AtomicBoolean failureReported = new AtomicBoolean(); WorkerPool workerPool = null; - WorkerOptions workerOptions = options.getWorkerOptions(); - if (workerOptions instanceof WorkerPoolOptions) { - WorkerPoolOptions workerPoolOptions = (WorkerPoolOptions) workerOptions; - if (workerPoolOptions.getName() != null) { + ThreadingModel mode = options.getThreadingModel(); + if (mode == null) { + mode = ThreadingModel.EVENT_LOOP; + } + if (mode != ThreadingModel.VIRTUAL_THREAD) { + if (options.getWorkerPoolName() != null) { workerPool = vertx.createSharedWorkerPool(options.getWorkerPoolName(), options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()); } } else { - ExecutorService pool = workerOptions.createExecutor(vertx); - workerPool = vertx.wrapWorkerPool(pool); + if (!VertxInternal.isVirtualThreadAvailable()) { + return callingContext.failedFuture("This Java runtime does not support virtual threads"); + } } DeploymentImpl deployment = new DeploymentImpl(parent, workerPool, deploymentID, identifier, options); - for (Verticle verticle: verticles) { CloseFuture closeFuture = new CloseFuture(log); - ContextImpl context = (options.isWorker() ? vertx.createWorkerContext(deployment, closeFuture, workerPool, tccl) : - vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl)); + ContextImpl context; + switch (mode) { + default: + context = vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl); + break; + case WORKER: + context = vertx.createWorkerContext(deployment, closeFuture, workerPool, tccl); + break; + case VIRTUAL_THREAD: + context = vertx.createVirtualThreadContext(deployment, closeFuture, tccl); + break; + } VerticleHolder holder = new VerticleHolder(verticle, context, closeFuture); deployment.addVerticle(holder); context.runOnContext(v -> { diff --git a/src/main/java/io/vertx/core/impl/DuplicatedContext.java b/src/main/java/io/vertx/core/impl/DuplicatedContext.java index fd3dbe3fbb9..142fd77ebf7 100644 --- a/src/main/java/io/vertx/core/impl/DuplicatedContext.java +++ b/src/main/java/io/vertx/core/impl/DuplicatedContext.java @@ -11,10 +11,7 @@ package io.vertx.core.impl; import io.netty.channel.EventLoop; -import io.vertx.core.Context; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Promise; +import io.vertx.core.*; import io.vertx.core.json.JsonObject; import io.vertx.core.spi.tracing.VertxTracer; @@ -43,6 +40,11 @@ class DuplicatedContext implements ContextInternal { this.delegate = delegate; } + @Override + public ThreadingModel threadingModel() { + return delegate.threadingModel(); + } + @Override public boolean inThread() { return delegate.inThread(); diff --git a/src/main/java/io/vertx/core/impl/ThreadPerTaskExecutorService.java b/src/main/java/io/vertx/core/impl/ThreadPerTaskExecutorService.java new file mode 100644 index 00000000000..178ffe08784 --- /dev/null +++ b/src/main/java/io/vertx/core/impl/ThreadPerTaskExecutorService.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.impl; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class ThreadPerTaskExecutorService extends AbstractExecutorService { + + private static final int ST_RUNNING = 0; + private static final int ST_SHUTTING_DOWN = 1; + private static final int ST_TERMINATED = 2; + + private final AtomicInteger state = new AtomicInteger(); + private final Set threads = ConcurrentHashMap.newKeySet(); + private final CountDownLatch terminated = new CountDownLatch(1); + private final ThreadFactory threadFactory; + + public ThreadPerTaskExecutorService(ThreadFactory threadFactory) { + this.threadFactory = Objects.requireNonNull(threadFactory); + } + + @Override + public void shutdown() { + shutdown(false); + } + + @Override + public List shutdownNow() { + shutdown(true); + return Collections.emptyList(); + } + + private void shutdown(boolean now) { + if (state.get() == ST_RUNNING && state.compareAndSet(ST_RUNNING, ST_SHUTTING_DOWN)) { + if (threads.isEmpty()) { + state.set(ST_TERMINATED); + terminated.countDown(); + } else { + if (now) { + for (Thread thread : threads) { + thread.interrupt(); + } + } + } + } + } + + @Override + public boolean isShutdown() { + return state.get() != ST_RUNNING; + } + + @Override + public boolean isTerminated() { + return state.get() == ST_TERMINATED; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return terminated.await(timeout, unit); + } + + @Override + public void execute(Runnable command) { + Objects.requireNonNull(command); + if (state.get() == ST_RUNNING) { + Thread thread = threadFactory.newThread(() -> { + try { + command.run(); + } finally { + threads.remove(Thread.currentThread()); + if (state.get() == ST_SHUTTING_DOWN && threads.isEmpty()) { + if (state.compareAndSet(ST_SHUTTING_DOWN, ST_TERMINATED)) { + terminated.countDown(); + } + } + } + }); + threads.add(thread); + thread.start(); + } else { + throw new RejectedExecutionException(); + } + } +} diff --git a/src/main/java/io/vertx/core/impl/Utils.java b/src/main/java/io/vertx/core/impl/Utils.java index 9aedf0d2716..9b604093d99 100644 --- a/src/main/java/io/vertx/core/impl/Utils.java +++ b/src/main/java/io/vertx/core/impl/Utils.java @@ -45,4 +45,8 @@ public static boolean isWindows() { return isWindows; } + @SuppressWarnings("unchecked") + public static void throwAsUnchecked(Throwable t) throws E { + throw (E) t; + } } diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java index 047ea641e39..859d5046c14 100644 --- a/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -40,7 +40,6 @@ import io.vertx.core.file.impl.WindowsFileSystem; import io.vertx.core.http.impl.HttpClientImpl; import io.vertx.core.http.impl.HttpServerImpl; -import io.vertx.core.http.impl.SharedHttpClient; import io.vertx.core.impl.future.PromiseInternal; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; @@ -69,6 +68,7 @@ import java.io.File; import java.io.IOException; import java.lang.ref.WeakReference; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -99,6 +99,9 @@ public class VertxImpl implements VertxInternal, MetricsProvider { private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio"; private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50); + public static final ThreadFactory VIRTUAL_THREAD_FACTORY; + private static final Throwable VIRTUAL_THREAD_FACTORY_UNAVAILABILITY_CAUSE; + static { // Disable Netty's resource leak detection to reduce the performance overhead if not set by user // Supports both the default netty leak detection system property and the deprecated one @@ -106,6 +109,23 @@ public class VertxImpl implements VertxInternal, MetricsProvider { System.getProperty("io.netty.leakDetectionLevel") == null) { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); } + + ThreadFactory factory = null; + Throwable unavailabilityCause = null; + try { + Class builderClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder"); + Class ofVirtualClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder$OfVirtual"); + Method ofVirtualMethod = Thread.class.getDeclaredMethod("ofVirtual"); + Object builder = ofVirtualMethod.invoke(null); + Method nameMethod = ofVirtualClass.getDeclaredMethod("name", String.class, long.class); + Method factoryMethod = builderClass.getDeclaredMethod("factory"); + builder = nameMethod.invoke(builder, "vert.x-virtual-thread-", 0L); + factory = (ThreadFactory) factoryMethod.invoke(builder); + } catch (Exception e) { + unavailabilityCause = e; + } + VIRTUAL_THREAD_FACTORY = factory; + VIRTUAL_THREAD_FACTORY_UNAVAILABILITY_CAUSE = unavailabilityCause; } private final FileSystem fileSystem = getFileSystem(); @@ -122,11 +142,13 @@ public class VertxImpl implements VertxInternal, MetricsProvider { private final Map sharedNetServers = new HashMap<>(); final WorkerPool workerPool; final WorkerPool internalWorkerPool; + final WorkerPool virtualThreaWorkerPool; private final VertxThreadFactory threadFactory; private final ExecutorServiceFactory executorServiceFactory; private final ThreadFactory eventLoopThreadFactory; private final EventLoopGroup eventLoopGroup; private final EventLoopGroup acceptorEventLoopGroup; + private final ExecutorService virtualThreadExecutor; private final BlockedThreadChecker checker; private final AddressResolver addressResolver; private final AddressResolverOptions addressResolverOptions; @@ -180,6 +202,8 @@ public class VertxImpl implements VertxInternal, MetricsProvider { // The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections // under a lot of load acceptorEventLoopGroup = transport.eventLoopGroup(Transport.ACCEPTOR_EVENT_LOOP_GROUP, 1, acceptorEventLoopThreadFactory, 100); + virtualThreadExecutor = VIRTUAL_THREAD_FACTORY != null ? new ThreadPerTaskExecutorService(VIRTUAL_THREAD_FACTORY) : null; + virtualThreaWorkerPool = new WorkerPool(virtualThreadExecutor, null); internalWorkerPool = new WorkerPool(internalWorkerExec, internalBlockingPoolMetrics); namedWorkerPools = new HashMap<>(); workerPool = new WorkerPool(workerExec, workerPoolMetrics); @@ -517,7 +541,7 @@ public boolean cancelTimer(long id) { } private ContextImpl createEventLoopContext(EventLoop eventLoop, CloseFuture closeFuture, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) { - return new ContextImpl(this, true, eventLoop, new EventLoopExecutor(eventLoop), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, new TaskQueue(), deployment, closeFuture, disableTCCL ? null : tccl); + return new ContextImpl(this, ThreadingModel.EVENT_LOOP, eventLoop, new EventLoopExecutor(eventLoop), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, new TaskQueue(), deployment, closeFuture, disableTCCL ? null : tccl); } @Override @@ -538,7 +562,7 @@ public ContextImpl createEventLoopContext() { private ContextImpl createWorkerContext(EventLoop eventLoop, CloseFuture closeFuture, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) { TaskQueue orderedTasks = new TaskQueue(); WorkerPool wp = workerPool != null ? workerPool : this.workerPool; - return new ContextImpl(this, false, eventLoop, new WorkerExecutor(wp, orderedTasks), internalWorkerPool, wp, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl); + return new ContextImpl(this, ThreadingModel.WORKER, eventLoop, new WorkerExecutor(wp, orderedTasks), internalWorkerPool, wp, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl); } @Override @@ -556,6 +580,29 @@ public ContextImpl createWorkerContext() { return createWorkerContext(null, closeFuture, null, Thread.currentThread().getContextClassLoader()); } + private ContextImpl createVirtualThreadContext(EventLoop eventLoop, CloseFuture closeFuture, Deployment deployment, ClassLoader tccl) { + if (!VertxInternal.isVirtualThreadAvailable()) { + throw new IllegalStateException("This Java runtime does not support virtual threads"); + } + TaskQueue orderedTasks = new TaskQueue(); + return new ContextImpl(this, ThreadingModel.VIRTUAL_THREAD, eventLoop, new WorkerExecutor(virtualThreaWorkerPool, orderedTasks), internalWorkerPool, virtualThreaWorkerPool, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl); + } + + @Override + public ContextImpl createVirtualThreadContext(Deployment deployment, CloseFuture closeFuture, ClassLoader tccl) { + return createVirtualThreadContext(eventLoopGroup.next(), closeFuture, deployment, tccl); + } + + @Override + public ContextImpl createVirtualThreadContext(EventLoop eventLoop, ClassLoader tccl) { + return createVirtualThreadContext(eventLoop, closeFuture, null, tccl); + } + + @Override + public ContextImpl createVirtualThreadContext() { + return createVirtualThreadContext(null, closeFuture, Thread.currentThread().getContextClassLoader()); + } + @Override public DnsClient createDnsClient(int port, String host) { return createDnsClient(new DnsClientOptions().setHost(host).setPort(port)); @@ -911,6 +958,14 @@ private void deleteCacheDirAndShutdown(Handler> completionHand internalWorkerPool.close(); new ArrayList<>(namedWorkerPools.values()).forEach(WorkerPool::close); + if (virtualThreadExecutor != null) { + virtualThreadExecutor.shutdown(); + try { + virtualThreadExecutor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException ignore) { + } + } + acceptorEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { diff --git a/src/main/java/io/vertx/core/impl/VertxInternal.java b/src/main/java/io/vertx/core/impl/VertxInternal.java index 71c18b1b605..17394d6e140 100644 --- a/src/main/java/io/vertx/core/impl/VertxInternal.java +++ b/src/main/java/io/vertx/core/impl/VertxInternal.java @@ -15,6 +15,7 @@ import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.resolver.AddressResolverGroup; +import io.vertx.codegen.annotations.CacheReturn; import io.vertx.core.*; import io.vertx.core.http.*; import io.vertx.core.http.impl.HttpServerImpl; @@ -143,6 +144,21 @@ default C createSharedClient(String clientKey, String clientName, CloseFutur */ ContextInternal createWorkerContext(); + /** + * @return virtual thread context + */ + ContextInternal createVirtualThreadContext(Deployment deployment, CloseFuture closeFuture, ClassLoader tccl); + + /** + * @return virtual thread context + */ + ContextInternal createVirtualThreadContext(EventLoop eventLoop, ClassLoader tccl); + + /** + * @return virtual thread context + */ + ContextInternal createVirtualThreadContext(); + @Override WorkerExecutorInternal createSharedWorkerExecutor(String name); @@ -236,4 +252,10 @@ default Future executeBlockingInternal(Callable blockingCodeHandler, b void removeCloseHook(Closeable hook); + /** + * @return whether virtual threads are available + */ + static boolean isVirtualThreadAvailable() { + return VertxImpl.VIRTUAL_THREAD_FACTORY != null; + } } diff --git a/src/main/java/io/vertx/core/impl/WorkerExecutor.java b/src/main/java/io/vertx/core/impl/WorkerExecutor.java index 33e2bc28a69..067b5c4da88 100644 --- a/src/main/java/io/vertx/core/impl/WorkerExecutor.java +++ b/src/main/java/io/vertx/core/impl/WorkerExecutor.java @@ -10,9 +10,11 @@ */ package io.vertx.core.impl; +import io.vertx.core.Vertx; import io.vertx.core.spi.metrics.PoolMetrics; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** @@ -22,6 +24,21 @@ */ public class WorkerExecutor implements EventExecutor { + public static io.vertx.core.impl.WorkerExecutor unwrapWorkerExecutor() { + ContextInternal ctx = (ContextInternal) Vertx.currentContext(); + if (ctx != null) { + ctx = ctx.unwrap(); + Executor executor = ctx.executor(); + if (executor instanceof io.vertx.core.impl.WorkerExecutor) { + return (io.vertx.core.impl.WorkerExecutor) executor; + } else { + throw new IllegalStateException("Cannot be called on a Vert.x event-loop thread"); + } + } + // Technically it works also for worker threads but we don't want to encourage this + throw new IllegalStateException("Not running from a Vert.x virtual thread"); + } + private final WorkerPool workerPool; private final TaskQueue orderedTasks; private final ThreadLocal inThread = new ThreadLocal<>(); diff --git a/src/main/java11/io/vertx/core/DeploymentOptions.java b/src/main/java11/io/vertx/core/DeploymentOptions.java index f0345445e81..de082822cd4 100644 --- a/src/main/java11/io/vertx/core/DeploymentOptions.java +++ b/src/main/java11/io/vertx/core/DeploymentOptions.java @@ -30,26 +30,32 @@ @DataObject(generateConverter = true, publicConverter = false) public class DeploymentOptions { + public static final ThreadingModel DEFAULT_MODE = ThreadingModel.EVENT_LOOP; public static final boolean DEFAULT_WORKER = false; public static final boolean DEFAULT_HA = false; public static final int DEFAULT_INSTANCES = 1; private JsonObject config; - private boolean worker; - private WorkerOptions workerOptions; + private ThreadingModel threadingModel; private boolean ha; private int instances; private ClassLoader classLoader; + private String workerPoolName; + private int workerPoolSize; + private long maxWorkerExecuteTime; + private TimeUnit maxWorkerExecuteTimeUnit; /** * Default constructor */ public DeploymentOptions() { - this.worker = DEFAULT_WORKER; + this.threadingModel = DEFAULT_MODE; this.config = null; this.ha = DEFAULT_HA; this.instances = DEFAULT_INSTANCES; - this.workerOptions = new WorkerPoolOptions(); + this.workerPoolSize = VertxOptions.DEFAULT_WORKER_POOL_SIZE; + this.maxWorkerExecuteTime = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME; + this.maxWorkerExecuteTimeUnit = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT; } /** @@ -59,10 +65,13 @@ public DeploymentOptions() { */ public DeploymentOptions(DeploymentOptions other) { this.config = other.getConfig() == null ? null : other.getConfig().copy(); - this.worker = other.isWorker(); + this.threadingModel = other.getThreadingModel(); this.ha = other.isHa(); this.instances = other.instances; - this.workerOptions = other.workerOptions.copy(); + this.workerPoolName = other.workerPoolName; + this.workerPoolSize = other.workerPoolSize; + this.maxWorkerExecuteTime = other.maxWorkerExecuteTime; + this.maxWorkerExecuteTimeUnit = other.maxWorkerExecuteTimeUnit; } /** @@ -82,7 +91,6 @@ public DeploymentOptions(JsonObject json) { */ public void fromJson(JsonObject json) { this.config = json.getJsonObject("config"); - this.worker = json.getBoolean("worker", DEFAULT_WORKER); this.ha = json.getBoolean("ha", DEFAULT_HA); this.instances = json.getInteger("instances", DEFAULT_INSTANCES); } @@ -107,23 +115,43 @@ public DeploymentOptions setConfig(JsonObject config) { return this; } + /** + * Which threading model the verticle(s) should use? + * + * @return the verticle threading model + */ + public ThreadingModel getThreadingModel() { + return threadingModel; + } + + /** + * Set the verticle(s) verticle(s) threading model, e.g. a worker or a virtual thread verticle + * + * @param threadingModel the threading model + * @return a reference to this, so the API can be used fluently + */ + public DeploymentOptions setThreadingModel(ThreadingModel threadingModel) { + this.threadingModel = threadingModel; + return this; + } + /** * Should the verticle(s) be deployed as a worker verticle? * - * @return true if will be deployed as worker, false otherwise + * @return {@code true} if will be deployed as worker, {@code false} otherwise */ public boolean isWorker() { - return worker; + return threadingModel == ThreadingModel.WORKER; } /** * Set whether the verticle(s) should be deployed as a worker verticle * - * @param worker true for worker, false otherwise + * @param worker {@code true} for worker, {@code false} force event-loop * @return a reference to this, so the API can be used fluently */ public DeploymentOptions setWorker(boolean worker) { - this.worker = worker; + this.threadingModel = worker ? ThreadingModel.WORKER : ThreadingModel.EVENT_LOOP; return this; } @@ -167,33 +195,11 @@ public DeploymentOptions setInstances(int instances) { return this; } - /** - * @return the worker options - */ - public WorkerOptions getWorkerOptions() { - return workerOptions; - } - - /** - * Set the verticle worker options. - * - * @param workerOptions the worker options to use - * @return a reference to this, so the API can be used fluently - */ - public DeploymentOptions setWorkerOptions(WorkerOptions workerOptions) { - this.workerOptions = workerOptions; - return this; - } - - private WorkerPoolOptions assumeWorkerPool() { - return (WorkerPoolOptions) workerOptions; - } - /** * @return the worker pool name */ public String getWorkerPoolName() { - return assumeWorkerPool().getName(); + return workerPoolName; } /** @@ -204,7 +210,7 @@ public String getWorkerPoolName() { * @return a reference to this, so the API can be used fluently */ public DeploymentOptions setWorkerPoolName(String workerPoolName) { - assumeWorkerPool().setName(workerPoolName); + this.workerPoolName = workerPoolName; return this; } @@ -219,7 +225,7 @@ public DeploymentOptions setWorkerPoolName(String workerPoolName) { * @return the maximum number of worker threads */ public int getWorkerPoolSize() { - return assumeWorkerPool().getSize(); + return workerPoolSize; } /** @@ -231,7 +237,10 @@ public int getWorkerPoolSize() { * @return a reference to this, so the API can be used fluently */ public DeploymentOptions setWorkerPoolSize(int workerPoolSize) { - assumeWorkerPool().setSize(workerPoolSize); + if (workerPoolSize < 1) { + throw new IllegalArgumentException("size must be > 0"); + } + this.workerPoolSize = workerPoolSize; return this; } @@ -248,7 +257,7 @@ public DeploymentOptions setWorkerPoolSize(int workerPoolSize) { * @return The value of max worker execute time, the default value of {@link DeploymentOptions#setMaxWorkerExecuteTimeUnit} {@code maxWorkerExecuteTimeUnit} is {@link TimeUnit#NANOSECONDS} */ public long getMaxWorkerExecuteTime() { - return assumeWorkerPool().getMaxExecuteTime(); + return maxWorkerExecuteTime; } /** @@ -262,7 +271,10 @@ public long getMaxWorkerExecuteTime() { * @return a reference to this, so the API can be used fluently */ public DeploymentOptions setMaxWorkerExecuteTime(long maxWorkerExecuteTime) { - assumeWorkerPool().setMaxExecuteTime(maxWorkerExecuteTime); + if (maxWorkerExecuteTime < 1) { + throw new IllegalArgumentException("maxExecuteTime must be > 0"); + } + this.maxWorkerExecuteTime = maxWorkerExecuteTime; return this; } @@ -272,7 +284,7 @@ public DeploymentOptions setMaxWorkerExecuteTime(long maxWorkerExecuteTime) { * @return the time unit of {@code maxWorkerExecuteTime} */ public TimeUnit getMaxWorkerExecuteTimeUnit() { - return assumeWorkerPool().getMaxExecuteTimeUnit(); + return maxWorkerExecuteTimeUnit; } /** @@ -284,7 +296,7 @@ public TimeUnit getMaxWorkerExecuteTimeUnit() { * @return a reference to this, so the API can be used fluently */ public DeploymentOptions setMaxWorkerExecuteTimeUnit(TimeUnit maxWorkerExecuteTimeUnit) { - assumeWorkerPool().setMaxExecuteTimeUnit(maxWorkerExecuteTimeUnit); + this.maxWorkerExecuteTimeUnit = maxWorkerExecuteTimeUnit; return this; } diff --git a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java index 9a74b38870a..c0076626eb1 100644 --- a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java +++ b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java @@ -11,6 +11,7 @@ package io.vertx.core.impl; +import io.vertx.core.ThreadingModel; import io.vertx.core.Vertx; /** @@ -33,7 +34,7 @@ public static ContextInternal create(Vertx vertx) { VertxImpl impl = (VertxImpl) vertx; return new ContextImpl( impl, - false, + ThreadingModel.WORKER, impl.getEventLoopGroup().next(), EXECUTOR, impl.internalWorkerPool, diff --git a/src/test/java/io/vertx/core/ContextTest.java b/src/test/java/io/vertx/core/ContextTest.java index 89cae182ac5..2665b89f1df 100644 --- a/src/test/java/io/vertx/core/ContextTest.java +++ b/src/test/java/io/vertx/core/ContextTest.java @@ -13,9 +13,9 @@ import io.netty.channel.EventLoop; import io.vertx.core.impl.*; -import io.vertx.core.impl.btc.BlockedThreadChecker; import io.vertx.core.impl.future.PromiseInternal; import io.vertx.test.core.VertxTestBase; +import org.junit.Assume; import org.junit.Test; import java.net.URL; @@ -1013,4 +1013,38 @@ public void testDispatchContextOnAnyThread() { assertSame(current, thread.getContextClassLoader()); assertEquals(2, exec.get()); } + + @Test + public void testAwaitFromEventLoopThread() { + testAwaitFromContextThread(ThreadingModel.EVENT_LOOP, true); + } + + @Test + public void testAwaitFromWorkerThread() { + testAwaitFromContextThread(ThreadingModel.WORKER, false); + } + + @Test + public void testAwaitFromVirtualThreadThread() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + testAwaitFromContextThread(ThreadingModel.VIRTUAL_THREAD, false); + } + + private void testAwaitFromContextThread(ThreadingModel threadMode, boolean fail) { + vertx.deployVerticle(() -> new AbstractVerticle() { + @Override + public void start() { + Promise promise = Promise.promise(); + vertx.setTimer(10, id -> promise.complete("foo")); + try { + String res = Future.await(promise.future()); + assertFalse(fail); + assertEquals("foo", res); + } catch (IllegalStateException e) { + assertTrue(fail); + } + } + }, new DeploymentOptions().setThreadingModel(threadMode)).onComplete(onSuccess(v -> testComplete())); + await(); + } } diff --git a/src/test/java/io/vertx/core/FakeContext.java b/src/test/java/io/vertx/core/FakeContext.java index 9a83192c661..620f4818832 100644 --- a/src/test/java/io/vertx/core/FakeContext.java +++ b/src/test/java/io/vertx/core/FakeContext.java @@ -33,6 +33,11 @@ public Executor executor() { }; } + @Override + public ThreadingModel threadingModel() { + return null; + } + @Override public boolean inThread() { return false; diff --git a/src/test/java/io/vertx/core/FutureTest.java b/src/test/java/io/vertx/core/FutureTest.java index bfa8cb8db60..51dbbd5769f 100644 --- a/src/test/java/io/vertx/core/FutureTest.java +++ b/src/test/java/io/vertx/core/FutureTest.java @@ -1736,4 +1736,13 @@ public void testAndThenCompleteHandlerWithError() { await(); } + + @Test + public void testAwaitFromPlainThread() { + try { + Future.await(Promise.promise().future()); + fail(); + } catch (IllegalStateException e) { + } + } } diff --git a/src/test/java/io/vertx/core/VirtualThreadContextTest.java b/src/test/java/io/vertx/core/VirtualThreadContextTest.java new file mode 100644 index 00000000000..d12f183c043 --- /dev/null +++ b/src/test/java/io/vertx/core/VirtualThreadContextTest.java @@ -0,0 +1,242 @@ +/* + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core; + +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.WorkerExecutor; +import io.vertx.core.impl.future.PromiseInternal; +import io.vertx.test.core.VertxTestBase; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +public class VirtualThreadContextTest extends VertxTestBase { + + VertxInternal vertx; + + @Before + public void setUp() throws Exception { + super.setUp(); + vertx = (VertxInternal) super.vertx; + } + + @Test + public void testContext() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + vertx.createVirtualThreadContext().runOnContext(v -> { + Thread thread = Thread.currentThread(); + assertTrue(VirtualThreadDeploymentTest.isVirtual(thread)); + ContextInternal context = vertx.getOrCreateContext(); + Executor executor = context.executor(); + assertTrue(executor instanceof WorkerExecutor); + context.runOnContext(v2 -> { + // assertSame(thread, Thread.currentThread()); + assertSame(context, vertx.getOrCreateContext()); + testComplete(); + }); + }); + await(); + } + + @Test + public void testAwaitFutureSuccess() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + Object result = new Object(); + vertx.createVirtualThreadContext().runOnContext(v -> { + ContextInternal context = vertx.getOrCreateContext(); + PromiseInternal promise = context.promise(); + new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException ignore) { + } + promise.complete(result); + }).start(); + assertSame(result, Future.await(promise.future())); + testComplete(); + }); + await(); + } + + @Test + public void testAwaitFutureFailure() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + Exception failure = new Exception(); + vertx.createVirtualThreadContext().runOnContext(v -> { + ContextInternal context = vertx.getOrCreateContext(); + PromiseInternal promise = context.promise(); + new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException ignore) { + } + promise.fail(failure); + }).start(); + try { + Future.await(promise.future()); + } catch (Exception e) { + assertSame(failure, e); + testComplete(); + return; + } + fail(); + }); + await(); + } + + @Test + public void testAwaitCompoundFuture() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + Object result = new Object(); + vertx.createVirtualThreadContext().runOnContext(v -> { + ContextInternal context = vertx.getOrCreateContext(); + PromiseInternal promise = context.promise(); + new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException ignore) { + } + promise.complete(result); + }).start(); + assertSame("HELLO", Future.await(promise.future().map(res -> "HELLO"))); + testComplete(); + }); + await(); + } + + @Test + public void testDuplicateUseSameThread() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + int num = 1000; + waitFor(num); + vertx.createVirtualThreadContext().runOnContext(v -> { + ContextInternal context = vertx.getOrCreateContext(); + Thread th = Thread.currentThread(); + for (int i = 0;i < num;i++) { + ContextInternal duplicate = context.duplicate(); + duplicate.runOnContext(v2 -> { + // assertSame(th, Thread.currentThread()); + complete(); + }); + } + }); + await(); + } + + @Test + public void testDuplicateConcurrentAwait() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + int num = 1000; + waitFor(num); + vertx.createVirtualThreadContext().runOnContext(v -> { + ContextInternal context = vertx.getOrCreateContext(); + Object lock = new Object(); + List> list = new ArrayList<>(); + for (int i = 0;i < num;i++) { + ContextInternal duplicate = context.duplicate(); + duplicate.runOnContext(v2 -> { + Promise promise = duplicate.promise(); + boolean complete; + synchronized (lock) { + list.add(promise); + complete = list.size() == num; + } + if (complete) { + context.runOnContext(v3 -> { + synchronized (lock) { + list.forEach(p -> p.complete(null)); + } + }); + } + Future f = promise.future(); + Future.await(f); + complete(); + }); + } + }); + await(); + } + + @Test + public void testTimer() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + vertx.createVirtualThreadContext().runOnContext(v -> { + ContextInternal context = vertx.getOrCreateContext(); + PromiseInternal promise = context.promise(); + vertx.setTimer(100, id -> { + promise.complete("foo"); + }); + String res = Future.await(promise); + assertEquals("foo", res); + testComplete(); + }); + await(); + } + + @Test + public void testInThread() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + vertx.createVirtualThreadContext().runOnContext(v1 -> { + ContextInternal context = vertx.getOrCreateContext(); + assertTrue(context.inThread()); + new Thread(() -> { + boolean wasNotInThread = !context.inThread(); + context.runOnContext(v2 -> { + assertTrue(wasNotInThread); + assertTrue(context.inThread()); + testComplete(); + }); + }).start(); + }); + await(); + } + + private void sleep(AtomicInteger inflight) { + assertEquals(0, inflight.getAndIncrement()); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + inflight.decrementAndGet(); + } + } + + @Test + public void testSerializeBlocking() throws Exception { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + AtomicInteger inflight = new AtomicInteger(); + vertx.createVirtualThreadContext().runOnContext(v1 -> { + Context ctx = vertx.getOrCreateContext(); + for (int i = 0;i < 10;i++) { + ctx.runOnContext(v2 -> sleep(inflight)); + } + ctx.runOnContext(v -> testComplete()); + }); + await(); + } + + @Test + public void testVirtualThreadsNotAvailable() { + Assume.assumeFalse(VertxInternal.isVirtualThreadAvailable()); + try { + vertx.createVirtualThreadContext(); + fail(); + } catch (IllegalStateException expected) { + } + } +} diff --git a/src/test/java/io/vertx/core/VirtualThreadDeploymentTest.java b/src/test/java/io/vertx/core/VirtualThreadDeploymentTest.java new file mode 100644 index 00000000000..00aa50043d1 --- /dev/null +++ b/src/test/java/io/vertx/core/VirtualThreadDeploymentTest.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientResponse; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServer; +import io.vertx.core.impl.VertxInternal; +import io.vertx.test.core.VertxTestBase; +import junit.framework.AssertionFailedError; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class VirtualThreadDeploymentTest extends VertxTestBase { + + static { + Method isVirtualMethod = null; + try { + isVirtualMethod = Thread.class.getDeclaredMethod("isVirtual"); + } catch (NoSuchMethodException ignore) { + } + IS_VIRTUAL = isVirtualMethod; + } + + private static final Method IS_VIRTUAL; + + public static boolean isVirtual(Thread th) { + if (IS_VIRTUAL != null) { + try { + return (boolean) IS_VIRTUAL.invoke(th); + } catch (Exception e) { + AssertionFailedError afe = new AssertionFailedError(); + afe.initCause(e); + throw afe; + } + } else { + return false; + } + } + + @Test + public void testDeploy() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + vertx.deployVerticle(new AbstractVerticle() { + @Override + public void start() { + assertTrue(isVirtual(Thread.currentThread())); + Future fut = Future.future(p -> vertx.setTimer(500, id -> p.complete())); + Future.await(fut); + testComplete(); + } + }, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD)); + await(); + } + + @Test + public void testExecuteBlocking() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + vertx.deployVerticle(new AbstractVerticle() { + @Override + public void start() { + Future fut = vertx.executeBlocking(() -> { + assertTrue(isVirtual(Thread.currentThread())); + return Thread.currentThread().getName(); + }); + String res = Future.await(fut); + assertNotSame(Thread.currentThread().getName(), res); + testComplete(); + } + }, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD)); + await(); + } + + @Test + public void testDeployHTTPServer() throws Exception { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + AtomicInteger inflight = new AtomicInteger(); + AtomicBoolean processing = new AtomicBoolean(); + AtomicInteger max = new AtomicInteger(); + vertx.deployVerticle(new AbstractVerticle() { + HttpServer server; + @Override + public void start() { + server = vertx.createHttpServer().requestHandler(req -> { + assertFalse(processing.getAndSet(true)); + int val = inflight.incrementAndGet(); + max.set(Math.max(val, max.get())); + Future fut = Future.future(p -> vertx.setTimer(50, id -> p.complete())); + processing.set(false); + Future.await(fut); + assertFalse(processing.getAndSet(true)); + req.response().end(); + inflight.decrementAndGet(); + processing.set(false); + }); + Future.await(server.listen(8080, "localhost")); + } + }, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD)) + .toCompletionStage() + .toCompletableFuture() + .get(); + HttpClient client = vertx.createHttpClient(); + int numReq = 10; + waitFor(numReq); + for (int i = 0;i < numReq;i++) { + Future resp = client.request(HttpMethod.GET, 8080, "localhost", "/") + .compose(req -> req.send() + .compose(HttpClientResponse::body)); + resp.onComplete(onSuccess(v -> complete())); + } + await(); + Assert.assertEquals(5, max.get()); + } + + @Test + public void testVirtualThreadsNotAvailable() { + Assume.assumeFalse(VertxInternal.isVirtualThreadAvailable()); + vertx.deployVerticle(new AbstractVerticle() { + @Override + public void start() { + } + }, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD)).onComplete(onFailure(err -> { + testComplete(); + })); + await(); + } +} diff --git a/src/test/java/io/vertx/core/eventbus/VirtualThreadEventBusTest.java b/src/test/java/io/vertx/core/eventbus/VirtualThreadEventBusTest.java new file mode 100644 index 00000000000..92ffcd5d92f --- /dev/null +++ b/src/test/java/io/vertx/core/eventbus/VirtualThreadEventBusTest.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.eventbus; + +import io.vertx.core.Future; +import io.vertx.core.impl.VertxInternal; +import io.vertx.test.core.VertxTestBase; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +public class VirtualThreadEventBusTest extends VertxTestBase { + + VertxInternal vertx; + + @Before + public void setUp() throws Exception { + super.setUp(); + vertx = (VertxInternal) super.vertx; + } + + @Test + public void testEventBus() { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + EventBus eb = vertx.eventBus(); + eb.consumer("test-addr", msg -> { + msg.reply(msg.body()); + }); + vertx.createVirtualThreadContext().runOnContext(v -> { + Message ret = Future.await(eb.request("test-addr", "test")); + assertEquals("test", ret.body()); + testComplete(); + }); + await(); + } +} diff --git a/src/test/java/io/vertx/core/http/VirtualThreadHttpTest.java b/src/test/java/io/vertx/core/http/VirtualThreadHttpTest.java new file mode 100644 index 00000000000..fcf50cb24b3 --- /dev/null +++ b/src/test/java/io/vertx/core/http/VirtualThreadHttpTest.java @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.http; + +import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.future.PromiseInternal; +import io.vertx.test.core.VertxTestBase; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +public class VirtualThreadHttpTest extends VertxTestBase { + + private VertxInternal vertx; + + @Before + public void setUp() throws Exception { + super.setUp(); + vertx = (VertxInternal) super.vertx; + } + + @Test + public void testHttpClient1() throws Exception { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + HttpServer server = vertx.createHttpServer(); + server.requestHandler(req -> { + req.response().end("Hello World"); + }); + server.listen(8088, "localhost").toCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); + vertx.createVirtualThreadContext().runOnContext(v -> { + HttpClient client = vertx.createHttpClient(); + for (int i = 0; i < 100; ++i) { + HttpClientRequest req = Future.await(client.request(HttpMethod.GET, 8088, "localhost", "/")); + HttpClientResponse resp = Future.await(req.send()); + Buffer body = Future.await(resp.body()); + String bodyString = body.toString(StandardCharsets.UTF_8); + assertEquals("Hello World", body.toString()); + } + testComplete(); + }); + await(); + } + + @Test + public void testHttpClient2() throws Exception { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + waitFor(100); + HttpServer server = vertx.createHttpServer(); + server.requestHandler(req -> { + req.response().end("Hello World"); + }); + server.listen(8088, "localhost").toCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); + HttpClient client = vertx.createHttpClient(); + vertx.createVirtualThreadContext().runOnContext(v -> { + for (int i = 0; i < 100; ++i) { + client.request(HttpMethod.GET, 8088, "localhost", "/").onSuccess(req -> { + HttpClientResponse resp = Future.await(req.send()); + StringBuffer body = new StringBuffer(); + resp.handler(buff -> { + body.append(buff.toString()); + }); + resp.endHandler(v2 -> { + assertEquals("Hello World", body.toString()); + complete(); + }); + }); + } + }); + try { + await(); + } finally { + server.close().toCompletionStage().toCompletableFuture().get(); + client.close().toCompletionStage().toCompletableFuture().get(); + } + } + + @Test + public void testHttpClientTimeout() throws Exception { + Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable()); + HttpServer server = vertx.createHttpServer(); + server.requestHandler(req -> { + }); + server.listen(8088, "localhost").toCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); + vertx.createVirtualThreadContext().runOnContext(v -> { + HttpClient client = vertx.createHttpClient(); + ContextInternal ctx = vertx.getOrCreateContext(); + HttpClientRequest req = Future.await(client.request(HttpMethod.GET, 8088, "localhost", "/")); + PromiseInternal promise = ctx.promise(); + req.send().onComplete(promise); + Exception failure = new Exception("Too late"); + vertx.setTimer(500, id -> promise.tryFail(failure)); + try { + HttpClientResponse resp = Future.await(promise.future()); + } catch (Exception e) { + assertSame(failure, e); + testComplete(); + } + }); + await(); + } +} diff --git a/src/test/java/io/vertx/core/impl/ThreadPerTaskExecutorServiceTest.java b/src/test/java/io/vertx/core/impl/ThreadPerTaskExecutorServiceTest.java new file mode 100644 index 00000000000..54be222af17 --- /dev/null +++ b/src/test/java/io/vertx/core/impl/ThreadPerTaskExecutorServiceTest.java @@ -0,0 +1,84 @@ +package io.vertx.core.impl; + +import io.vertx.test.core.AsyncTestBase; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class ThreadPerTaskExecutorServiceTest extends AsyncTestBase { + + @Test + public void testExecute() throws Exception { + ThreadPerTaskExecutorService exec = new ThreadPerTaskExecutorService(Executors.defaultThreadFactory()); + int numTasks = 100; + Set threads = Collections.synchronizedSet(new HashSet<>()); + for (int i = 0;i < numTasks;i++) { + exec.execute(() -> threads.add(Thread.currentThread())); + } + exec.shutdown(); + exec.awaitTermination(5, TimeUnit.SECONDS); + assertEquals(numTasks, threads.size()); + } + + @Test + public void testShutdown() throws Exception { + ThreadPerTaskExecutorService exec = new ThreadPerTaskExecutorService(Executors.defaultThreadFactory()); + int numTasks = 10; + CountDownLatch latch = new CountDownLatch(1); + CyclicBarrier barrier = new CyclicBarrier(numTasks + 1); + for (int i = 0;i < numTasks;i++) { + exec.execute(() -> { + try { + barrier.await(); + latch.await(); + } catch (Exception e) { + fail(e); + } + }); + } + barrier.await(); + exec.shutdown(); + latch.countDown(); + long now = System.currentTimeMillis(); + exec.awaitTermination(5, TimeUnit.SECONDS); + assertTrue(System.currentTimeMillis() - now < 1000); + } + + @Test + public void testShutdownEmpty() throws Exception { + ThreadPerTaskExecutorService exec = new ThreadPerTaskExecutorService(Executors.defaultThreadFactory()); + exec.shutdown(); + long now = System.currentTimeMillis(); + exec.awaitTermination(5, TimeUnit.SECONDS); + assertTrue(System.currentTimeMillis() - now < 1000); + } + + @Test + public void testInterrupt() throws Exception { + ThreadPerTaskExecutorService exec = new ThreadPerTaskExecutorService(Executors.defaultThreadFactory()); + int numTasks = 100; + CyclicBarrier barrier = new CyclicBarrier(numTasks + 1); + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger interrupts = new AtomicInteger(); + for (int i = 0;i < numTasks;i++) { + exec.execute(() -> { + try { + barrier.await(); + latch.await(); + } catch (InterruptedException e) { + interrupts.incrementAndGet(); + } catch (BrokenBarrierException e) { + fail(e); + } + }); + } + barrier.await(); + exec.shutdownNow(); + exec.awaitTermination(5, TimeUnit.SECONDS); + assertEquals(numTasks, interrupts.get()); + } +}