Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public static HttpNetSocket netSocket(HttpStream stream, ContextInternal context
readStream.handler(sock::handleData);
readStream.endHandler(sock::handleEnd);
readStream.exceptionHandler(sock::handleException);
stream.closeHandler(sock::handleClose);
return sock;
}

Expand All @@ -48,6 +49,7 @@ public static HttpNetSocket netSocket(HttpStream stream, ContextInternal context
private Handler<Void> closeHandler;
private Handler<Void> endHandler;
private Handler<Buffer> dataHandler;
private boolean closed;

private HttpNetSocket(HttpStream stream, ContextInternal context, ReadStream<Buffer> readStream, WriteStream<Buffer> writeStream) {
this.stream = stream;
Expand All @@ -62,9 +64,12 @@ private void handleEnd(Void v) {
// Give opportunity to send a last chunk
endHandler.handle(null);
}
Handler<Void> closeHandler = closeHandler();
if (closeHandler != null) {
closeHandler.handle(null);
if (!closed) {
closed = true;
Handler<Void> closeHandler = closeHandler();
if (closeHandler != null) {
closeHandler.handle(null);
}
}
}

Expand All @@ -86,7 +91,11 @@ private void handleException(Throwable cause) {
endHandler.handle(null);
}
}
if (cause instanceof StreamResetException || cause instanceof HttpClosedException) {
}

private void handleClose(Void v) {
if (!closed) {
closed = true;
Handler<Void> closeHandler = closeHandler();
if (closeHandler != null) {
closeHandler.handle(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void init() {
stream.headHandler(this::handleHeaders);
stream.resetHandler(this::handleReset);
stream.exceptionHandler(this::handleException);
stream.closeHandler(response::handleClose);
stream.closeHandler(this::handleClosed);
stream.dataHandler(this::handleData);
stream.trailersHandler(this::handleTrailers);
stream.drainHandler(response::handleWriteQueueDrained);
Expand Down Expand Up @@ -150,6 +150,17 @@ public void handleException(Throwable cause) {
response.handleException(cause);
}

private void handleClosed(Void v) {
boolean notify;
synchronized (connection) {
notify = !ended;
}
if (notify) {
notifyException(HttpUtils.STREAM_CLOSED_EXCEPTION);
}
response.handleClose(v);
}

private void notifyException(Throwable failure) {
InterfaceHttpData upload = null;
HttpEventHandler handler;
Expand Down Expand Up @@ -226,7 +237,6 @@ public void handleReset(long errorCode) {
boolean notify;
synchronized (connection) {
notify = !ended;
ended = true;
}
if (notify) {
notifyException(new StreamResetException(errorCode));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ void handleException(Throwable cause) {
}

void handleClose(Void v) {
Handler<Void> endHandler;
Handler<Void> closeHandler;
Handler<Throwable> exceptionHandler;
synchronized (conn) {
closed = true;
boolean failed = !ended;
endHandler = failed ? this.endHandler : null;
exceptionHandler = failed ? this.exceptionHandler : null;
closeHandler = this.closeHandler;
}
if (endHandler != null) {
endHandler.handle(null);
if (exceptionHandler != null) {
exceptionHandler.handle(HttpUtils.STREAM_CLOSED_EXCEPTION);
}
if (closeHandler != null) {
closeHandler.handle(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,23 +646,18 @@ void handleException(Throwable t) {

private void handleClosed() {
Handler<Void> closedHandler;
Handler<Void> endHandler;
Handler<Throwable> exceptionHandler;
synchronized (conn) {
if (closed) {
return;
}
closed = true;
exceptionHandler = written ? null : this.exceptionHandler;
endHandler = this.written ? null : this.endHandler;
closedHandler = this.closeHandler;
}
if (exceptionHandler != null) {
context.dispatch(HttpUtils.CONNECTION_CLOSED_EXCEPTION, exceptionHandler);
}
if (endHandler != null) {
context.dispatch(null, endHandler);
}
if (closedHandler != null) {
context.dispatch(null, closedHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ void onStreamClosed(io.netty.handler.codec.http2.Http2Stream s) {
Http2Stream stream = s.getProperty(streamKey);
if (stream != null) {
boolean active = chctx.channel().isActive();
if (goAwayStatus != null) {
stream.onException(new HttpClosedException(goAwayStatus));
} else if (!active) {
stream.onException(HttpUtils.STREAM_CLOSED_EXCEPTION);
}
// if (goAwayStatus != null) {
// stream.onException(new HttpClosedException(goAwayStatus));
// } else if (!active) {
// stream.onException(HttpUtils.STREAM_CLOSED_EXCEPTION);
// }
stream.onClose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ public void testServerShutdownConnection() throws Exception {
req.send().onComplete(onFailure(err -> {
assertEquals("Was expecting HttpClosedException instead of " + err.getClass().getName() + " / " + err.getMessage(),
HttpClosedException.class, err.getClass());
assertEquals(0, ((HttpClosedException)err).goAway().getErrorCode());
// assertEquals(0, ((HttpClosedException)err).goAway().getErrorCode());
complete();
}));
}));
Expand Down
38 changes: 16 additions & 22 deletions vertx-core/src/test/java/io/vertx/tests/http/Http2ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1513,7 +1513,6 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int

@Test
public void testStreamError() throws Exception {
waitFor(2);
Promise<Void> when = Promise.promise();
Context ctx = vertx.getOrCreateContext();
server.requestHandler(req -> {
Expand All @@ -1532,11 +1531,10 @@ public void testStreamError() throws Exception {
assertOnIOContext(ctx);
assertTrue("Was expecting reqErrors to be > 0", reqErrors.get() > 0);
assertTrue("Was expecting respErrors to be > 0", respErrors.get() > 0);
complete();
testComplete();
});
req.response().endHandler(v -> {
assertOnIOContext(ctx);
complete();
fail();
});
when.complete();
});
Expand Down Expand Up @@ -1566,7 +1564,6 @@ public void testStreamError() throws Exception {
@Test
public void testPromiseStreamError() throws Exception {
Context ctx = vertx.getOrCreateContext();
waitFor(2);
Promise<Void> when = Promise.promise();
server.requestHandler(req -> {
req.response().push(HttpMethod.GET, "/wibble").onComplete(onSuccess(resp -> {
Expand All @@ -1580,11 +1577,10 @@ public void testPromiseStreamError() throws Exception {
resp.closeHandler(v -> {
assertOnIOContext(ctx);
assertTrue("Was expecting errors to be > 0", erros.get() > 0);
complete();
testComplete();
});
resp.endHandler(v -> {
assertOnIOContext(ctx);
complete();
fail();
});
resp.setChunked(true).write("whatever"); // Transition to half-closed remote
}));
Expand Down Expand Up @@ -1613,7 +1609,7 @@ public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promi
@Test
public void testConnectionDecodeError() throws Exception {
Context ctx = vertx.getOrCreateContext();
waitFor(3);
waitFor(2);
Promise<Void> when = Promise.promise();
server.requestHandler(req -> {
AtomicInteger reqFailures = new AtomicInteger();
Expand All @@ -1631,10 +1627,7 @@ public void testConnectionDecodeError() throws Exception {
complete();
});
req.response().endHandler(v -> {
assertOnIOContext(ctx);
assertTrue(reqFailures.get() > 0);
assertTrue(respFailures.get() > 0);
complete();
fail();
});
HttpConnection conn = req.connection();
AtomicInteger connFailures = new AtomicInteger();
Expand Down Expand Up @@ -1758,7 +1751,7 @@ public void testServerSendGoAwayInternalError() throws Exception {

@Test
public void testShutdownWithTimeout() throws Exception {
waitFor(6);
waitFor(4);
AtomicReference<HttpServerRequest> first = new AtomicReference<>();
AtomicInteger status = new AtomicInteger();
Handler<HttpServerRequest> requestHandler = req -> {
Expand All @@ -1770,7 +1763,7 @@ public void testShutdownWithTimeout() throws Exception {
complete();
});
req.response().endHandler(err -> {
complete();
fail();
});
} else {
assertEquals(0, status.getAndIncrement());
Expand All @@ -1781,7 +1774,7 @@ public void testShutdownWithTimeout() throws Exception {
complete();
});
req.response().endHandler(err -> {
complete();
fail();
});
HttpConnection conn = req.connection();
conn.closeHandler(v -> {
Expand Down Expand Up @@ -2593,10 +2586,11 @@ public void testNetSocketHandleReset() throws Exception {
req.toNetSocket().onComplete(onSuccess(socket -> {
AtomicInteger status = new AtomicInteger();
socket.exceptionHandler(err -> {
assertTrue(err instanceof StreamResetException);
StreamResetException ex = (StreamResetException) err;
assertEquals(0, ex.getCode());
assertEquals(0, status.getAndIncrement());
if (err instanceof StreamResetException) {
assertEquals(0, status.getAndIncrement());
StreamResetException ex = (StreamResetException) err;
assertEquals(0, ex.getCode());
}
});
socket.endHandler(v -> {
// fail();
Expand Down Expand Up @@ -2976,7 +2970,7 @@ public void testUpgradeToClearTextPartialFailure() throws Exception {

@Test
public void testIdleTimeout() throws Exception {
waitFor(5);
waitFor(4);
server.close();
server = vertx.createHttpServer(new HttpServerOptions(serverOptions).setIdleTimeoutUnit(TimeUnit.MILLISECONDS).setIdleTimeout(2000));
server.requestHandler(req -> {
Expand All @@ -2988,7 +2982,7 @@ public void testIdleTimeout() throws Exception {
complete();
});
req.response().endHandler(v -> {
complete();
fail();
});
req.connection().closeHandler(v -> {
complete();
Expand Down
57 changes: 22 additions & 35 deletions vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1574,53 +1574,40 @@ public void testNoExceptionHandlerCalledWhenResponseEnded() throws Exception {
}

@Test
public void testServerExceptionHandlerOnClose() {
public void testServerExceptionHandlerOnClose() throws Exception {
waitFor(3);
vertx.createHttpServer().requestHandler(req -> {
AtomicInteger requestCount = new AtomicInteger();
server.requestHandler(req -> {
HttpServerResponse resp = req.response();
AtomicInteger reqExceptionHandlerCount = new AtomicInteger();
AtomicInteger respExceptionHandlerCount = new AtomicInteger();
AtomicInteger respEndHandlerCount = new AtomicInteger();
req.exceptionHandler(err -> {
assertEquals(1, reqExceptionHandlerCount.incrementAndGet());
assertEquals(1, respExceptionHandlerCount.get());
assertEquals(1, respEndHandlerCount.get());
assertTrue(resp.closed());
assertFalse(resp.ended());
try {
resp.end();
} catch (IllegalStateException ignore) {
// Expected
if (err instanceof HttpClosedException) {
complete();
}
});
resp.exceptionHandler(err -> {
assertEquals(0, reqExceptionHandlerCount.get());
assertEquals(1, respExceptionHandlerCount.incrementAndGet());
assertEquals(0, respEndHandlerCount.get());
complete();
if (err instanceof HttpClosedException) {
complete();
}
});
resp.endHandler(v -> {
assertEquals(0, reqExceptionHandlerCount.get());
assertEquals(1, respExceptionHandlerCount.get());
assertEquals(1, respEndHandlerCount.incrementAndGet());
complete();
fail();
});
AtomicInteger closeHandlerCount = new AtomicInteger();
req.connection().closeHandler(v -> {
assertEquals(1, reqExceptionHandlerCount.get());
assertEquals(1, respExceptionHandlerCount.get());
assertEquals(1, respEndHandlerCount.get());
assertEquals(1, closeHandlerCount.incrementAndGet());
complete();
});
}).listen(testAddress).onComplete(onSuccess(ar -> {
HttpClient client = vertx.createHttpClient();
client.request(new RequestOptions(requestOptions).setMethod(HttpMethod.PUT))
.onComplete(onSuccess(req -> {
req.setChunked(true);
req.writeHead().onComplete(v -> {
req.connection().close();
});
}));
}));
requestCount.incrementAndGet();
});

startServer(testAddress);

HttpClientRequest request = client.request(new RequestOptions(requestOptions).setMethod(PUT)).await();
request.setChunked(true);
request.writeHead();
assertWaitUntil(() -> requestCount.get() > 0);
HttpConnection connection = request.connection();
connection.close();
await();
}

Expand Down
Loading