Skip to content

Commit

Permalink
HTTP client stream should handle a write to a stream which has been r…
Browse files Browse the repository at this point in the history
…eset without having being allocated.

Motivation:

Vert.x HTTP client stream does not allocate a stream when the stream has been reset by the application before its allocation. When such stream is being written, the stream behaves normally and fails since the internal state is not correct.

Changes:

Record the reset state of a stream and guard against writes in such case.
  • Loading branch information
vietj committed Dec 2, 2024
1 parent 001a07c commit f4dfd01
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,11 @@ private void endRequest(Stream s) {
* @param stream to reset
* @return whether the stream should be considered as closed
*/
private boolean reset(Stream stream) {
private Boolean reset(Stream stream) {
if (stream.reset) {
return null;
}
stream.reset = true;
if (!responses.contains(stream)) {
requests.remove(stream);
return true;
Expand All @@ -326,16 +330,20 @@ private boolean reset(Stream stream) {
return false;
}

private void writeHead(Stream stream, HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, PromiseInternal<Void> handler) {
private void writeHead(Stream stream, HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, PromiseInternal<Void> listener) {
writeToChannel(new MessageWrite() {
@Override
public void write() {
if (stream.reset) {
listener.fail("Stream reset");
return;
}
stream.request = request;
beginRequest(stream, request, chunked, buf, end, connect, handler);
beginRequest(stream, request, chunked, buf, end, connect, listener);
}
@Override
public void cancel(Throwable cause) {
handler.fail(cause);
listener.fail(cause);
}
});
}
Expand All @@ -344,6 +352,10 @@ private void writeBuffer(Stream stream, ByteBuf buff, boolean end, PromiseIntern
writeToChannel(new MessageWrite() {
@Override
public void write() {
if (stream.reset) {
listener.fail("Stream reset");
return;
}
writeBuffer(stream, buff, end, (FutureListener<Void>)listener);
}

Expand All @@ -368,7 +380,7 @@ private abstract static class Stream {
private boolean responseEnded;
private long bytesRead;
private long bytesWritten;

private boolean reset;

Stream(ContextInternal context, Promise<HttpClientStream> promise, int id) {
this.context = context;
Expand Down Expand Up @@ -404,7 +416,6 @@ private static class StreamImpl extends Stream implements HttpClientStream {

private final Http1xClientConnection conn;
private final InboundMessageQueue<Object> queue;
private boolean reset;
private boolean closed;
private Handler<HttpResponseHead> headHandler;
private Handler<Buffer> chunkHandler;
Expand All @@ -431,18 +442,16 @@ protected void handlePause() {
}
@Override
protected void handleMessage(Object item) {
if (!reset) {
if (item instanceof MultiMap) {
Handler<MultiMap> handler = endHandler;
if (handler != null) {
context.dispatch((MultiMap) item, handler);
}
} else {
Buffer buffer = (Buffer) item;
Handler<Buffer> handler = chunkHandler;
if (handler != null) {
context.dispatch(buffer, handler);
}
if (item instanceof MultiMap) {
Handler<MultiMap> handler = endHandler;
if (handler != null) {
context.dispatch((MultiMap) item, handler);
}
} else {
Buffer buffer = (Buffer) item;
Handler<Buffer> handler = chunkHandler;
if (handler != null) {
context.dispatch(buffer, handler);
}
}
}
Expand Down Expand Up @@ -584,26 +593,24 @@ public Future<Void> reset(Throwable cause) {
Promise<Void> promise = context.promise();
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
_reset(cause, promise);
reset(cause, promise);
} else {
eventLoop.execute(() -> _reset(cause, promise));
eventLoop.execute(() -> reset(cause, promise));
}
return promise.future();
}

private void _reset(Throwable cause, Promise<Void> promise) {
if (reset) {
private void reset(Throwable cause, Promise<Void> promise) {
Boolean removed = conn.reset(this);
if (removed == null) {
promise.fail("Stream already reset");
return;
}
reset = true;
boolean removed = conn.reset(this);
if (removed) {
context.execute(cause, this::handleClosed);
} else {
context.execute(cause, this::handleException);
}
promise.complete();
if (removed) {
context.execute(cause, this::handleClosed);
} else {
context.execute(cause, this::handleException);
}
promise.complete(); }
}

@Override
Expand Down Expand Up @@ -856,7 +863,9 @@ private void handleResponseChunk(Stream stream, ByteBuf chunk) {
Buffer buff = BufferInternal.safeBuffer(chunk);
int len = buff.length();
stream.bytesRead += len;
stream.handleChunk(buff);
if (!stream.reset) {
stream.handleChunk(buff);
}
}

private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
Expand Down Expand Up @@ -908,7 +917,9 @@ private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
checkLifecycle();
}
lastResponseReceivedTimestamp = System.currentTimeMillis();
stream.handleEnd(trailer);
if (!stream.reset) {
stream.handleEnd(trailer);
}
if (stream.requestEnded) {
stream.handleClosed(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
private long bytesWritten;
protected boolean isConnect;
private Throwable failure;
private long reset = -1L;

VertxHttp2Stream(C conn, ContextInternal context) {
this.conn = conn;
Expand Down Expand Up @@ -126,6 +127,7 @@ void onException(Throwable cause) {
}

void onReset(long code) {
reset = code;
context.emit(code, this::handleReset);
}

Expand Down Expand Up @@ -246,6 +248,12 @@ public void cancel(Throwable cause) {
}

void doWriteHeaders(Http2Headers headers, boolean end, boolean checkFlush, Promise<Void> promise) {
if (reset != -1L) {
if (promise != null) {
promise.fail("Stream reset");
}
return;
}
if (end) {
endWritten();
}
Expand Down Expand Up @@ -273,6 +281,10 @@ public void cancel(Throwable cause) {
}

void doWriteData(ByteBuf buf, boolean end, Promise<Void> promise) {
if (reset != -1L) {
promise.fail("Stream reset");
return;
}
ByteBuf chunk;
if (buf == null && end) {
chunk = Unpooled.EMPTY_BUFFER;
Expand All @@ -289,6 +301,9 @@ void doWriteData(ByteBuf buf, boolean end, Promise<Void> promise) {
}

final Future<Void> writeReset(long code) {
if (code < 0L) {
throw new IllegalArgumentException("Invalid reset code value");
}
Promise<Void> promise = context.promise();
EventLoop eventLoop = conn.context().nettyEventLoop();
if (eventLoop.inEventLoop()) {
Expand All @@ -300,6 +315,11 @@ final Future<Void> writeReset(long code) {
}

protected void doWriteReset(long code, Promise<Void> promise) {
if (reset != -1L) {
promise.fail("Stream already reset");
return;
}
reset = code;
int streamId;
synchronized (this) {
streamId = stream != null ? stream.id() : -1;
Expand Down
14 changes: 14 additions & 0 deletions vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5863,6 +5863,20 @@ public void testResetClientRequestResponseInProgress() throws Exception {
await();
}

@Test
public void testResetPartialClientRequest() throws Exception {
server.requestHandler(req -> {
});
startServer(testAddress);
client.request(requestOptions).onComplete(onSuccess(req -> {
assertTrue(req.reset().succeeded());
req.end("body").onComplete(onFailure(err -> {
testComplete();
}));
}));
await();
}

@Test
public void testSimpleCookie() throws Exception {
testCookies("foo=bar", req -> {
Expand Down

0 comments on commit f4dfd01

Please sign in to comment.