Race condition in IdleInputStream when reusing WebClient connections under high load #10861

@hozumi

Description

Environment Details

  • Helidon Version: 4.3.1
  • Helidon SE
  • JDK version: 25 (Temurin)
  • OS: macOS 26.1, Rocky Linux 8

Problem Description

I am observing an issue where the response time of our HTTP server (which acts as an aggregator using Helidon WebServer & WebClient to fetch data from multiple external HTTP APIs) gradually degrades over several hours to a day. The system handles hundreds of HTTP requests per second. Restarting the application resolves the latency issue temporarily.

While investigating the root cause of this performance degradation, I found a race condition in IdleInputStream related to the WebClient connection pool. I am not yet certain this is the sole cause of the latency accumulation, but it is a bug in its own right: the idle-socket monitoring thread can run while the connection is actively in use.

Technical Details

  1. When a connection is returned to the pool, idle() is called in Http1ConnectionCache:

    if (connectionQueue.offer(conn, QUEUE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
        conn.helidonSocket().idle(); // mark it as idle to stay blocked at read for closed conn detection
        // ...
    }

  2. Inside idle(), the idlingThread is assigned:

    void idle() {
        if (idlingThread != null) {
            return;
        }
        idlingThread = executor.get().submit(this::handle);
    }

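  3. However, if the connection is retrieved from the pool for a new request immediately (which happens frequently under high load), the idlingThread check at the start of read() can race with the assignment or execution of the idle monitor:

    public int read() throws IOException {
        if (idlingThread != null) {
            endIdle();
        }
        // ...
    }

     Note that idle() is called only after offer() has already made the connection available to other threads, so another thread can lease the connection and reach this check before idlingThread is assigned. In addition, idlingThread is a plain (non-volatile) field, as the diff context below shows, so the leasing thread is not guaranteed to observe the assignment at all.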

Specifically, it appears possible for the idle monitoring thread to run or continue running even after the connection has been leased again and should be considered "active".
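To make the ordering in steps 1 to 3 concrete, here is a minimal, self-contained toy model. It is NOT Helidon code: the class names IdleRaceModel and Conn, the leased flag, and the latch are hypothetical and only mirror the snippets above. The latch forces, deterministically, the interleaving that under load happens by chance: thread A offers the connection and only then calls idle(), while thread B takes it from the queue and reaches the read() guard first. No socket I/O is modelled; the monitor task merely records whether it started on a connection that was already leased.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical toy model, NOT Helidon code: names only mirror the snippets in this issue.
public class IdleRaceModel {

    static final class Conn {
        Future<?> idlingThread;                        // plain field, like IdleInputStream.idlingThread
        volatile boolean leased = false;               // set by the thread that takes it from the pool
        volatile boolean borrowerSawMonitor = false;   // what the read() guard observed
        volatile boolean monitorRanWhileLeased = false;

        // Mirrors idle(): start the monitor task only once.
        void idle(ExecutorService executor) {
            if (idlingThread != null) {
                return;
            }
            idlingThread = executor.submit(() -> {
                // Mirrors handle(): it should only ever run on an idle connection.
                if (leased) {
                    monitorRanWhileLeased = true;
                }
            });
        }
    }

    static Conn runScenario() throws Exception {
        BlockingQueue<Conn> pool = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch borrowerReading = new CountDownLatch(1);
        Conn conn = new Conn();

        // Thread A: same order as in step 1, offer() first, then idle().
        Thread returner = new Thread(() -> {
            try {
                if (pool.offer(conn, 1, TimeUnit.SECONDS)) {
                    borrowerReading.await();           // force the unlucky interleaving
                    conn.idle(executor);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Thread B: leases the connection as soon as it is offered and reaches read().
        Thread borrower = new Thread(() -> {
            try {
                Conn c = pool.take();
                c.leased = true;
                c.borrowerSawMonitor = c.idlingThread != null;  // the guard at the top of read()
                borrowerReading.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        returner.start();
        borrower.start();
        returner.join();
        borrower.join();
        conn.idlingThread.get();                       // wait for the monitor task to finish
        executor.shutdown();
        return conn;
    }

    public static void main(String[] args) throws Exception {
        Conn c = runScenario();
        System.out.println("read() saw monitor: " + c.borrowerSawMonitor);
        System.out.println("monitor ran while leased: " + c.monitorRanWhileLeased);
    }
}
```

With this forced order the program prints that read() did not see a monitor and that the monitor then started on a leased connection, which is the situation described above.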

Steps to reproduce

I created a reproduction case that patches IdleInputStream.java with an active flag and debug logging, and then runs a load test.

  • diff io/helidon/common/socket/IdleInputStream.java
diff --git a/common/socket/src/main/java/io/helidon/common/socket/IdleInputStream.java b/common/socket/src/main/java/io/helidon/common/socket/IdleInputStream.java
index b62420e261..d77dbe1c2a 100644
--- a/common/socket/src/main/java/io/helidon/common/socket/IdleInputStream.java
+++ b/common/socket/src/main/java/io/helidon/common/socket/IdleInputStream.java
@@ -52,6 +52,7 @@ class IdleInputStream extends InputStream {
     private volatile int next = -1;
     private volatile boolean closed = false;
     private volatile boolean cancelled = false;
+    private volatile boolean active = true;
     private Future<?> idlingThread;

     IdleInputStream(Socket socket, InputStream upstream, String childSocketId, String socketId) {
@@ -69,6 +70,7 @@ class IdleInputStream extends InputStream {
         if (idlingThread != null) {
             endIdle();
         }
+        active = true;
         if (next < 0) {
             return upstream.read();
         } else {
@@ -83,6 +85,7 @@ class IdleInputStream extends InputStream {
         if (idlingThread != null) {
             endIdle();
         }
+        active = true;
         if (next < 0) {
             return upstream.read(b, off, len);
         } else {
@@ -108,6 +111,7 @@ class IdleInputStream extends InputStream {
      * in blocking manner to detect severed connection.
      */
     void idle() {
+        active = false;
         if (idlingThread != null) {
             return;
         }
@@ -124,6 +128,9 @@ class IdleInputStream extends InputStream {
             int toRestore = socket.getSoTimeout();
             int idleTimeoutIterations = Math.ceilDiv(Math.max(1, toRestore), ITERATION_TIME_MILLIS);
             for (int i = 0; !cancelled; i++) {
+                if (active) {
+                    System.out.println("active=true at IdleInputStream.handle() socket: " + socket);
+                }
                 try {
                     //We need to check the current socket timeout before each iteration.
                     //This time out could have changed,
@@ -168,6 +175,7 @@ class IdleInputStream extends InputStream {
             idlingThread.get();
             idlingThread = null;
             cancelled = false;
+            active = true;
         } catch (InterruptedException | ExecutionException e) {
             closed = true;
             throw new RuntimeException("Exception in socket monitor thread.", e);
  • src/example/Main.java
package example;

import io.helidon.http.HeaderNames;
import io.helidon.logging.common.LogConfig;
import io.helidon.webclient.api.HttpClientRequest;
import io.helidon.webclient.api.HttpClientResponse;
import io.helidon.webclient.api.WebClient;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.http.HttpRouting;
import io.helidon.webserver.http.ServerRequest;
import io.helidon.webserver.http.ServerResponse;

import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.StructuredTaskScope;

// StructuredTaskScope is a preview API in JDK 25: compile and run with --enable-preview.
public class Main {

    static WebClient client = WebClient
            .builder()
            .followRedirects(false)
            .keepAlive(true)
            .shareConnectionCache(true)
            .connectionCacheSize(100)
            .connectTimeout(Duration.ofMillis(1000))
            .readTimeout(Duration.ofMillis(1000))
            .build();

    public static void main(String[] args) {
        LogConfig.configureRuntime();

        WebServer server = WebServer.builder()
                .host("0.0.0.0")
                .port(8080)
                .routing(Main::routing)
                .build()
                .start();

        System.out.println("WEB server is up!");

        for (int i = 0; i <= 100000; i++) {
            try (var scope = StructuredTaskScope.open(
                    StructuredTaskScope.Joiner.<String>awaitAll(),
                    cf -> cf.withTimeout(Duration.ofMillis(1000)))) {
                for (int j = 0; j <= 100; j++) {
                    final int i2 = i;
                    final int j2 = j;
                    StructuredTaskScope.Subtask<String> t = scope.fork(() -> {
                        multiShot(i2, j2);
                    });
                }
                try {
                    scope.join();
                } catch (StructuredTaskScope.TimeoutException e) {
                    scope.close();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    static void multiShot(int i, int j) {
        try (var scope = StructuredTaskScope.open(
                StructuredTaskScope.Joiner.<String>awaitAll(),
                cf -> cf.withTimeout(Duration.ofMillis(200)))) {
            ArrayList<StructuredTaskScope.Subtask<String>> arr = new ArrayList<>();
            long start = System.currentTimeMillis();
            for (int k = 0; k < 10; k++) {
                StructuredTaskScope.Subtask<String> t = scope.fork(() -> {
                    HttpClientRequest req = client.get();
                    req.uri("http://localhost:8080/hello");
                    try (HttpClientResponse res = req.request()) {
                        String s = res.as(String.class);
                        return s;
                    }
                });
                arr.add(t);
            }
            try {
                scope.join();
            } catch (StructuredTaskScope.TimeoutException e) {
                scope.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
            }
            long numOfSuccess = arr.stream()
                    .filter((t) -> t.state() == StructuredTaskScope.Subtask.State.SUCCESS && t.get().equals("hello"))
                    .count();
            long end = System.currentTimeMillis();
            long duration = (end - start);
            if (i % 100 == 0 && j == 0) {
                System.out.println("i: " + i + ", success: " + numOfSuccess + "/" + 10 + " duration: " + duration + "ms");
            }
        }
    }

    static void routing(HttpRouting.Builder routing) {
        routing.get("/hello", Main::helloResponse);
    }

    static void helloResponse(ServerRequest request, ServerResponse response) {
        try {
            Thread.sleep(Duration.ofMillis(10));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
        }
        response.header(HeaderNames.CONTENT_TYPE, "text/plain")
                .send("hello");
    }
}
  • Output
WEB server is up!
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=53075]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=53067]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=53079]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=53046]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=53069]
i: 0, success: 10/10 duration: 193ms
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=53799]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=54812]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=55051]
...omitted...
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=53710]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=53828]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=53824]
i: 100, success: 0/10 duration: 207ms
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=57233]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=58915]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=58914]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=62788]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=63604]
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=63605]
i: 200, success: 10/10 duration: 30ms
active=true at IdleInputStream.handle() socket: Socket[addr=localhost/127.0.0.1,port=8080,localport=50145]

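The log output shows IdleInputStream.handle() (the monitor thread) running while active=true. In the patched code, active is set back to true only in read() after the endIdle() check, or in endIdle() after the monitor has finished, so the monitor can observe active=true only if some read() call skipped endIdle(). In other words, the connection is being used by the application logic while the monitor is still running on it.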
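Step 1 calls idle() only after offer() has published the connection. One direction that may be worth evaluating is to mark the connection idle before publishing it, so that the BlockingQueue hand-off (actions before offer() happen-before actions after the element is taken) makes the idlingThread assignment visible to the thread that leases the connection. The sketch below is a hypothetical toy model, not a tested Helidon patch: the class names IdleBeforeOfferModel and Conn are invented, the monitor task is an empty placeholder, and a real fix would at least also need to stop the monitor if offer() fails.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical toy model, NOT Helidon code: it only illustrates an alternative call order.
public class IdleBeforeOfferModel {

    static final class Conn {
        Future<?> idlingThread;                   // plain field, as in IdleInputStream
        volatile boolean borrowerSawMonitor = false;

        // Mirrors idle(): start the monitor task only once.
        void idle(ExecutorService executor) {
            if (idlingThread != null) {
                return;
            }
            // Placeholder for the blocking read that detects a closed connection.
            idlingThread = executor.submit(() -> { });
        }
    }

    // idle() is called BEFORE offer(); another thread then leases the connection
    // and records whether the read() guard would see the monitor.
    static Conn runScenario() throws Exception {
        BlockingQueue<Conn> pool = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Conn conn = new Conn();

        Thread returner = new Thread(() -> {
            conn.idle(executor);                  // assign idlingThread first...
            try {
                // ...then publish. The queue is unbounded, so offer() succeeds here;
                // a real cache would have to stop the monitor if it failed.
                pool.offer(conn, 1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread borrower = new Thread(() -> {
            try {
                Conn c = pool.take();
                // offer() happens-before take(), so the earlier write to idlingThread is visible.
                c.borrowerSawMonitor = c.idlingThread != null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        borrower.start();
        returner.start();
        returner.join();
        borrower.join();
        executor.shutdown();
        return conn;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("read() sees monitor: " + runScenario().borrowerSawMonitor);
    }
}
```

In this order the leasing thread always sees the monitor and could cancel it through endIdle() before reading; whether that is sufficient for the real connection cache would need to be checked against the full Http1ConnectionCache and IdleInputStream code.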

Metadata

  • Assignees: No one assigned
  • Labels: No labels
  • Type: No type
  • Status: Triage
  • Milestone: No milestone
  • Relationships: None yet
  • Development: No branches or pull requests
