Skip to content

Commit e338ed4

Browse files
spericasbarchetta
authored andcommitted
Terminate monitor thread created by the timeout implementation when the supplier returns. Otherwise, with long timeouts, there can be multiple threads in TIMED_WAITING state just wasting memory. New tests for SE and MP. Issue helidon-io#10824. (helidon-io#10833)
1 parent 6647972 commit e338ed4

File tree

3 files changed

+185
-1
lines changed

3 files changed

+185
-1
lines changed

fault-tolerance/fault-tolerance/src/main/java/io/helidon/faulttolerance/TimeoutImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.concurrent.CompletableFuture;
2020
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Future;
2122
import java.util.concurrent.TimeUnit;
2223
import java.util.concurrent.atomic.AtomicBoolean;
2324
import java.util.concurrent.locks.ReentrantLock;
@@ -93,7 +94,7 @@ public <T> T invoke(Supplier<? extends T> supplier) {
9394
AtomicBoolean callReturned = new AtomicBoolean(false);
9495
AtomicBoolean interrupted = new AtomicBoolean(false);
9596

96-
executor.submit(FaultTolerance.toDelayedRunnable(() -> {
97+
Future<?> monitor = executor.submit(FaultTolerance.toDelayedRunnable(() -> {
9798
interruptLock.lock();
9899
try {
99100
if (callReturned.compareAndSet(false, true)) {
@@ -120,6 +121,11 @@ public <T> T invoke(Supplier<? extends T> supplier) {
120121

121122
interruptLock.lock();
122123
try {
124+
boolean cancelMonitor = monitor.cancel(true);
125+
if (!cancelMonitor) {
126+
LOGGER.log(System.Logger.Level.DEBUG, "Unable to cancel monitor thread");
127+
}
128+
123129
callReturned.set(true);
124130
// Run invocation in current thread
125131
// Clear interrupted flag here -- required for uninterruptible busy loops
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright (c) 2025 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.faulttolerance;
18+
19+
import java.time.Duration;
20+
import java.util.List;
21+
import java.util.concurrent.CopyOnWriteArrayList;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.ThreadFactory;
25+
26+
import org.junit.jupiter.api.Test;
27+
28+
import static org.hamcrest.CoreMatchers.is;
29+
import static org.hamcrest.MatcherAssert.assertThat;
30+
import static org.hamcrest.Matchers.not;
31+
32+
/**
33+
* Check that all threads created by timeout instances are properly
34+
* terminated and not in {@link Thread.State#TIMED_WAITING} state.
35+
*/
36+
class TimeoutThreadTest {
37+
38+
@Test
39+
void testTimeout() {
40+
// create executor with our thread factory
41+
TestThreadFactory threadFactory = new TestThreadFactory();
42+
ExecutorService executor = Executors.newThreadPerTaskExecutor(threadFactory);
43+
44+
// run 10 tasks with long timeouts that return immediately
45+
for (int i = 0; i < 10; i++) {
46+
String status = Timeout.builder()
47+
.timeout(Duration.ofSeconds(60)) // long timeout
48+
.currentThread(true)
49+
.executor(executor)
50+
.build()
51+
.invoke(() -> "done");
52+
assertThat(status, is("done"));
53+
}
54+
55+
// if timeout monitor threads stopped, no TIMED_WAITING states
56+
List<Thread> threads = threadFactory.threads();
57+
assertThat(threads.size(), is(10));
58+
for (Thread thread : threads) {
59+
assertThat(thread.getState(), is(not(Thread.State.TIMED_WAITING)));
60+
}
61+
}
62+
63+
static class TestThreadFactory implements ThreadFactory {
64+
65+
private final ThreadFactory delegate = Thread.ofVirtual().factory();
66+
private final List<Thread> threads = new CopyOnWriteArrayList<>();
67+
68+
@Override
69+
public Thread newThread(Runnable r) {
70+
Thread t = delegate.newThread(r);
71+
threads.add(t);
72+
return t;
73+
}
74+
75+
List<Thread> threads() {
76+
return threads;
77+
}
78+
}
79+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright (c) 2025 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.helidon.microprofile.faulttolerance;
17+
18+
import java.time.temporal.ChronoUnit;
19+
import java.util.List;
20+
import java.util.concurrent.CopyOnWriteArrayList;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ThreadFactory;
24+
25+
import io.helidon.faulttolerance.FaultTolerance;
26+
import io.helidon.microprofile.testing.AddBean;
27+
import io.helidon.microprofile.testing.junit5.HelidonTest;
28+
29+
import jakarta.inject.Inject;
30+
import org.eclipse.microprofile.faulttolerance.Timeout;
31+
import org.junit.jupiter.api.AfterAll;
32+
import org.junit.jupiter.api.BeforeAll;
33+
import org.junit.jupiter.api.Test;
34+
35+
import static org.hamcrest.CoreMatchers.is;
36+
import static org.hamcrest.MatcherAssert.assertThat;
37+
import static org.hamcrest.Matchers.not;
38+
39+
/**
40+
* Check that all threads created by timeout instances are properly
41+
* terminated and not in {@link Thread.State#TIMED_WAITING} state.
42+
*/
43+
@HelidonTest
44+
@AddBean(TimeoutThreadsTest.MyBean.class)
45+
class TimeoutThreadsTest {
46+
47+
private final static TestThreadFactory THREAD_FACTORY = new TestThreadFactory();
48+
49+
@Inject
50+
private MyBean bean;
51+
52+
@BeforeAll
53+
static void init() {
54+
ExecutorService executor = Executors.newThreadPerTaskExecutor(THREAD_FACTORY);
55+
FaultTolerance.executor(() -> executor);
56+
}
57+
58+
@Test
59+
void testTimeout() {
60+
for (int i = 0; i < 10; i++) {
61+
String status = bean.timeout();
62+
assertThat(status, is("done"));
63+
}
64+
}
65+
66+
@AfterAll
67+
static void close() {
68+
List<Thread> threads = THREAD_FACTORY.threads();
69+
assertThat(threads.size(), is(10));
70+
for (Thread thread : threads) {
71+
assertThat(thread.getState(), is(not(Thread.State.TIMED_WAITING)));
72+
}
73+
}
74+
75+
static class MyBean {
76+
77+
@Timeout(value = 60, unit = ChronoUnit.SECONDS) // long timeout
78+
public String timeout() {
79+
return "done";
80+
}
81+
}
82+
83+
static class TestThreadFactory implements ThreadFactory {
84+
85+
private final ThreadFactory delegate = Thread.ofVirtual().factory();
86+
private final List<Thread> threads = new CopyOnWriteArrayList<>();
87+
88+
@Override
89+
public Thread newThread(Runnable r) {
90+
Thread t = delegate.newThread(r);
91+
threads.add(t);
92+
return t;
93+
}
94+
95+
List<Thread> threads() {
96+
return threads;
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)