Skip to content

Commit

Permalink
WebClient failure when sending multipart formdata from a virtual thread
Browse files Browse the repository at this point in the history
MultipartFormUpload is created on the Vert.x context that is bound to the request promise.

We should create the queue using the context executor for both the producer and consumer sides.

Otherwise, if there is a large upload and the queue needs to be paused/resumed, the pump method will be invoked on the EventLoop executor bound to this context. And then the IllegalArgumentException is thrown.

Signed-off-by: Thomas Segismont <[email protected]>
  • Loading branch information
tsegismont committed Jan 16, 2025
1 parent 1cd9a5c commit 25e405f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.multipart.*;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
Expand Down Expand Up @@ -56,13 +55,13 @@ public class MultipartFormUpload implements ReadStream<Buffer> {
private boolean ended;
private final ContextInternal context;

public MultipartFormUpload(Context context,
public MultipartFormUpload(ContextInternal context,
MultipartForm parts,
boolean multipart,
HttpPostRequestEncoder.EncoderMode encoderMode) throws Exception {
this.context = (ContextInternal) context;
this.context = context;
this.writable = true;
this.pending = new InboundMessageQueue<>(((ContextInternal) context).eventLoop(), ((ContextInternal) context).executor()) {
this.pending = new InboundMessageQueue<>(context.executor(), context.executor()) {
@Override
protected void handleResume() {
writable = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,17 @@
package io.vertx.ext.web.client.tests;

import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.ext.web.client.impl.MultipartFormUpload;
import io.vertx.ext.web.multipart.MultipartForm;
import io.vertx.test.core.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.*;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;

Expand All @@ -40,18 +38,19 @@
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue;

@RunWith(VertxUnitRunner.class)
public class MultipartFormUploadTest {

@ClassRule
public static TemporaryFolder testFolder = new TemporaryFolder();

private Vertx vertx;
private VertxInternal vertx;

@Before
public void setUp() throws Exception {
vertx = Vertx.vertx();
vertx = (VertxInternal) Vertx.vertx();
}

@After
Expand All @@ -63,7 +62,7 @@ public void tearDown(TestContext ctx) {
public void testSimpleAttribute(TestContext ctx) throws Exception {
Async async = ctx.async();
Buffer result = Buffer.buffer();
Context context = vertx.getOrCreateContext();
ContextInternal context = vertx.getOrCreateContext();
MultipartFormUpload upload = new MultipartFormUpload(context, MultipartForm.create().attribute("foo", "bar"), false, HttpPostRequestEncoder.EncoderMode.RFC1738);
upload.endHandler(v -> {
assertEquals("foo=bar", result.toString());
Expand All @@ -75,24 +74,45 @@ public void testSimpleAttribute(TestContext ctx) throws Exception {
}

@Test
public void testFileUpload(TestContext ctx) throws Exception {
testFileUpload(ctx, false);
public void testFileUploadEventLoopContext(TestContext ctx) throws Exception {
testFileUpload(ctx, vertx.createEventLoopContext(), false);
}

@Test
public void testFileUploadPaused(TestContext ctx) throws Exception {
testFileUpload(ctx, true);
public void testFileUploadWorkerContext(TestContext ctx) throws Exception {
testFileUpload(ctx, vertx.createWorkerContext(), false);
}

private void testFileUpload(TestContext ctx, boolean paused) throws Exception {
@Test
public void testFileUploadVirtualThreadContext(TestContext ctx) throws Exception {
assumeTrue(vertx.isVirtualThreadAvailable());
testFileUpload(ctx, vertx.createVirtualThreadContext(), false);
}

@Test
public void testFileUploadPausedEventLoopContext(TestContext ctx) throws Exception {
testFileUpload(ctx, vertx.createEventLoopContext(), true);
}

@Test
public void testFileUploadPausedWorkerContext(TestContext ctx) throws Exception {
testFileUpload(ctx, vertx.createWorkerContext(), true);
}

@Test
public void testFileUploadPausedVirtualThreadContext(TestContext ctx) throws Exception {
assumeTrue(vertx.isVirtualThreadAvailable());
testFileUpload(ctx, vertx.createVirtualThreadContext(), true);
}

private void testFileUpload(TestContext testContext, ContextInternal context, boolean paused) throws Exception {
File file = testFolder.newFile();
Files.write(file.toPath(), TestUtils.randomByteArray(32 * 1024));

String filename = file.getName();
String pathname = file.getAbsolutePath();

Async async = ctx.async();
Context context = vertx.getOrCreateContext();
Async async = testContext.async();
context.runOnContext(v1 -> {
try {
MultipartFormUpload upload = new MultipartFormUpload(context, MultipartForm.create().textFileUpload(
Expand All @@ -104,7 +124,7 @@ private void testFileUpload(TestContext ctx, boolean paused) throws Exception {
AtomicInteger end = new AtomicInteger();
upload.endHandler(v2 -> {
assertEquals(0, end.getAndIncrement());
ctx.assertTrue(buffers.size() > 0);
testContext.assertFalse(buffers.isEmpty());
async.complete();
});
upload.handler(buffer -> {
Expand All @@ -119,7 +139,7 @@ private void testFileUpload(TestContext ctx, boolean paused) throws Exception {
context.runOnContext(v3 -> upload.resume());
}
} catch (Exception e) {
ctx.fail(e);
testContext.fail(e);
throw new AssertionError(e);
}
});
Expand Down

0 comments on commit 25e405f

Please sign in to comment.