From f4dfd01b55e40848dbb4232b6b0d1d70e4ba9fbe Mon Sep 17 00:00:00 2001
From: Julien Viet <julien@julienviet.com>
Date: Mon, 2 Dec 2024 10:25:01 +0100
Subject: [PATCH] HTTP client stream should handle a write to a stream which
 has been reset without having being allocated.

Motivation:

Vert.x HTTP client stream does not allocate a stream when the stream has been reset by the application before its allocation. When such stream is being written, the stream behaves normally and fails since the internal state is not correct.

Changes:

Record the reset state of a stream and guard against writes in such case.
---
 .../http/impl/Http1xClientConnection.java     | 77 +++++++++++--------
 .../core/http/impl/VertxHttp2Stream.java      | 20 +++++
 .../java/io/vertx/tests/http/HttpTest.java    | 14 ++++
 3 files changed, 78 insertions(+), 33 deletions(-)

diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
index 8ba55e230b7..e36e9a82873 100644
--- a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
+++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
@@ -317,7 +317,11 @@ private void endRequest(Stream s) {
    * @param stream to reset
    * @return whether the stream should be considered as closed
    */
-  private boolean reset(Stream stream) {
+  private Boolean reset(Stream stream) {
+    if (stream.reset) {
+      return null;
+    }
+    stream.reset = true;
     if (!responses.contains(stream)) {
       requests.remove(stream);
       return true;
@@ -326,16 +330,20 @@ private boolean reset(Stream stream) {
     return false;
   }
 
-  private void writeHead(Stream stream, HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, PromiseInternal<Void> handler) {
+  private void writeHead(Stream stream, HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, PromiseInternal<Void> listener) {
     writeToChannel(new MessageWrite() {
       @Override
       public void write() {
+        if (stream.reset) {
+          listener.fail("Stream reset");
+          return;
+        }
         stream.request = request;
-        beginRequest(stream, request, chunked, buf, end, connect, handler);
+        beginRequest(stream, request, chunked, buf, end, connect, listener);
       }
       @Override
       public void cancel(Throwable cause) {
-        handler.fail(cause);
+        listener.fail(cause);
       }
     });
   }
@@ -344,6 +352,10 @@ private void writeBuffer(Stream stream, ByteBuf buff, boolean end, PromiseIntern
     writeToChannel(new MessageWrite() {
       @Override
       public void write() {
+        if (stream.reset) {
+          listener.fail("Stream reset");
+          return;
+        }
         writeBuffer(stream, buff, end, (FutureListener<Void>)listener);
       }
 
@@ -368,7 +380,7 @@ private abstract static class Stream {
     private boolean responseEnded;
     private long bytesRead;
     private long bytesWritten;
-
+    private boolean reset;
 
     Stream(ContextInternal context, Promise<HttpClientStream> promise, int id) {
       this.context = context;
@@ -404,7 +416,6 @@ private static class StreamImpl extends Stream implements HttpClientStream {
 
     private final Http1xClientConnection conn;
     private final InboundMessageQueue<Object> queue;
-    private boolean reset;
     private boolean closed;
     private Handler<HttpResponseHead> headHandler;
     private Handler<Buffer> chunkHandler;
@@ -431,18 +442,16 @@ protected void handlePause() {
         }
         @Override
         protected void handleMessage(Object item) {
-          if (!reset) {
-            if (item instanceof MultiMap) {
-              Handler<MultiMap> handler = endHandler;
-              if (handler != null) {
-                context.dispatch((MultiMap) item, handler);
-              }
-            } else {
-              Buffer buffer = (Buffer) item;
-              Handler<Buffer> handler = chunkHandler;
-              if (handler != null) {
-                context.dispatch(buffer, handler);
-              }
+          if (item instanceof MultiMap) {
+            Handler<MultiMap> handler = endHandler;
+            if (handler != null) {
+              context.dispatch((MultiMap) item, handler);
+            }
+          } else {
+            Buffer buffer = (Buffer) item;
+            Handler<Buffer> handler = chunkHandler;
+            if (handler != null) {
+              context.dispatch(buffer, handler);
             }
           }
         }
@@ -584,26 +593,24 @@ public Future<Void> reset(Throwable cause) {
       Promise<Void> promise = context.promise();
       EventLoop eventLoop = conn.context.nettyEventLoop();
       if (eventLoop.inEventLoop()) {
-        _reset(cause, promise);
+        reset(cause, promise);
       } else {
-        eventLoop.execute(() -> _reset(cause, promise));
+        eventLoop.execute(() -> reset(cause, promise));
       }
       return promise.future();
     }
 
-    private void _reset(Throwable cause, Promise<Void> promise) {
-      if (reset) {
+    private void reset(Throwable cause, Promise<Void> promise) {
+      Boolean removed = conn.reset(this);
+      if (removed == null) {
         promise.fail("Stream already reset");
-        return;
-      }
-      reset = true;
-      boolean removed = conn.reset(this);
-      if (removed) {
-        context.execute(cause, this::handleClosed);
       } else {
-        context.execute(cause, this::handleException);
-      }
-      promise.complete();
+        if (removed) {
+          context.execute(cause, this::handleClosed);
+        } else {
+          context.execute(cause, this::handleException);
+        }
+        promise.complete();      }
     }
 
     @Override
@@ -856,7 +863,9 @@ private void handleResponseChunk(Stream stream, ByteBuf chunk) {
     Buffer buff = BufferInternal.safeBuffer(chunk);
     int len = buff.length();
     stream.bytesRead += len;
-    stream.handleChunk(buff);
+    if (!stream.reset) {
+      stream.handleChunk(buff);
+    }
   }
 
   private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
@@ -908,7 +917,9 @@ private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
       checkLifecycle();
     }
     lastResponseReceivedTimestamp = System.currentTimeMillis();
-    stream.handleEnd(trailer);
+    if (!stream.reset) {
+      stream.handleEnd(trailer);
+    }
     if (stream.requestEnded) {
       stream.handleClosed(null);
     }
diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java b/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java
index 219c9bb4806..c3f6c1da462 100644
--- a/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java
+++ b/vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java
@@ -53,6 +53,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
   private long bytesWritten;
   protected boolean isConnect;
   private Throwable failure;
+  private long reset = -1L;
 
   VertxHttp2Stream(C conn, ContextInternal context) {
     this.conn = conn;
@@ -126,6 +127,7 @@ void onException(Throwable cause) {
   }
 
   void onReset(long code) {
+    reset = code;
     context.emit(code, this::handleReset);
   }
 
@@ -246,6 +248,12 @@ public void cancel(Throwable cause) {
   }
 
   void doWriteHeaders(Http2Headers headers, boolean end, boolean checkFlush, Promise<Void> promise) {
+    if (reset != -1L) {
+      if (promise != null) {
+        promise.fail("Stream reset");
+      }
+      return;
+    }
     if (end) {
       endWritten();
     }
@@ -273,6 +281,10 @@ public void cancel(Throwable cause) {
   }
 
   void doWriteData(ByteBuf buf, boolean end, Promise<Void> promise) {
+    if (reset != -1L) {
+      promise.fail("Stream reset");
+      return;
+    }
     ByteBuf chunk;
     if (buf == null && end) {
       chunk = Unpooled.EMPTY_BUFFER;
@@ -289,6 +301,9 @@ void doWriteData(ByteBuf buf, boolean end, Promise<Void> promise) {
   }
 
   final Future<Void> writeReset(long code) {
+    if (code < 0L) {
+      throw new IllegalArgumentException("Invalid reset code value");
+    }
     Promise<Void> promise = context.promise();
     EventLoop eventLoop = conn.context().nettyEventLoop();
     if (eventLoop.inEventLoop()) {
@@ -300,6 +315,11 @@ final Future<Void> writeReset(long code) {
   }
 
   protected void doWriteReset(long code, Promise<Void> promise) {
+    if (reset != -1L) {
+      promise.fail("Stream already reset");
+      return;
+    }
+    reset = code;
     int streamId;
     synchronized (this) {
       streamId = stream != null ? stream.id() : -1;
diff --git a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java
index 0b24f6194b9..5d4ecd1a13b 100644
--- a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java
+++ b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java
@@ -5863,6 +5863,20 @@ public void testResetClientRequestResponseInProgress() throws Exception {
     await();
   }
 
+  @Test
+  public void testResetPartialClientRequest() throws Exception {
+    server.requestHandler(req -> {
+    });
+    startServer(testAddress);
+    client.request(requestOptions).onComplete(onSuccess(req -> {
+      assertTrue(req.reset().succeeded());
+      req.end("body").onComplete(onFailure(err -> {
+        testComplete();
+      }));
+    }));
+    await();
+  }
+
   @Test
   public void testSimpleCookie() throws Exception {
     testCookies("foo=bar", req -> {