Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection cache 100 continue #9795

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
* Copyright (c) 2023, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -36,14 +40,23 @@
*/
class IdleInputStream extends InputStream {

private static final int TIMEOUT = 500;

private final Socket socket;
private final InputStream upstream;
private final LazyValue<ExecutorService> executor;
private final int idleTimeoutIterations;
private final int lastIterationRemainder;
private volatile int next = -1;
private volatile boolean closed = false;
private volatile boolean cancelled = false;
private Future<?> idlingThread;

IdleInputStream(InputStream upstream, String childSocketId, String socketId) {
IdleInputStream(Socket socket, InputStream upstream, String childSocketId, String socketId, Duration idleTimout) {
this.socket = socket;
this.upstream = upstream;
this.idleTimeoutIterations = (int) (idleTimout.toMillis() / TIMEOUT);
this.lastIterationRemainder = (int) (idleTimout.toMillis() % TIMEOUT);
executor = LazyValue.create(() -> Executors.newThreadPerTaskExecutor(
Thread.ofVirtual()
.name("helidon-socket-monitor-" + childSocketId + "-" + socketId, 0)
Expand Down Expand Up @@ -106,21 +119,66 @@ boolean isClosed() {
}

private void handle() {
//We needed to change this simple method to more complex solution.
try {
next = upstream.read();
if (next <= 0) {
closed = true;
//Currently configured socket read timeout. This is intended to be restored after this method finishes
int toRestore = socket.getSoTimeout();
int iterations = 1; // Current iteration number
int currentTimeout = TIMEOUT; //Timout to be used per iteration, can be different for the last iteration
boolean end = false; //Whether we reached enough iterations and therefore, we should timeout
while (!cancelled) {
try {
toRestore = timeoutToRestore(toRestore);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for a method, having the if statement here makes it more readable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm... I see this as a cleaner option :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this better, it's 3 lines of code, it's used only once and does not mutate any state.

socket.setSoTimeout(currentTimeout);
next = upstream.read();
if (next <= 0) {
closed = true;
}
break;
} catch (SocketTimeoutException e) {
if (end) {
//If "end" was already set to true, that means we had some remainder in idle timeout
//now we should really end
closed = true;
throw new UncheckedIOException(e);
}
//Determine whether we have tried to read enough times to end or not
end = iterations++ == idleTimeoutIterations;
if (end) {
//We can end now, but there still might be some remainder time to wait
if (lastIterationRemainder == 0) {
closed = true;
throw new UncheckedIOException(e);
} else {
currentTimeout = lastIterationRemainder;
}
}
}
}
if (!closed && socket.getSoTimeout() == currentTimeout) {
//Idle checking thread was canceled or detected it is not idle. Restore socket timeout it should have.
socket.setSoTimeout(toRestore);
}
} catch (IOException e) {
closed = true;
throw new UncheckedIOException(e);
}
}

private int timeoutToRestore(int toRestore) throws SocketException {
int currentSoTimeout = socket.getSoTimeout();
if (currentSoTimeout != TIMEOUT) {
toRestore = currentSoTimeout;
}
return toRestore;
}

private void endIdle() {
try {
cancelled = true;
idlingThread.get();
idlingThread = null;
cancelled = false;
} catch (InterruptedException | ExecutionException e) {
closed = true;
throw new RuntimeException("Exception in socket monitor thread.", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
* Copyright (c) 2022, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Arrays;

import io.helidon.common.buffers.BufferData;
Expand All @@ -31,6 +32,7 @@
*/
public sealed class PlainSocket implements HelidonSocket permits TlsSocket {
private static final int BUFFER_LENGTH = 8 * 1024;
static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofSeconds(30);

private final byte[] readBuffer = new byte[BUFFER_LENGTH];

Expand All @@ -46,13 +48,14 @@ public sealed class PlainSocket implements HelidonSocket permits TlsSocket {
* @param delegate delegate socket
* @param childSocketId channel id
* @param socketId server channel id
* @param idleTimeout idle timeout
*/
protected PlainSocket(Socket delegate, String childSocketId, String socketId) {
protected PlainSocket(Socket delegate, String childSocketId, String socketId, Duration idleTimeout) {
this.delegate = delegate;
this.childSocketId = childSocketId;
this.socketId = socketId;
try {
this.inputStream = new IdleInputStream(delegate.getInputStream(), childSocketId, socketId);
this.inputStream = new IdleInputStream(delegate, delegate.getInputStream(), childSocketId, socketId, idleTimeout);
this.outputStream = delegate.getOutputStream();
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand All @@ -68,18 +71,30 @@ protected PlainSocket(Socket delegate, String childSocketId, String socketId) {
* @return a new plain socket
*/
public static PlainSocket server(Socket delegate, String channelId, String serverChannelId) {
return new PlainSocket(delegate, channelId, serverChannelId);
return new PlainSocket(delegate, channelId, serverChannelId, DEFAULT_IDLE_TIMEOUT);
}

/**
* Create a new client socket.
* Create a new client socket. Sets idle timeout to 30 seconds.
*
* @param delegate underlying socket
* @param channelId channel id
* @return a new plain socket
*/
public static PlainSocket client(Socket delegate, String channelId) {
return new PlainSocket(delegate, channelId, "client");
return new PlainSocket(delegate, channelId, "client", DEFAULT_IDLE_TIMEOUT);
}

/**
* Create a new client socket.
*
* @param delegate underlying socket
* @param channelId channel id
* @param idleTimeout idle timout
* @return a new plain socket
*/
public static PlainSocket client(Socket delegate, String channelId, Duration idleTimeout) {
return new PlainSocket(delegate, channelId, "client", idleTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
* Copyright (c) 2022, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.security.Principal;
import java.security.cert.Certificate;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;

Expand All @@ -33,8 +34,8 @@ public final class TlsSocket extends PlainSocket {
private volatile PeerInfo remotePeer;
private volatile byte[] lastSslSessionId;

private TlsSocket(SSLSocket socket, String channelId, String serverChannelId) {
super(socket, channelId, serverChannelId);
private TlsSocket(SSLSocket socket, String channelId, String serverChannelId, Duration idleTimeout) {
super(socket, channelId, serverChannelId, idleTimeout);

this.sslSocket = socket;
this.lastSslSessionId = socket.getSession().getId();
Expand All @@ -51,19 +52,31 @@ private TlsSocket(SSLSocket socket, String channelId, String serverChannelId) {
public static TlsSocket server(SSLSocket delegate,
String channelId,
String serverChannelId) {
return new TlsSocket(delegate, channelId, serverChannelId);
return new TlsSocket(delegate, channelId, serverChannelId, DEFAULT_IDLE_TIMEOUT);
}

/**
* Create a client TLS socket.
* Create a client TLS socket. Sets default idle timeout to 30 seconds.
*
* @param delegate underlying socket
* @param channelId channel id
* @return a new TLS socket
*/
public static TlsSocket client(SSLSocket delegate,
String channelId) {
return new TlsSocket(delegate, channelId, "client");
return new TlsSocket(delegate, channelId, "client", DEFAULT_IDLE_TIMEOUT);
}

/**
* Create a client TLS socket.
*
* @param delegate underlying socket
* @param channelId channel id
* @param idleTimeout idle timeout
* @return a new TLS socket
*/
public static TlsSocket client(SSLSocket delegate, String channelId, Duration idleTimeout) {
return new TlsSocket(delegate, channelId, "client", idleTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, 2024 Oracle and/or its affiliates.
* Copyright (c) 2023, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -245,6 +245,17 @@ default ClientRequestHeaders defaultRequestHeaders() {
@Option.Default("PT1S")
Duration readContinueTimeout();

/**
* Default connection idle time in the cache. Default is 30 seconds.
* This timeout is used when connection is put into the cache. It determines how long the connection can be idle
* before it is considered closed.
*
* @return idle connection timeout duration
*/
@Option.Configured
@Option.Default("PT30S")
Duration idleConnectionTimeout();

/**
* Whether to share connection cache between all the WebClient instances in JVM.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, 2024 Oracle and/or its affiliates.
* Copyright (c) 2023, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -106,6 +106,7 @@ public static TcpClientConnection create(WebClient webClient,
public TcpClientConnection connect() {
Tls tls = connectionKey.tls();
InetSocketAddress targetAddress = inetSocketAddress();
webClient.prototype().idleConnectionTimeout();

/*
Obtain target socket through proxy (if enabled), or connect to target socket
Expand Down Expand Up @@ -141,9 +142,9 @@ Obtain target socket through proxy (if enabled), or connect to target socket
if (LOGGER.isLoggable(TRACE)) {
debugTls(sslSocket);
}
this.helidonSocket = TlsSocket.client(sslSocket, channelId);
this.helidonSocket = TlsSocket.client(sslSocket, channelId, webClient.prototype().idleConnectionTimeout());
} else {
this.helidonSocket = PlainSocket.client(socket, channelId);
this.helidonSocket = PlainSocket.client(socket, channelId, webClient.prototype().idleConnectionTimeout());
}
this.reader = new DataReader(helidonSocket);
this.writer = new DirectDatatWriter(helidonSocket);
Expand Down
Loading