Skip to content

Commit 5862b72

Browse files
authored
WebClient failure when sending multipart formdata from a virtual thread (#2700)
MultipartFormUpload is created on the Vert.x context that is bound to the request promise. We should create the queue using the context executor for both the producer and consumer sides. Otherwise, if there is a large upload and the queue needs to be paused/resumed, the pump method will be invoked on the EventLoop executor bound to this context. And then the IllegalArgumentException is thrown. Signed-off-by: Thomas Segismont <[email protected]>
1 parent 1cd9a5c commit 5862b72

File tree

2 files changed

+40
-21
lines changed

2 files changed

+40
-21
lines changed

vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/MultipartFormUpload.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.netty.buffer.UnpooledByteBufAllocator;
2020
import io.netty.handler.codec.http.*;
2121
import io.netty.handler.codec.http.multipart.*;
22-
import io.vertx.core.Context;
2322
import io.vertx.core.Handler;
2423
import io.vertx.core.MultiMap;
2524
import io.vertx.core.buffer.Buffer;
@@ -56,13 +55,13 @@ public class MultipartFormUpload implements ReadStream<Buffer> {
5655
private boolean ended;
5756
private final ContextInternal context;
5857

59-
public MultipartFormUpload(Context context,
58+
public MultipartFormUpload(ContextInternal context,
6059
MultipartForm parts,
6160
boolean multipart,
6261
HttpPostRequestEncoder.EncoderMode encoderMode) throws Exception {
63-
this.context = (ContextInternal) context;
62+
this.context = context;
6463
this.writable = true;
65-
this.pending = new InboundMessageQueue<>(((ContextInternal) context).eventLoop(), ((ContextInternal) context).executor()) {
64+
this.pending = new InboundMessageQueue<>(context.executor(), context.executor()) {
6665
@Override
6766
protected void handleResume() {
6867
writable = true;

vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/MultipartFormUploadTest.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,17 @@
1616
package io.vertx.ext.web.client.tests;
1717

1818
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
19-
import io.vertx.core.Context;
2019
import io.vertx.core.Vertx;
2120
import io.vertx.core.buffer.Buffer;
21+
import io.vertx.core.internal.ContextInternal;
22+
import io.vertx.core.internal.VertxInternal;
2223
import io.vertx.ext.unit.Async;
2324
import io.vertx.ext.unit.TestContext;
2425
import io.vertx.ext.unit.junit.VertxUnitRunner;
2526
import io.vertx.ext.web.client.impl.MultipartFormUpload;
2627
import io.vertx.ext.web.multipart.MultipartForm;
2728
import io.vertx.test.core.TestUtils;
28-
import org.junit.After;
29-
import org.junit.Before;
30-
import org.junit.ClassRule;
31-
import org.junit.Test;
29+
import org.junit.*;
3230
import org.junit.rules.TemporaryFolder;
3331
import org.junit.runner.RunWith;
3432

@@ -40,18 +38,19 @@
4038
import java.util.concurrent.atomic.AtomicInteger;
4139

4240
import static org.junit.Assert.assertEquals;
41+
import static org.junit.Assume.assumeTrue;
4342

4443
@RunWith(VertxUnitRunner.class)
4544
public class MultipartFormUploadTest {
4645

4746
@ClassRule
4847
public static TemporaryFolder testFolder = new TemporaryFolder();
4948

50-
private Vertx vertx;
49+
private VertxInternal vertx;
5150

5251
@Before
5352
public void setUp() throws Exception {
54-
vertx = Vertx.vertx();
53+
vertx = (VertxInternal) Vertx.vertx();
5554
}
5655

5756
@After
@@ -63,7 +62,7 @@ public void tearDown(TestContext ctx) {
6362
public void testSimpleAttribute(TestContext ctx) throws Exception {
6463
Async async = ctx.async();
6564
Buffer result = Buffer.buffer();
66-
Context context = vertx.getOrCreateContext();
65+
ContextInternal context = vertx.getOrCreateContext();
6766
MultipartFormUpload upload = new MultipartFormUpload(context, MultipartForm.create().attribute("foo", "bar"), false, HttpPostRequestEncoder.EncoderMode.RFC1738);
6867
upload.endHandler(v -> {
6968
assertEquals("foo=bar", result.toString());
@@ -75,24 +74,45 @@ public void testSimpleAttribute(TestContext ctx) throws Exception {
7574
}
7675

7776
@Test
78-
public void testFileUpload(TestContext ctx) throws Exception {
79-
testFileUpload(ctx, false);
77+
public void testFileUploadEventLoopContext(TestContext ctx) throws Exception {
78+
testFileUpload(ctx, vertx.createEventLoopContext(), false);
8079
}
8180

8281
@Test
83-
public void testFileUploadPaused(TestContext ctx) throws Exception {
84-
testFileUpload(ctx, true);
82+
public void testFileUploadWorkerContext(TestContext ctx) throws Exception {
83+
testFileUpload(ctx, vertx.createWorkerContext(), false);
8584
}
8685

87-
private void testFileUpload(TestContext ctx, boolean paused) throws Exception {
86+
@Test
87+
public void testFileUploadVirtualThreadContext(TestContext ctx) throws Exception {
88+
assumeTrue(vertx.isVirtualThreadAvailable());
89+
testFileUpload(ctx, vertx.createVirtualThreadContext(), false);
90+
}
91+
92+
@Test
93+
public void testFileUploadPausedEventLoopContext(TestContext ctx) throws Exception {
94+
testFileUpload(ctx, vertx.createEventLoopContext(), true);
95+
}
96+
97+
@Test
98+
public void testFileUploadPausedWorkerContext(TestContext ctx) throws Exception {
99+
testFileUpload(ctx, vertx.createWorkerContext(), true);
100+
}
101+
102+
@Test
103+
public void testFileUploadPausedVirtualThreadContext(TestContext ctx) throws Exception {
104+
assumeTrue(vertx.isVirtualThreadAvailable());
105+
testFileUpload(ctx, vertx.createVirtualThreadContext(), true);
106+
}
107+
108+
private void testFileUpload(TestContext testContext, ContextInternal context, boolean paused) throws Exception {
88109
File file = testFolder.newFile();
89110
Files.write(file.toPath(), TestUtils.randomByteArray(32 * 1024));
90111

91112
String filename = file.getName();
92113
String pathname = file.getAbsolutePath();
93114

94-
Async async = ctx.async();
95-
Context context = vertx.getOrCreateContext();
115+
Async async = testContext.async();
96116
context.runOnContext(v1 -> {
97117
try {
98118
MultipartFormUpload upload = new MultipartFormUpload(context, MultipartForm.create().textFileUpload(
@@ -104,7 +124,7 @@ private void testFileUpload(TestContext ctx, boolean paused) throws Exception {
104124
AtomicInteger end = new AtomicInteger();
105125
upload.endHandler(v2 -> {
106126
assertEquals(0, end.getAndIncrement());
107-
ctx.assertTrue(buffers.size() > 0);
127+
testContext.assertFalse(buffers.isEmpty());
108128
async.complete();
109129
});
110130
upload.handler(buffer -> {
@@ -119,7 +139,7 @@ private void testFileUpload(TestContext ctx, boolean paused) throws Exception {
119139
context.runOnContext(v3 -> upload.resume());
120140
}
121141
} catch (Exception e) {
122-
ctx.fail(e);
142+
testContext.fail(e);
123143
throw new AssertionError(e);
124144
}
125145
});

0 commit comments

Comments
 (0)