Skip to content

Commit

Permalink
Extract methods for easier reuse/overriding (#1788)
Browse files Browse the repository at this point in the history
* Extract methods for easier reuse/overriding

* PR feedback
  • Loading branch information
jguerra authored Jul 17, 2024
1 parent 8e90acd commit d6b189a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
import io.netty.channel.EventLoop;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand All @@ -50,9 +54,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* User: [email protected]
Expand Down Expand Up @@ -214,26 +215,23 @@ public boolean release(final PooledConnection conn) {

boolean released = false;

// if the connection has been around too long (i.e. too many requests), then close it
// TODO(argha-c): Document what is a reasonable default here, and the class of origins that optimizes for
final boolean connExpiredLifetime = conn.getUsageCount() > connPoolConfig.getMaxRequestsPerConnection();
if (conn.isShouldClose() || connExpiredLifetime) {

if (conn.isShouldClose()) {
// Close and discard the connection, as it has been flagged (possibly due to receiving a non-channel error
// like a 503).
conn.setInPool(false);
conn.close();
if (connExpiredLifetime) {
closeExpiredConnLifetimeCounter.increment();
LOG.debug(
"[{}] closing conn lifetime expired, usage: {}",
conn.getChannel().id(),
conn.getUsageCount());
} else {
LOG.debug(
"[{}] closing conn flagged to be closed",
conn.getChannel().id());
}

} else if(isConnectionExpired(conn.getUsageCount())) {
conn.setInPool(false);
conn.close();
closeExpiredConnLifetimeCounter.increment();
LOG.debug(
"[{}] closing conn lifetime expired, usage: {}",
conn.getChannel().id(),
conn.getUsageCount());
} else if (connPoolConfig.isCloseOnCircuitBreakerEnabled() && discoveryResult.isCircuitBreakerTripped()) {
LOG.debug(
"[{}] closing conn, server circuit breaker tripped",
Expand Down Expand Up @@ -270,6 +268,12 @@ public boolean release(final PooledConnection conn) {
return released;
}

protected boolean isConnectionExpired(long usageCount) {
// if the connection has been around too long (i.e. too many requests), then close it
// TODO(argha-c): Document what is a reasonable default here, and the class of origins that optimizes for
return usageCount > connPoolConfig.getMaxRequestsPerConnection();
}

protected void updateServerStatsOnRelease(final PooledConnection conn) {
final DiscoveryResult discoveryResult = conn.getServer();
discoveryResult.decrementActiveRequestsCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import io.netty.handler.codec.DecoderException;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand All @@ -37,9 +41,6 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* User: [email protected]
Expand Down Expand Up @@ -166,12 +167,7 @@ public Promise<PooledConnection> acquire(
final PooledConnection conn = tryGettingFromConnectionPool(eventLoop);
if (conn != null) {
// There was a pooled connection available, so use this one.
conn.startRequestTimer();
conn.incrementUsageCount();
conn.getChannel().read();
onAcquire(conn, passport);
initPooledConnection(conn, promise);
selectedHostAddr.set(getSelectedHostString(serverAddr));
reusePooledConnection(passport, selectedHostAddr, conn, promise);
} else {
// connection pool empty, create new connection using client connection factory.
tryMakingNewConnection(eventLoop, promise, passport, selectedHostAddr);
Expand All @@ -180,6 +176,15 @@ public Promise<PooledConnection> acquire(
return promise;
}

protected void reusePooledConnection(CurrentPassport passport, AtomicReference<? super InetAddress> selectedHostAddr, PooledConnection conn, Promise<PooledConnection> promise) {
conn.startRequestTimer();
conn.incrementUsageCount();
conn.getChannel().read();
onAcquire(conn, passport);
initPooledConnection(conn, promise);
selectedHostAddr.set(getSelectedHostString(serverAddr));
}

protected void updateServerStatsOnAcquire() {
server.incrementActiveRequestsCount();
}
Expand Down Expand Up @@ -232,21 +237,8 @@ protected void tryMakingNewConnection(
Promise<PooledConnection> promise,
CurrentPassport passport,
AtomicReference<? super InetAddress> selectedHostAddr) {
// Enforce MaxConnectionsPerHost config.
int maxConnectionsPerHost = config.maxConnectionsPerHost();
int openAndOpeningConnectionCount = server.getOpenConnectionsCount() + connCreationsInProgress.get();
if (maxConnectionsPerHost != -1 && openAndOpeningConnectionCount >= maxConnectionsPerHost) {
maxConnsPerHostExceededCounter.increment();
promise.setFailure(new OriginConnectException(
"maxConnectionsPerHost=" + maxConnectionsPerHost + ", connectionsPerHost="
+ openAndOpeningConnectionCount,
OutboundErrorType.ORIGIN_SERVER_MAX_CONNS));
LOG.warn(
"Unable to create new connection because at MaxConnectionsPerHost! maxConnectionsPerHost={}, connectionsPerHost={}, host={}origin={}",
maxConnectionsPerHost,
openAndOpeningConnectionCount,
server.getServerId(),
config.getOriginName());

if (!isWithinConnectionLimit(promise)) {
return;
}

Expand Down Expand Up @@ -281,6 +273,27 @@ protected void tryMakingNewConnection(
}
}

protected boolean isWithinConnectionLimit(Promise<PooledConnection> promise) {
// Enforce MaxConnectionsPerHost config.
int maxConnectionsPerHost = config.maxConnectionsPerHost();
int openAndOpeningConnectionCount = server.getOpenConnectionsCount() + connCreationsInProgress.get();
if (maxConnectionsPerHost != -1 && openAndOpeningConnectionCount >= maxConnectionsPerHost) {
maxConnsPerHostExceededCounter.increment();
promise.setFailure(new OriginConnectException(
"maxConnectionsPerHost=" + maxConnectionsPerHost + ", connectionsPerHost="
+ openAndOpeningConnectionCount,
OutboundErrorType.ORIGIN_SERVER_MAX_CONNS));
LOG.warn(
"Unable to create new connection because at MaxConnectionsPerHost! maxConnectionsPerHost={}, connectionsPerHost={}, host={}origin={}",
maxConnectionsPerHost,
openAndOpeningConnectionCount,
server.getServerId(),
config.getOriginName());
return false;
}
return true;
}

protected ChannelFuture connectToServer(EventLoop eventLoop, CurrentPassport passport, SocketAddress serverAddr) {
return connectionFactory.connect(eventLoop, serverAddr, passport, this);
}
Expand Down Expand Up @@ -354,8 +367,7 @@ public boolean release(PooledConnection conn) {
CurrentPassport passport = CurrentPassport.fromChannel(conn.getChannel());

// Discard conn if already at least above waterline in the pool already for this server.
int poolWaterline = config.perServerWaterline();
if (poolWaterline > -1 && connections.size() >= poolWaterline) {
if (isOverPerServerWaterline(connections.size())) {
closeAboveHighWaterMarkCounter.increment();
conn.close();
conn.setInPool(false);
Expand All @@ -375,6 +387,11 @@ else if (connections.offer(conn)) {
}
}

protected boolean isOverPerServerWaterline(int connectionsInPool) {
int poolWaterline = config.perServerWaterline();
return poolWaterline > -1 && connectionsInPool >= poolWaterline;
}

@Override
public boolean remove(PooledConnection conn) {
if (conn == null) {
Expand Down Expand Up @@ -428,7 +445,7 @@ public int getConnsInUse() {
}

@Nullable
private static InetAddress getSelectedHostString(SocketAddress addr) {
protected InetAddress getSelectedHostString(SocketAddress addr) {
if (addr instanceof InetSocketAddress) {
return ((InetSocketAddress) addr).getAddress();
} else {
Expand Down

0 comments on commit d6b189a

Please sign in to comment.