Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.HostAndPort;
import io.vertx.core.net.NetSocket;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;

Expand Down Expand Up @@ -370,14 +371,20 @@ default Future<HttpClientResponse> send(Buffer body) {
* <p> If the {@link HttpHeaders#CONTENT_LENGTH} is set then the request assumes this is the
* length of the {stream}, otherwise the request will set a chunked {@link HttpHeaders#CONTENT_ENCODING}.
*
* <p>When the {@code body} stream fails, the request is reset with the CANCEL {@literal 0x8} error code.f</p>
*
* @return a future notified when the HTTP response is available
*/
default Future<HttpClientResponse> send(ReadStream<Buffer> body) {
MultiMap headers = headers();
if (headers == null || !headers.contains(HttpHeaders.CONTENT_LENGTH)) {
setChunked(true);
}
body.pipeTo(this);
Future<Void> result = body
.pipe()
.endOnFailure(false)
.to(this);
result.onFailure(err -> reset(0x8, err)); // CANCEL
return response();
}

Expand Down
66 changes: 66 additions & 0 deletions vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6048,6 +6048,72 @@ private void testClientRequestWithLargeBodyInSmallChunks(boolean chunked, BiFunc
await();
}

@Test
public void testClientRequestSendFailure() throws Exception {
waitFor(2);
CompletableFuture<Void> chunkContinuation = new CompletableFuture<>();
server.requestHandler(request -> {
request.handler(chunk -> {
chunkContinuation.complete(null);
});
request.exceptionHandler(err -> {
if (request.version() == HttpVersion.HTTP_2) {
assertEquals(StreamResetException.class, err.getClass());
StreamResetException sre = (StreamResetException) err;
assertEquals(0x8, sre.getCode());
} else {
assertEquals(HttpClosedException.class, err.getClass());
}
complete();
});
request.endHandler(v -> {
fail();
});
});
startServer(testAddress);
Throwable failure = new Throwable();
client.request(requestOptions).compose(request -> request.send(new ReadStream<>() {
Handler<Throwable> exceptionHandler;
@Override
public ReadStream<Buffer> exceptionHandler(@Nullable Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
@Override
public ReadStream<Buffer> handler(@Nullable Handler<Buffer> handler) {
if (handler != null) {
vertx.runOnContext(v1 -> {
handler.handle(Buffer.buffer("chunk-1"));
chunkContinuation.whenComplete((v2, err) -> {
exceptionHandler.handle(failure);
});
});
}
return this;
}
@Override
public ReadStream<Buffer> pause() {
return this;
}
@Override
public ReadStream<Buffer> resume() {
return this;
}
@Override
public ReadStream<Buffer> fetch(long amount) {
return this;
}
@Override
public ReadStream<Buffer> endHandler(@Nullable Handler<Void> endHandler) {
return this;
}
})).onComplete(onFailure(err -> {
assertSame(failure, err.getCause());
complete();
}));
await();
}

@Test
public void testClientRequestFlowControlDifferentEventLoops() throws Exception {
Promise<Void> resume = Promise.promise();
Expand Down
Loading