diff --git a/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/DefaultClientChannelManager.java b/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/DefaultClientChannelManager.java index 2c518b598f..619350d61d 100644 --- a/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/DefaultClientChannelManager.java +++ b/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/DefaultClientChannelManager.java @@ -41,6 +41,10 @@ import io.netty.channel.EventLoop; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -50,9 +54,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * User: michaels@netflix.com @@ -214,26 +215,23 @@ public boolean release(final PooledConnection conn) { boolean released = false; - // if the connection has been around too long (i.e. too many requests), then close it - // TODO(argha-c): Document what is a reasonable default here, and the class of origins that optimizes for - final boolean connExpiredLifetime = conn.getUsageCount() > connPoolConfig.getMaxRequestsPerConnection(); - if (conn.isShouldClose() || connExpiredLifetime) { + + if (conn.isShouldClose()) { // Close and discard the connection, as it has been flagged (possibly due to receiving a non-channel error // like a 503). conn.setInPool(false); conn.close(); - if (connExpiredLifetime) { - closeExpiredConnLifetimeCounter.increment(); - LOG.debug( - "[{}] closing conn lifetime expired, usage: {}", - conn.getChannel().id(), - conn.getUsageCount()); - } else { LOG.debug( "[{}] closing conn flagged to be closed", conn.getChannel().id()); - } - + } else if(isConnectionExpired(conn.getUsageCount())) { + conn.setInPool(false); + conn.close(); + closeExpiredConnLifetimeCounter.increment(); + LOG.debug( + "[{}] closing conn lifetime expired, usage: {}", + conn.getChannel().id(), + conn.getUsageCount()); } else if (connPoolConfig.isCloseOnCircuitBreakerEnabled() && discoveryResult.isCircuitBreakerTripped()) { LOG.debug( "[{}] closing conn, server circuit breaker tripped", @@ -270,6 +268,12 @@ public boolean release(final PooledConnection conn) { return released; } + protected boolean isConnectionExpired(long usageCount) { + // if the connection has been around too long (i.e. too many requests), then close it + // TODO(argha-c): Document what is a reasonable default here, and the class of origins that optimizes for + return usageCount > connPoolConfig.getMaxRequestsPerConnection(); + } + protected void updateServerStatsOnRelease(final PooledConnection conn) { final DiscoveryResult discoveryResult = conn.getServer(); discoveryResult.decrementActiveRequestsCount(); diff --git a/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/PerServerConnectionPool.java b/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/PerServerConnectionPool.java index 2ec9f198f7..8092ad9d95 100644 --- a/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/PerServerConnectionPool.java +++ b/zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/PerServerConnectionPool.java @@ -28,6 +28,10 @@ import io.netty.handler.codec.DecoderException; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -37,9 +41,6 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * User: michaels@netflix.com @@ -166,12 +167,7 @@ public Promise acquire( final PooledConnection conn = tryGettingFromConnectionPool(eventLoop); if (conn != null) { // There was a pooled connection available, so use this one. - conn.startRequestTimer(); - conn.incrementUsageCount(); - conn.getChannel().read(); - onAcquire(conn, passport); - initPooledConnection(conn, promise); - selectedHostAddr.set(getSelectedHostString(serverAddr)); + reusePooledConnection(passport, selectedHostAddr, conn, promise); } else { // connection pool empty, create new connection using client connection factory. tryMakingNewConnection(eventLoop, promise, passport, selectedHostAddr); @@ -180,6 +176,15 @@ public Promise acquire( return promise; } + protected void reusePooledConnection(CurrentPassport passport, AtomicReference selectedHostAddr, PooledConnection conn, Promise promise) { + conn.startRequestTimer(); + conn.incrementUsageCount(); + conn.getChannel().read(); + onAcquire(conn, passport); + initPooledConnection(conn, promise); + selectedHostAddr.set(getSelectedHostString(serverAddr)); + } + protected void updateServerStatsOnAcquire() { server.incrementActiveRequestsCount(); } @@ -232,21 +237,8 @@ protected void tryMakingNewConnection( Promise promise, CurrentPassport passport, AtomicReference selectedHostAddr) { - // Enforce MaxConnectionsPerHost config. - int maxConnectionsPerHost = config.maxConnectionsPerHost(); - int openAndOpeningConnectionCount = server.getOpenConnectionsCount() + connCreationsInProgress.get(); - if (maxConnectionsPerHost != -1 && openAndOpeningConnectionCount >= maxConnectionsPerHost) { - maxConnsPerHostExceededCounter.increment(); - promise.setFailure(new OriginConnectException( - "maxConnectionsPerHost=" + maxConnectionsPerHost + ", connectionsPerHost=" - + openAndOpeningConnectionCount, - OutboundErrorType.ORIGIN_SERVER_MAX_CONNS)); - LOG.warn( - "Unable to create new connection because at MaxConnectionsPerHost! maxConnectionsPerHost={}, connectionsPerHost={}, host={}origin={}", - maxConnectionsPerHost, - openAndOpeningConnectionCount, - server.getServerId(), - config.getOriginName()); + + if (!isWithinConnectionLimit(promise)) { return; } @@ -281,6 +273,27 @@ protected void tryMakingNewConnection( } } + protected boolean isWithinConnectionLimit(Promise promise) { + // Enforce MaxConnectionsPerHost config. + int maxConnectionsPerHost = config.maxConnectionsPerHost(); + int openAndOpeningConnectionCount = server.getOpenConnectionsCount() + connCreationsInProgress.get(); + if (maxConnectionsPerHost != -1 && openAndOpeningConnectionCount >= maxConnectionsPerHost) { + maxConnsPerHostExceededCounter.increment(); + promise.setFailure(new OriginConnectException( + "maxConnectionsPerHost=" + maxConnectionsPerHost + ", connectionsPerHost=" + + openAndOpeningConnectionCount, + OutboundErrorType.ORIGIN_SERVER_MAX_CONNS)); + LOG.warn( + "Unable to create new connection because at MaxConnectionsPerHost! maxConnectionsPerHost={}, connectionsPerHost={}, host={}origin={}", + maxConnectionsPerHost, + openAndOpeningConnectionCount, + server.getServerId(), + config.getOriginName()); + return false; + } + return true; + } + protected ChannelFuture connectToServer(EventLoop eventLoop, CurrentPassport passport, SocketAddress serverAddr) { return connectionFactory.connect(eventLoop, serverAddr, passport, this); } @@ -354,8 +367,7 @@ public boolean release(PooledConnection conn) { CurrentPassport passport = CurrentPassport.fromChannel(conn.getChannel()); // Discard conn if already at least above waterline in the pool already for this server. - int poolWaterline = config.perServerWaterline(); - if (poolWaterline > -1 && connections.size() >= poolWaterline) { + if (isOverPerServerWaterline(connections.size())) { closeAboveHighWaterMarkCounter.increment(); conn.close(); conn.setInPool(false); @@ -375,6 +387,11 @@ else if (connections.offer(conn)) { } } + protected boolean isOverPerServerWaterline(int connectionsInPool) { + int poolWaterline = config.perServerWaterline(); + return poolWaterline > -1 && connectionsInPool >= poolWaterline; + } + @Override public boolean remove(PooledConnection conn) { if (conn == null) { @@ -428,7 +445,7 @@ public int getConnsInUse() { } @Nullable - private static InetAddress getSelectedHostString(SocketAddress addr) { + protected InetAddress getSelectedHostString(SocketAddress addr) { if (addr instanceof InetSocketAddress) { return ((InetSocketAddress) addr).getAddress(); } else {