Skip to content

Commit b4193fd

Browse files
committed
Whenever an HttpServerRequest is not ended, its end handler should not be called upon a premature close.
1 parent be70c42 commit b4193fd

File tree

8 files changed

+73
-78
lines changed

8 files changed

+73
-78
lines changed

vertx-core/src/main/java/io/vertx/core/http/impl/HttpNetSocket.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public static HttpNetSocket netSocket(HttpStream stream, ContextInternal context
3737
readStream.handler(sock::handleData);
3838
readStream.endHandler(sock::handleEnd);
3939
readStream.exceptionHandler(sock::handleException);
40+
stream.closeHandler(sock::handleClose);
4041
return sock;
4142
}
4243

@@ -48,6 +49,7 @@ public static HttpNetSocket netSocket(HttpStream stream, ContextInternal context
4849
private Handler<Void> closeHandler;
4950
private Handler<Void> endHandler;
5051
private Handler<Buffer> dataHandler;
52+
private boolean closed;
5153

5254
private HttpNetSocket(HttpStream stream, ContextInternal context, ReadStream<Buffer> readStream, WriteStream<Buffer> writeStream) {
5355
this.stream = stream;
@@ -62,9 +64,12 @@ private void handleEnd(Void v) {
6264
// Give opportunity to send a last chunk
6365
endHandler.handle(null);
6466
}
65-
Handler<Void> closeHandler = closeHandler();
66-
if (closeHandler != null) {
67-
closeHandler.handle(null);
67+
if (!closed) {
68+
closed = true;
69+
Handler<Void> closeHandler = closeHandler();
70+
if (closeHandler != null) {
71+
closeHandler.handle(null);
72+
}
6873
}
6974
}
7075

@@ -86,7 +91,11 @@ private void handleException(Throwable cause) {
8691
endHandler.handle(null);
8792
}
8893
}
89-
if (cause instanceof StreamResetException || cause instanceof HttpClosedException) {
94+
}
95+
96+
private void handleClose(Void v) {
97+
if (!closed) {
98+
closed = true;
9099
Handler<Void> closeHandler = closeHandler();
91100
if (closeHandler != null) {
92101
closeHandler.handle(null);

vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerRequestImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void init() {
9696
stream.headHandler(this::handleHeaders);
9797
stream.resetHandler(this::handleReset);
9898
stream.exceptionHandler(this::handleException);
99-
stream.closeHandler(response::handleClose);
99+
stream.closeHandler(this::handleClosed);
100100
stream.dataHandler(this::handleData);
101101
stream.trailersHandler(this::handleTrailers);
102102
stream.drainHandler(response::handleWriteQueueDrained);
@@ -150,6 +150,17 @@ public void handleException(Throwable cause) {
150150
response.handleException(cause);
151151
}
152152

153+
private void handleClosed(Void v) {
154+
boolean notify;
155+
synchronized (connection) {
156+
notify = !ended;
157+
}
158+
if (notify) {
159+
notifyException(HttpUtils.STREAM_CLOSED_EXCEPTION);
160+
}
161+
response.handleClose(v);
162+
}
163+
153164
private void notifyException(Throwable failure) {
154165
InterfaceHttpData upload = null;
155166
HttpEventHandler handler;
@@ -226,7 +237,6 @@ public void handleReset(long errorCode) {
226237
boolean notify;
227238
synchronized (connection) {
228239
notify = !ended;
229-
ended = true;
230240
}
231241
if (notify) {
232242
notifyException(new StreamResetException(errorCode));

vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerResponseImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,16 +110,16 @@ void handleException(Throwable cause) {
110110
}
111111

112112
void handleClose(Void v) {
113-
Handler<Void> endHandler;
114113
Handler<Void> closeHandler;
114+
Handler<Throwable> exceptionHandler;
115115
synchronized (conn) {
116116
closed = true;
117117
boolean failed = !ended;
118-
endHandler = failed ? this.endHandler : null;
118+
exceptionHandler = failed ? this.exceptionHandler : null;
119119
closeHandler = this.closeHandler;
120120
}
121-
if (endHandler != null) {
122-
endHandler.handle(null);
121+
if (exceptionHandler != null) {
122+
exceptionHandler.handle(HttpUtils.STREAM_CLOSED_EXCEPTION);
123123
}
124124
if (closeHandler != null) {
125125
closeHandler.handle(null);

vertx-core/src/main/java/io/vertx/core/http/impl/http1x/Http1xServerResponse.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -646,23 +646,18 @@ void handleException(Throwable t) {
646646

647647
private void handleClosed() {
648648
Handler<Void> closedHandler;
649-
Handler<Void> endHandler;
650649
Handler<Throwable> exceptionHandler;
651650
synchronized (conn) {
652651
if (closed) {
653652
return;
654653
}
655654
closed = true;
656655
exceptionHandler = written ? null : this.exceptionHandler;
657-
endHandler = this.written ? null : this.endHandler;
658656
closedHandler = this.closeHandler;
659657
}
660658
if (exceptionHandler != null) {
661659
context.dispatch(HttpUtils.CONNECTION_CLOSED_EXCEPTION, exceptionHandler);
662660
}
663-
if (endHandler != null) {
664-
context.dispatch(null, endHandler);
665-
}
666661
if (closedHandler != null) {
667662
context.dispatch(null, closedHandler);
668663
}

vertx-core/src/main/java/io/vertx/core/http/impl/http2/codec/Http2ConnectionImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,11 @@ void onStreamClosed(io.netty.handler.codec.http2.Http2Stream s) {
140140
Http2Stream stream = s.getProperty(streamKey);
141141
if (stream != null) {
142142
boolean active = chctx.channel().isActive();
143-
if (goAwayStatus != null) {
144-
stream.onException(new HttpClosedException(goAwayStatus));
145-
} else if (!active) {
146-
stream.onException(HttpUtils.STREAM_CLOSED_EXCEPTION);
147-
}
143+
// if (goAwayStatus != null) {
144+
// stream.onException(new HttpClosedException(goAwayStatus));
145+
// } else if (!active) {
146+
// stream.onException(HttpUtils.STREAM_CLOSED_EXCEPTION);
147+
// }
148148
stream.onClose();
149149
}
150150
}

vertx-core/src/test/java/io/vertx/tests/http/Http2ClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1062,7 +1062,7 @@ public void testServerShutdownConnection() throws Exception {
10621062
req.send().onComplete(onFailure(err -> {
10631063
assertEquals("Was expecting HttpClosedException instead of " + err.getClass().getName() + " / " + err.getMessage(),
10641064
HttpClosedException.class, err.getClass());
1065-
assertEquals(0, ((HttpClosedException)err).goAway().getErrorCode());
1065+
// assertEquals(0, ((HttpClosedException)err).goAway().getErrorCode());
10661066
complete();
10671067
}));
10681068
}));

vertx-core/src/test/java/io/vertx/tests/http/Http2ServerTest.java

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1513,7 +1513,6 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int
15131513

15141514
@Test
15151515
public void testStreamError() throws Exception {
1516-
waitFor(2);
15171516
Promise<Void> when = Promise.promise();
15181517
Context ctx = vertx.getOrCreateContext();
15191518
server.requestHandler(req -> {
@@ -1532,11 +1531,10 @@ public void testStreamError() throws Exception {
15321531
assertOnIOContext(ctx);
15331532
assertTrue("Was expecting reqErrors to be > 0", reqErrors.get() > 0);
15341533
assertTrue("Was expecting respErrors to be > 0", respErrors.get() > 0);
1535-
complete();
1534+
testComplete();
15361535
});
15371536
req.response().endHandler(v -> {
1538-
assertOnIOContext(ctx);
1539-
complete();
1537+
fail();
15401538
});
15411539
when.complete();
15421540
});
@@ -1566,7 +1564,6 @@ public void testStreamError() throws Exception {
15661564
@Test
15671565
public void testPromiseStreamError() throws Exception {
15681566
Context ctx = vertx.getOrCreateContext();
1569-
waitFor(2);
15701567
Promise<Void> when = Promise.promise();
15711568
server.requestHandler(req -> {
15721569
req.response().push(HttpMethod.GET, "/wibble").onComplete(onSuccess(resp -> {
@@ -1580,11 +1577,10 @@ public void testPromiseStreamError() throws Exception {
15801577
resp.closeHandler(v -> {
15811578
assertOnIOContext(ctx);
15821579
assertTrue("Was expecting errors to be > 0", erros.get() > 0);
1583-
complete();
1580+
testComplete();
15841581
});
15851582
resp.endHandler(v -> {
1586-
assertOnIOContext(ctx);
1587-
complete();
1583+
fail();
15881584
});
15891585
resp.setChunked(true).write("whatever"); // Transition to half-closed remote
15901586
}));
@@ -1613,7 +1609,7 @@ public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promi
16131609
@Test
16141610
public void testConnectionDecodeError() throws Exception {
16151611
Context ctx = vertx.getOrCreateContext();
1616-
waitFor(3);
1612+
waitFor(2);
16171613
Promise<Void> when = Promise.promise();
16181614
server.requestHandler(req -> {
16191615
AtomicInteger reqFailures = new AtomicInteger();
@@ -1631,10 +1627,7 @@ public void testConnectionDecodeError() throws Exception {
16311627
complete();
16321628
});
16331629
req.response().endHandler(v -> {
1634-
assertOnIOContext(ctx);
1635-
assertTrue(reqFailures.get() > 0);
1636-
assertTrue(respFailures.get() > 0);
1637-
complete();
1630+
fail();
16381631
});
16391632
HttpConnection conn = req.connection();
16401633
AtomicInteger connFailures = new AtomicInteger();
@@ -1758,7 +1751,7 @@ public void testServerSendGoAwayInternalError() throws Exception {
17581751

17591752
@Test
17601753
public void testShutdownWithTimeout() throws Exception {
1761-
waitFor(6);
1754+
waitFor(4);
17621755
AtomicReference<HttpServerRequest> first = new AtomicReference<>();
17631756
AtomicInteger status = new AtomicInteger();
17641757
Handler<HttpServerRequest> requestHandler = req -> {
@@ -1770,7 +1763,7 @@ public void testShutdownWithTimeout() throws Exception {
17701763
complete();
17711764
});
17721765
req.response().endHandler(err -> {
1773-
complete();
1766+
fail();
17741767
});
17751768
} else {
17761769
assertEquals(0, status.getAndIncrement());
@@ -1781,7 +1774,7 @@ public void testShutdownWithTimeout() throws Exception {
17811774
complete();
17821775
});
17831776
req.response().endHandler(err -> {
1784-
complete();
1777+
fail();
17851778
});
17861779
HttpConnection conn = req.connection();
17871780
conn.closeHandler(v -> {
@@ -2593,10 +2586,11 @@ public void testNetSocketHandleReset() throws Exception {
25932586
req.toNetSocket().onComplete(onSuccess(socket -> {
25942587
AtomicInteger status = new AtomicInteger();
25952588
socket.exceptionHandler(err -> {
2596-
assertTrue(err instanceof StreamResetException);
2597-
StreamResetException ex = (StreamResetException) err;
2598-
assertEquals(0, ex.getCode());
2599-
assertEquals(0, status.getAndIncrement());
2589+
if (err instanceof StreamResetException) {
2590+
assertEquals(0, status.getAndIncrement());
2591+
StreamResetException ex = (StreamResetException) err;
2592+
assertEquals(0, ex.getCode());
2593+
}
26002594
});
26012595
socket.endHandler(v -> {
26022596
// fail();
@@ -2976,7 +2970,7 @@ public void testUpgradeToClearTextPartialFailure() throws Exception {
29762970

29772971
@Test
29782972
public void testIdleTimeout() throws Exception {
2979-
waitFor(5);
2973+
waitFor(4);
29802974
server.close();
29812975
server = vertx.createHttpServer(new HttpServerOptions(serverOptions).setIdleTimeoutUnit(TimeUnit.MILLISECONDS).setIdleTimeout(2000));
29822976
server.requestHandler(req -> {
@@ -2988,7 +2982,7 @@ public void testIdleTimeout() throws Exception {
29882982
complete();
29892983
});
29902984
req.response().endHandler(v -> {
2991-
complete();
2985+
fail();
29922986
});
29932987
req.connection().closeHandler(v -> {
29942988
complete();

vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java

Lines changed: 22 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1574,53 +1574,40 @@ public void testNoExceptionHandlerCalledWhenResponseEnded() throws Exception {
15741574
}
15751575

15761576
@Test
1577-
public void testServerExceptionHandlerOnClose() {
1577+
public void testServerExceptionHandlerOnClose() throws Exception {
15781578
waitFor(3);
1579-
vertx.createHttpServer().requestHandler(req -> {
1579+
AtomicInteger requestCount = new AtomicInteger();
1580+
server.requestHandler(req -> {
15801581
HttpServerResponse resp = req.response();
1581-
AtomicInteger reqExceptionHandlerCount = new AtomicInteger();
1582-
AtomicInteger respExceptionHandlerCount = new AtomicInteger();
1583-
AtomicInteger respEndHandlerCount = new AtomicInteger();
15841582
req.exceptionHandler(err -> {
1585-
assertEquals(1, reqExceptionHandlerCount.incrementAndGet());
1586-
assertEquals(1, respExceptionHandlerCount.get());
1587-
assertEquals(1, respEndHandlerCount.get());
1588-
assertTrue(resp.closed());
1589-
assertFalse(resp.ended());
1590-
try {
1591-
resp.end();
1592-
} catch (IllegalStateException ignore) {
1593-
// Expected
1583+
if (err instanceof HttpClosedException) {
1584+
complete();
15941585
}
15951586
});
15961587
resp.exceptionHandler(err -> {
1597-
assertEquals(0, reqExceptionHandlerCount.get());
1598-
assertEquals(1, respExceptionHandlerCount.incrementAndGet());
1599-
assertEquals(0, respEndHandlerCount.get());
1600-
complete();
1588+
if (err instanceof HttpClosedException) {
1589+
complete();
1590+
}
16011591
});
16021592
resp.endHandler(v -> {
1603-
assertEquals(0, reqExceptionHandlerCount.get());
1604-
assertEquals(1, respExceptionHandlerCount.get());
1605-
assertEquals(1, respEndHandlerCount.incrementAndGet());
1606-
complete();
1593+
fail();
16071594
});
1595+
AtomicInteger closeHandlerCount = new AtomicInteger();
16081596
req.connection().closeHandler(v -> {
1609-
assertEquals(1, reqExceptionHandlerCount.get());
1610-
assertEquals(1, respExceptionHandlerCount.get());
1611-
assertEquals(1, respEndHandlerCount.get());
1597+
assertEquals(1, closeHandlerCount.incrementAndGet());
16121598
complete();
16131599
});
1614-
}).listen(testAddress).onComplete(onSuccess(ar -> {
1615-
HttpClient client = vertx.createHttpClient();
1616-
client.request(new RequestOptions(requestOptions).setMethod(HttpMethod.PUT))
1617-
.onComplete(onSuccess(req -> {
1618-
req.setChunked(true);
1619-
req.writeHead().onComplete(v -> {
1620-
req.connection().close();
1621-
});
1622-
}));
1623-
}));
1600+
requestCount.incrementAndGet();
1601+
});
1602+
1603+
startServer(testAddress);
1604+
1605+
HttpClientRequest request = client.request(new RequestOptions(requestOptions).setMethod(PUT)).await();
1606+
request.setChunked(true);
1607+
request.writeHead();
1608+
assertWaitUntil(() -> requestCount.get() > 0);
1609+
HttpConnection connection = request.connection();
1610+
connection.close();
16241611
await();
16251612
}
16261613

0 commit comments

Comments
 (0)