Skip to content

Commit c368101

Browse files
jguerraargha-c
authored andcommitted
Extract a method to allow easier injecting of custom shutdown behavior (#1809)
1 parent 15cbe62 commit c368101

File tree

2 files changed

+35
-9
lines changed

2 files changed

+35
-9
lines changed

zuul-core/src/main/java/com/netflix/zuul/netty/server/ClientConnectionsShutdown.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ public class ClientConnectionsShutdown {
5252
private static final DynamicIntProperty GRACEFUL_CLOSE_TIMEOUT =
5353
new DynamicIntProperty("server.outofservice.close.timeout", 30);
5454

55+
public enum ShutdownType {
56+
OUT_OF_SERVICE,
57+
SHUTDOWN
58+
}
59+
5560
private final ChannelGroup channels;
5661
private final EventExecutor executor;
5762
private final EurekaClient discoveryClient;
@@ -81,7 +86,7 @@ private void initDiscoveryListener() {
8186
// Schedule to gracefully close all the client connections.
8287
if (ENABLED.get()) {
8388
executor.schedule(
84-
() -> gracefullyShutdownClientChannels(false),
89+
() -> gracefullyShutdownClientChannels(ShutdownType.OUT_OF_SERVICE),
8590
DELAY_AFTER_OUT_OF_SERVICE_MS.get(),
8691
TimeUnit.MILLISECONDS);
8792
}
@@ -91,25 +96,23 @@ private void initDiscoveryListener() {
9196
}
9297

9398
public Promise<Void> gracefullyShutdownClientChannels() {
94-
return gracefullyShutdownClientChannels(true);
99+
return gracefullyShutdownClientChannels(ShutdownType.SHUTDOWN);
95100
}
96101

97-
Promise<Void> gracefullyShutdownClientChannels(boolean forceCloseAfterTimeout) {
102+
Promise<Void> gracefullyShutdownClientChannels(ShutdownType shutdownType) {
98103
// Mark all active connections to be closed after next response sent.
99104
LOG.warn("Flagging CLOSE_AFTER_RESPONSE on {} client channels.", channels.size());
100105

101106
// racy situation if new connections are still coming in, but any channels created after newCloseFuture will
102107
// be closed during the force close stage
103108
ChannelGroupFuture closeFuture = channels.newCloseFuture();
104109
for (Channel channel : channels) {
105-
ConnectionCloseType.setForChannel(channel, ConnectionCloseType.DELAYED_GRACEFUL);
106-
ChannelPromise closePromise = channel.pipeline().newPromise();
107-
channel.attr(ConnectionCloseChannelAttributes.CLOSE_AFTER_RESPONSE).set(closePromise);
110+
flagChannelForClose(channel, shutdownType);
108111
}
109112

110113
Promise<Void> promise = executor.newPromise();
111114
Runnable cancelTimeoutTask;
112-
if (forceCloseAfterTimeout) {
115+
if (shutdownType == ShutdownType.SHUTDOWN) {
113116
ScheduledFuture<?> timeoutTask = executor.schedule(
114117
() -> {
115118
LOG.warn("Force closing remaining {} active client channels.", channels.size());
@@ -141,4 +144,10 @@ Promise<Void> gracefullyShutdownClientChannels(boolean forceCloseAfterTimeout) {
141144

142145
return promise;
143146
}
147+
148+
protected void flagChannelForClose(Channel channel, ShutdownType shutdownType) {
149+
ConnectionCloseType.setForChannel(channel, ConnectionCloseType.DELAYED_GRACEFUL);
150+
ChannelPromise closePromise = channel.pipeline().newPromise();
151+
channel.attr(ConnectionCloseChannelAttributes.CLOSE_AFTER_RESPONSE).set(closePromise);
152+
}
144153
}

zuul-core/src/test/java/com/netflix/zuul/netty/server/ClientConnectionsShutdownTest.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import static org.junit.jupiter.api.Assertions.assertEquals;
2020
import static org.junit.jupiter.api.Assertions.assertTrue;
21+
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.ArgumentMatchers.anyLong;
2223
import static org.mockito.ArgumentMatchers.eq;
2324
import static org.mockito.ArgumentMatchers.isA;
25+
import static org.mockito.Mockito.doNothing;
2426
import static org.mockito.Mockito.doReturn;
2527
import static org.mockito.Mockito.never;
2628
import static org.mockito.Mockito.spy;
@@ -31,6 +33,7 @@
3133
import com.netflix.discovery.EurekaClient;
3234
import com.netflix.discovery.EurekaEventListener;
3335
import com.netflix.discovery.StatusChangeEvent;
36+
import com.netflix.zuul.netty.server.ClientConnectionsShutdown.ShutdownType;
3437
import io.netty.bootstrap.Bootstrap;
3538
import io.netty.bootstrap.ServerBootstrap;
3639
import io.netty.channel.Channel;
@@ -225,7 +228,7 @@ void connectionsNotForceClosed() throws Exception {
225228
try {
226229
configuration.setProperty(configName, "0");
227230
createChannels(10);
228-
Promise<Void> promise = shutdown.gracefullyShutdownClientChannels(false);
231+
Promise<Void> promise = shutdown.gracefullyShutdownClientChannels(ShutdownType.OUT_OF_SERVICE);
229232
verify(eventLoop, never()).schedule(isA(Runnable.class), anyLong(), isA(TimeUnit.class));
230233
channels.forEach(Channel::close);
231234

@@ -236,8 +239,22 @@ void connectionsNotForceClosed() throws Exception {
236239
}
237240
}
238241

242+
@Test
243+
public void shutdownTypeForwardedToFlag() throws InterruptedException {
244+
shutdown = spy(shutdown);
245+
doNothing().when(shutdown).flagChannelForClose(any(), any());
246+
createChannels(1);
247+
Channel channel = channels.iterator().next();
248+
for (ShutdownType type : ShutdownType.values()) {
249+
shutdown.gracefullyShutdownClientChannels(type);
250+
verify(shutdown).flagChannelForClose(channel, type);
251+
}
252+
253+
channels.close().await(5, TimeUnit.SECONDS);
254+
}
255+
239256
private void createChannels(int numChannels) throws InterruptedException {
240-
ChannelInitializer<LocalChannel> initializer = new ChannelInitializer<LocalChannel>() {
257+
ChannelInitializer<LocalChannel> initializer = new ChannelInitializer<>() {
241258
@Override
242259
protected void initChannel(LocalChannel ch) {}
243260
};

0 commit comments

Comments
 (0)