Skip to content

Commit c2b49a3

Browse files
committed
Add QuicStream#abort that sends a STOP_SENDING frame.
1 parent a2daeb7 commit c2b49a3

File tree

4 files changed

+111
-2
lines changed

4 files changed

+111
-2
lines changed

vertx-core/src/main/java/io/vertx/core/net/QuicStream.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,23 @@ public interface QuicStream extends Socket {
5858
QuicStream resetHandler(@Nullable Handler<Integer> handler);
5959

6060
/**
61-
* Send a stream reset frame to the remote stream.
61+
* Abruptly terminate the sending part of the stream with the specified application protocol {@code error} code
62+
* argument, a {@code RESET} frame is sent to the remote peer.
6263
*
63-
* @param error the error code
64+
* @param error the application protocol error code
6465
* @return a future completed when the reset frame has been sent
6566
*/
6667
Future<Void> reset(int error);
6768

69+
/**
70+
* Instruct the remote peer that this part of the stream is no longer interested in received data and
71+
* wants it to cease transmission, a {@code STOP_SENDING} frame is sent to the remote peer.
72+
*
73+
* @param error the error code
74+
* @return a future completed when the {@code STOP_SENDING} frame has been sent
75+
*/
76+
Future<Void> abort(int error);
77+
6878
/**
6979
* Set a handler called when the stream is closed.
7080
*

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicStreamImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ public Future<Void> reset(int error) {
7272
return promise.future();
7373
}
7474

75+
@Override
76+
public Future<Void> abort(int error) {
77+
PromiseInternal<Void> promise = context.promise();
78+
ChannelFuture shutdownPromise = channel.shutdownInput(error);
79+
shutdownPromise.addListener(promise);
80+
return promise.future();
81+
}
82+
7583
@Override
7684
public NetworkMetrics<?> metrics() {
7785
return streamMetrics;

vertx-core/src/test/java/io/vertx/tests/net/quic/QuicServerTest.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@
1111
package io.vertx.tests.net.quic;
1212

1313
import io.netty.buffer.ByteBufUtil;
14+
import io.netty.buffer.Unpooled;
1415
import io.netty.channel.nio.NioEventLoopGroup;
16+
import io.netty.channel.socket.ChannelOutputShutdownException;
1517
import io.netty.handler.codec.quic.QuicClosedChannelException;
1618
import io.netty.util.NetUtil;
1719
import io.vertx.core.Context;
1820
import io.vertx.core.Future;
21+
import io.vertx.core.Promise;
1922
import io.vertx.core.Vertx;
2023
import io.vertx.core.buffer.Buffer;
2124
import io.vertx.core.internal.quic.QuicConnectionInternal;
@@ -40,6 +43,7 @@
4043
import java.security.KeyStore;
4144
import java.time.Duration;
4245
import java.util.*;
46+
import java.util.concurrent.CompletableFuture;
4347
import java.util.concurrent.CountDownLatch;
4448
import java.util.concurrent.TimeUnit;
4549
import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,6 +69,10 @@ static QuicServerOptions serverOptions() {
6569
options.getTransportOptions().setInitialMaxStreamsBidirectional(100L);
6670
options.getTransportOptions().setInitialMaxStreamsUnidirectional(100L);
6771
options.getTransportOptions().setActiveMigration(true);
72+
73+
// options.setClientAddressValidation(QuicClientAddressValidation.NONE);
74+
// options.setKeyLogFile("/Users/julien/keylogfile.txt");
75+
6876
return options;
6977
}
7078

@@ -381,6 +389,82 @@ public void testClientReset() throws Exception {
381389
}
382390
}
383391

392+
@Test
393+
public void testClientAbortReading() throws Exception {
394+
waitFor(3);
395+
disableThreadChecks();
396+
QuicServer server = QuicServer.create(vertx, serverOptions());
397+
AtomicReference<QuicStream> streamRef = new AtomicReference<>();
398+
server.handler(conn -> {
399+
conn.streamHandler(stream -> {
400+
streamRef.set(stream);
401+
stream.handler(buff -> {
402+
stream.write(buff);
403+
});
404+
});
405+
});
406+
server.bind(SocketAddress.inetSocketAddress(9999, "localhost")).await();
407+
QuicTestClient client = new QuicTestClient(new NioEventLoopGroup(1));
408+
try {
409+
client = new QuicTestClient(new NioEventLoopGroup(1));
410+
QuicTestClient.Connection connection = client.connect(new InetSocketAddress(NetUtil.LOCALHOST4, 9999));
411+
QuicTestClient.Stream stream = connection.newStream();
412+
stream.create();
413+
AtomicInteger received = new AtomicInteger();
414+
stream.handler(buff -> received.addAndGet(buff.length));
415+
stream.write("ping");
416+
assertWaitUntil(() -> received.get() > 0);
417+
stream.abort(10);
418+
try {
419+
streamRef.get().write("test").await();
420+
fail();
421+
} catch (Exception e) {
422+
assertEquals(ChannelOutputShutdownException.class, e.getClass());
423+
assertEquals("STOP_SENDING frame received", e.getMessage());
424+
}
425+
} finally {
426+
client.close();
427+
server.close().await();
428+
}
429+
}
430+
431+
@Test
432+
public void testServerAbortReading() throws Exception {
433+
waitFor(3);
434+
disableThreadChecks();
435+
QuicServer server = QuicServer.create(vertx, serverOptions());
436+
Promise<Void> writeLatch = Promise.promise();
437+
server.handler(conn -> {
438+
conn.streamHandler(stream -> {
439+
stream.handler(buff -> {
440+
stream
441+
.abort(10)
442+
.onComplete(writeLatch);
443+
});
444+
});
445+
});
446+
server.bind(SocketAddress.inetSocketAddress(9999, "localhost")).await();
447+
QuicTestClient client = new QuicTestClient(new NioEventLoopGroup(1));
448+
try {
449+
client = new QuicTestClient(new NioEventLoopGroup(1));
450+
QuicTestClient.Connection connection = client.connect(new InetSocketAddress(NetUtil.LOCALHOST4, 9999));
451+
QuicTestClient.Stream stream = connection.newStream();
452+
stream.create();
453+
stream.write("ping");
454+
writeLatch.future().await();
455+
try {
456+
stream.streamChannel.writeAndFlush(Unpooled.copiedBuffer("test", StandardCharsets.UTF_8)).sync();
457+
fail();
458+
} catch (Exception e) {
459+
assertSame(ChannelOutputShutdownException.class, e.getClass());
460+
assertEquals("STOP_SENDING frame received", e.getMessage());
461+
}
462+
} finally {
463+
client.close();
464+
server.close().await();
465+
}
466+
}
467+
384468
@Test
385469
public void testServerReset() throws Exception {
386470
disableThreadChecks();

vertx-core/src/test/java/io/vertx/tests/net/quic/QuicTestClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,13 @@ public void reset(int errorCode) throws Exception {
219219
streamChannel.shutdownOutput(errorCode).sync();
220220
}
221221

222+
public void abort(int errorCode) throws Exception {
223+
if (streamChannel == null) {
224+
throw new IllegalStateException();
225+
}
226+
streamChannel.shutdownInput(errorCode).sync();
227+
}
228+
222229
public Stream handler(Consumer<byte[]> handler) {
223230
this.handler = handler;
224231
return this;

0 commit comments

Comments
 (0)