diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index 8ba55e230b7..e36e9a82873 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -317,7 +317,11 @@ private void endRequest(Stream s) { * @param stream to reset * @return whether the stream should be considered as closed */ - private boolean reset(Stream stream) { + private Boolean reset(Stream stream) { + if (stream.reset) { + return null; + } + stream.reset = true; if (!responses.contains(stream)) { requests.remove(stream); return true; @@ -326,16 +330,20 @@ private boolean reset(Stream stream) { return false; } - private void writeHead(Stream stream, HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, PromiseInternal handler) { + private void writeHead(Stream stream, HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, PromiseInternal listener) { writeToChannel(new MessageWrite() { @Override public void write() { + if (stream.reset) { + listener.fail("Stream reset"); + return; + } stream.request = request; - beginRequest(stream, request, chunked, buf, end, connect, handler); + beginRequest(stream, request, chunked, buf, end, connect, listener); } @Override public void cancel(Throwable cause) { - handler.fail(cause); + listener.fail(cause); } }); } @@ -344,6 +352,10 @@ private void writeBuffer(Stream stream, ByteBuf buff, boolean end, PromiseIntern writeToChannel(new MessageWrite() { @Override public void write() { + if (stream.reset) { + listener.fail("Stream reset"); + return; + } writeBuffer(stream, buff, end, (FutureListener)listener); } @@ -368,7 +380,7 @@ private abstract static class Stream { private boolean responseEnded; private long bytesRead; private long bytesWritten; - + private boolean reset; Stream(ContextInternal context, Promise promise, int id) { this.context = context; @@ -404,7 +416,6 @@ private static class StreamImpl extends Stream implements HttpClientStream { private final Http1xClientConnection conn; private final InboundMessageQueue queue; - private boolean reset; private boolean closed; private Handler headHandler; private Handler chunkHandler; @@ -431,18 +442,16 @@ protected void handlePause() { } @Override protected void handleMessage(Object item) { - if (!reset) { - if (item instanceof MultiMap) { - Handler handler = endHandler; - if (handler != null) { - context.dispatch((MultiMap) item, handler); - } - } else { - Buffer buffer = (Buffer) item; - Handler handler = chunkHandler; - if (handler != null) { - context.dispatch(buffer, handler); - } + if (item instanceof MultiMap) { + Handler handler = endHandler; + if (handler != null) { + context.dispatch((MultiMap) item, handler); + } + } else { + Buffer buffer = (Buffer) item; + Handler handler = chunkHandler; + if (handler != null) { + context.dispatch(buffer, handler); } } } @@ -584,26 +593,24 @@ public Future reset(Throwable cause) { Promise promise = context.promise(); EventLoop eventLoop = conn.context.nettyEventLoop(); if (eventLoop.inEventLoop()) { - _reset(cause, promise); + reset(cause, promise); } else { - eventLoop.execute(() -> _reset(cause, promise)); + eventLoop.execute(() -> reset(cause, promise)); } return promise.future(); } - private void _reset(Throwable cause, Promise promise) { - if (reset) { + private void reset(Throwable cause, Promise promise) { + Boolean removed = conn.reset(this); + if (removed == null) { promise.fail("Stream already reset"); - return; - } - reset = true; - boolean removed = conn.reset(this); - if (removed) { - context.execute(cause, this::handleClosed); } else { - context.execute(cause, this::handleException); - } - promise.complete(); + if (removed) { + context.execute(cause, this::handleClosed); + } else { + context.execute(cause, this::handleException); + } + promise.complete(); } } @Override @@ -856,7 +863,9 @@ private void handleResponseChunk(Stream stream, ByteBuf chunk) { Buffer buff = BufferInternal.safeBuffer(chunk); int len = buff.length(); stream.bytesRead += len; - stream.handleChunk(buff); + if (!stream.reset) { + stream.handleChunk(buff); + } } private void handleResponseEnd(Stream stream, LastHttpContent trailer) { @@ -908,7 +917,9 @@ private void handleResponseEnd(Stream stream, LastHttpContent trailer) { checkLifecycle(); } lastResponseReceivedTimestamp = System.currentTimeMillis(); - stream.handleEnd(trailer); + if (!stream.reset) { + stream.handleEnd(trailer); + } if (stream.requestEnded) { stream.handleClosed(null); } diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java b/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java index 219c9bb4806..c3f6c1da462 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java @@ -53,6 +53,7 @@ abstract class VertxHttp2Stream { private long bytesWritten; protected boolean isConnect; private Throwable failure; + private long reset = -1L; VertxHttp2Stream(C conn, ContextInternal context) { this.conn = conn; @@ -126,6 +127,7 @@ void onException(Throwable cause) { } void onReset(long code) { + reset = code; context.emit(code, this::handleReset); } @@ -246,6 +248,12 @@ public void cancel(Throwable cause) { } void doWriteHeaders(Http2Headers headers, boolean end, boolean checkFlush, Promise promise) { + if (reset != -1L) { + if (promise != null) { + promise.fail("Stream reset"); + } + return; + } if (end) { endWritten(); } @@ -273,6 +281,10 @@ public void cancel(Throwable cause) { } void doWriteData(ByteBuf buf, boolean end, Promise promise) { + if (reset != -1L) { + promise.fail("Stream reset"); + return; + } ByteBuf chunk; if (buf == null && end) { chunk = Unpooled.EMPTY_BUFFER; @@ -289,6 +301,9 @@ void doWriteData(ByteBuf buf, boolean end, Promise promise) { } final Future writeReset(long code) { + if (code < 0L) { + throw new IllegalArgumentException("Invalid reset code value"); + } Promise promise = context.promise(); EventLoop eventLoop = conn.context().nettyEventLoop(); if (eventLoop.inEventLoop()) { @@ -300,6 +315,11 @@ final Future writeReset(long code) { } protected void doWriteReset(long code, Promise promise) { + if (reset != -1L) { + promise.fail("Stream already reset"); + return; + } + reset = code; int streamId; synchronized (this) { streamId = stream != null ? stream.id() : -1; diff --git a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java index 0b24f6194b9..5d4ecd1a13b 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java @@ -5863,6 +5863,20 @@ public void testResetClientRequestResponseInProgress() throws Exception { await(); } + @Test + public void testResetPartialClientRequest() throws Exception { + server.requestHandler(req -> { + }); + startServer(testAddress); + client.request(requestOptions).onComplete(onSuccess(req -> { + assertTrue(req.reset().succeeded()); + req.end("body").onComplete(onFailure(err -> { + testComplete(); + })); + })); + await(); + } + @Test public void testSimpleCookie() throws Exception { testCookies("foo=bar", req -> {