From c368101ba2c1e96f29ca187930da10e62fea8627 Mon Sep 17 00:00:00 2001 From: Justin Guerra Date: Tue, 20 Aug 2024 15:07:20 -0600 Subject: [PATCH] Extract a method to allow easier injecting of custom shutdown behavior (#1809) --- .../server/ClientConnectionsShutdown.java | 23 +++++++++++++------ .../server/ClientConnectionsShutdownTest.java | 21 +++++++++++++++-- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/zuul-core/src/main/java/com/netflix/zuul/netty/server/ClientConnectionsShutdown.java b/zuul-core/src/main/java/com/netflix/zuul/netty/server/ClientConnectionsShutdown.java index d651b9b4ab..6207f4ff33 100644 --- a/zuul-core/src/main/java/com/netflix/zuul/netty/server/ClientConnectionsShutdown.java +++ b/zuul-core/src/main/java/com/netflix/zuul/netty/server/ClientConnectionsShutdown.java @@ -52,6 +52,11 @@ public class ClientConnectionsShutdown { private static final DynamicIntProperty GRACEFUL_CLOSE_TIMEOUT = new DynamicIntProperty("server.outofservice.close.timeout", 30); + public enum ShutdownType { + OUT_OF_SERVICE, + SHUTDOWN + } + private final ChannelGroup channels; private final EventExecutor executor; private final EurekaClient discoveryClient; @@ -81,7 +86,7 @@ private void initDiscoveryListener() { // Schedule to gracefully close all the client connections. if (ENABLED.get()) { executor.schedule( - () -> gracefullyShutdownClientChannels(false), + () -> gracefullyShutdownClientChannels(ShutdownType.OUT_OF_SERVICE), DELAY_AFTER_OUT_OF_SERVICE_MS.get(), TimeUnit.MILLISECONDS); } @@ -91,10 +96,10 @@ private void initDiscoveryListener() { } public Promise gracefullyShutdownClientChannels() { - return gracefullyShutdownClientChannels(true); + return gracefullyShutdownClientChannels(ShutdownType.SHUTDOWN); } - Promise gracefullyShutdownClientChannels(boolean forceCloseAfterTimeout) { + Promise gracefullyShutdownClientChannels(ShutdownType shutdownType) { // Mark all active connections to be closed after next response sent. LOG.warn("Flagging CLOSE_AFTER_RESPONSE on {} client channels.", channels.size()); @@ -102,14 +107,12 @@ Promise gracefullyShutdownClientChannels(boolean forceCloseAfterTimeout) { // be closed during the force close stage ChannelGroupFuture closeFuture = channels.newCloseFuture(); for (Channel channel : channels) { - ConnectionCloseType.setForChannel(channel, ConnectionCloseType.DELAYED_GRACEFUL); - ChannelPromise closePromise = channel.pipeline().newPromise(); - channel.attr(ConnectionCloseChannelAttributes.CLOSE_AFTER_RESPONSE).set(closePromise); + flagChannelForClose(channel, shutdownType); } Promise promise = executor.newPromise(); Runnable cancelTimeoutTask; - if (forceCloseAfterTimeout) { + if (shutdownType == ShutdownType.SHUTDOWN) { ScheduledFuture timeoutTask = executor.schedule( () -> { LOG.warn("Force closing remaining {} active client channels.", channels.size()); @@ -141,4 +144,10 @@ Promise gracefullyShutdownClientChannels(boolean forceCloseAfterTimeout) { return promise; } + + protected void flagChannelForClose(Channel channel, ShutdownType shutdownType) { + ConnectionCloseType.setForChannel(channel, ConnectionCloseType.DELAYED_GRACEFUL); + ChannelPromise closePromise = channel.pipeline().newPromise(); + channel.attr(ConnectionCloseChannelAttributes.CLOSE_AFTER_RESPONSE).set(closePromise); + } } diff --git a/zuul-core/src/test/java/com/netflix/zuul/netty/server/ClientConnectionsShutdownTest.java b/zuul-core/src/test/java/com/netflix/zuul/netty/server/ClientConnectionsShutdownTest.java index 28f6df54a5..1004c8c31a 100644 --- a/zuul-core/src/test/java/com/netflix/zuul/netty/server/ClientConnectionsShutdownTest.java +++ b/zuul-core/src/test/java/com/netflix/zuul/netty/server/ClientConnectionsShutdownTest.java @@ -18,9 +18,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -31,6 +33,7 @@ import com.netflix.discovery.EurekaClient; import com.netflix.discovery.EurekaEventListener; import com.netflix.discovery.StatusChangeEvent; +import com.netflix.zuul.netty.server.ClientConnectionsShutdown.ShutdownType; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; @@ -225,7 +228,7 @@ void connectionsNotForceClosed() throws Exception { try { configuration.setProperty(configName, "0"); createChannels(10); - Promise promise = shutdown.gracefullyShutdownClientChannels(false); + Promise promise = shutdown.gracefullyShutdownClientChannels(ShutdownType.OUT_OF_SERVICE); verify(eventLoop, never()).schedule(isA(Runnable.class), anyLong(), isA(TimeUnit.class)); channels.forEach(Channel::close); @@ -236,8 +239,22 @@ void connectionsNotForceClosed() throws Exception { } } + @Test + public void shutdownTypeForwardedToFlag() throws InterruptedException { + shutdown = spy(shutdown); + doNothing().when(shutdown).flagChannelForClose(any(), any()); + createChannels(1); + Channel channel = channels.iterator().next(); + for (ShutdownType type : ShutdownType.values()) { + shutdown.gracefullyShutdownClientChannels(type); + verify(shutdown).flagChannelForClose(channel, type); + } + + channels.close().await(5, TimeUnit.SECONDS); + } + private void createChannels(int numChannels) throws InterruptedException { - ChannelInitializer initializer = new ChannelInitializer() { + ChannelInitializer initializer = new ChannelInitializer<>() { @Override protected void initChannel(LocalChannel ch) {} };