Skip to content

Commit 57cf513

Browse files
authored
Fix PersistentTTLNode thread leak (#1264)
Fixes #1263.
1 parent bac8ba9 commit 57cf513

File tree

4 files changed

+260
-28
lines changed

4 files changed

+260
-28
lines changed

curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,28 @@ public Future<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long d
9595
scheduledExecutorService.scheduleWithFixedDelay(task, initialDelay, delay, unit);
9696
return new InternalScheduledFutureTask(scheduledFuture);
9797
}
98+
99+
/**
100+
* Creates and executes a periodic action that becomes enabled first after the given initial
101+
* delay, and subsequently with the given period; that is, executions will commence after {@code
102+
* initialDelay} then {@code initialDelay+period}, then {@code initialDelay + 2 * period}, and so
103+
* on. If any execution of the task encounters an exception, subsequent executions are suppressed.
104+
* Otherwise, the task will only terminate via cancellation or termination of the executor. If any
105+
* execution of this task takes longer than its period, then subsequent executions may start late,
106+
* but will not concurrently execute.
107+
*
108+
* @param task the task to execute
109+
* @param initialDelay the time to delay first execution
110+
* @param period the period between successive executions
111+
* @param unit the time unit of the initialDelay and period parameters
112+
* @return a Future representing pending completion of the task, and whose {@code get()} method
113+
* will throw an exception upon cancellation
114+
*/
115+
public Future<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
116+
Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
117+
118+
ScheduledFuture<?> scheduledFuture =
119+
scheduledExecutorService.scheduleAtFixedRate(task, initialDelay, period, unit);
120+
return new InternalScheduledFutureTask(scheduledFuture);
121+
}
98122
}

curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
package org.apache.curator.utils;
2121

2222
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
import static org.junit.jupiter.api.Assertions.assertFalse;
24+
import static org.junit.jupiter.api.Assertions.assertThrows;
2325
import static org.junit.jupiter.api.Assertions.assertTrue;
2426
import java.util.concurrent.CountDownLatch;
2527
import java.util.concurrent.Executors;
28+
import java.util.concurrent.RejectedExecutionException;
2629
import java.util.concurrent.ScheduledExecutorService;
2730
import java.util.concurrent.TimeUnit;
2831
import java.util.concurrent.atomic.AtomicInteger;
@@ -110,4 +113,84 @@ public void run() {
110113
assertTrue(newValue > value);
111114
assertEquals(innerValue, innerCounter.get());
112115
}
116+
117+
@Test
118+
public void testCloseableScheduleAtFixedRate() throws InterruptedException {
119+
CloseableScheduledExecutorService service = new CloseableScheduledExecutorService(executorService);
120+
121+
final CountDownLatch latch = new CountDownLatch(QTY);
122+
final AtomicInteger fixedDelayCounter = new AtomicInteger();
123+
service.scheduleAtFixedRate(
124+
new Runnable() {
125+
@Override
126+
public void run() {
127+
latch.countDown();
128+
// This delay has almost no impact on the schedule when using scheduleAtFixedRate
129+
try {
130+
Thread.sleep(DELAY_MS);
131+
} catch (InterruptedException e) {
132+
// Do nothing
133+
}
134+
}
135+
},
136+
DELAY_MS,
137+
DELAY_MS,
138+
TimeUnit.MILLISECONDS);
139+
service.scheduleWithFixedDelay(
140+
new Runnable() {
141+
@Override
142+
public void run() {
143+
fixedDelayCounter.incrementAndGet();
144+
// This delay does impact the schedule when using scheduleWithFixedDelay
145+
try {
146+
Thread.sleep(DELAY_MS);
147+
} catch (InterruptedException e) {
148+
// Do nothing
149+
}
150+
}
151+
},
152+
DELAY_MS,
153+
DELAY_MS,
154+
TimeUnit.MILLISECONDS);
155+
assertTrue(latch.await((QTY * 2) * DELAY_MS, TimeUnit.MILLISECONDS));
156+
assertTrue(fixedDelayCounter.get() <= (QTY / 2 + 1));
157+
}
158+
159+
@Test
160+
public void testCloseWithoutShutdown() throws InterruptedException {
161+
final CountDownLatch latch = new CountDownLatch(1);
162+
try (CloseableScheduledExecutorService service = new CloseableScheduledExecutorService(executorService)) {
163+
service.submit(latch::countDown);
164+
assertTrue(latch.await(1, TimeUnit.SECONDS));
165+
}
166+
assertFalse(executorService.isShutdown());
167+
}
168+
169+
@Test
170+
public void testCloseWithShutdown() throws InterruptedException {
171+
final CountDownLatch latch = new CountDownLatch(1);
172+
try (CloseableScheduledExecutorService service = new CloseableScheduledExecutorService(executorService, true)) {
173+
service.submit(latch::countDown);
174+
assertTrue(latch.await(1, TimeUnit.SECONDS));
175+
}
176+
assertTrue(executorService.isShutdown());
177+
assertThrows(RejectedExecutionException.class, () -> executorService.submit(() -> System.out.println("Hello")));
178+
}
179+
180+
@Test
181+
public void testCloseMultipleTimes() throws InterruptedException {
182+
final CountDownLatch latch = new CountDownLatch(1);
183+
CloseableScheduledExecutorService service = null;
184+
try {
185+
service = new CloseableScheduledExecutorService(executorService, true);
186+
service.submit(latch::countDown);
187+
assertTrue(latch.await(1, TimeUnit.SECONDS));
188+
} finally {
189+
if (service != null) {
190+
service.close();
191+
service.close();
192+
service.close();
193+
}
194+
}
195+
}
113196
}

curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,10 @@
2424
import java.io.IOException;
2525
import java.util.Objects;
2626
import java.util.concurrent.Executors;
27-
import java.util.concurrent.Future;
2827
import java.util.concurrent.ScheduledExecutorService;
2928
import java.util.concurrent.TimeUnit;
30-
import java.util.concurrent.atomic.AtomicReference;
3129
import org.apache.curator.framework.CuratorFramework;
30+
import org.apache.curator.utils.CloseableScheduledExecutorService;
3231
import org.apache.curator.utils.ThreadUtils;
3332
import org.apache.curator.utils.ZKPaths;
3433
import org.apache.zookeeper.CreateMode;
@@ -61,13 +60,15 @@ public class PersistentTtlNode implements Closeable {
6160
public static final int DEFAULT_TOUCH_SCHEDULE_FACTOR = 2;
6261
public static final boolean DEFAULT_USE_PARENT_CREATION = true;
6362

63+
@VisibleForTesting
64+
static final String TOUCH_THREAD_NAME = "PersistentTtlNode";
65+
6466
private final Logger log = LoggerFactory.getLogger(getClass());
6567
private final PersistentNode node;
6668
private final CuratorFramework client;
6769
private final long ttlMs;
6870
private final int touchScheduleFactor;
69-
private final ScheduledExecutorService executorService;
70-
private final AtomicReference<Future<?>> futureRef = new AtomicReference<>();
71+
private final CloseableScheduledExecutorService closeableExecutorService;
7172
private final String childPath;
7273

7374
/**
@@ -79,7 +80,9 @@ public class PersistentTtlNode implements Closeable {
7980
public PersistentTtlNode(CuratorFramework client, String path, long ttlMs, byte[] initData) {
8081
this(
8182
client,
82-
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("PersistentTtlNode")),
83+
new CloseableScheduledExecutorService(
84+
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory(TOUCH_THREAD_NAME)),
85+
true),
8386
path,
8487
ttlMs,
8588
initData,
@@ -99,7 +102,9 @@ public PersistentTtlNode(
99102
CuratorFramework client, String path, long ttlMs, byte[] initData, boolean useParentCreation) {
100103
this(
101104
client,
102-
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("PersistentTtlNode")),
105+
new CloseableScheduledExecutorService(
106+
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory(TOUCH_THREAD_NAME)),
107+
true),
103108
path,
104109
ttlMs,
105110
initData,
@@ -128,7 +133,7 @@ public PersistentTtlNode(
128133
int touchScheduleFactor) {
129134
this(
130135
client,
131-
executorService,
136+
new CloseableScheduledExecutorService(executorService),
132137
path,
133138
ttlMs,
134139
initData,
@@ -157,6 +162,26 @@ public PersistentTtlNode(
157162
String childNodeName,
158163
int touchScheduleFactor,
159164
boolean useParentCreation) {
165+
this(
166+
client,
167+
new CloseableScheduledExecutorService(executorService, false),
168+
path,
169+
ttlMs,
170+
initData,
171+
childNodeName,
172+
touchScheduleFactor,
173+
useParentCreation);
174+
}
175+
176+
private PersistentTtlNode(
177+
CuratorFramework client,
178+
CloseableScheduledExecutorService closeableExecutorService,
179+
String path,
180+
long ttlMs,
181+
byte[] initData,
182+
String childNodeName,
183+
int touchScheduleFactor,
184+
boolean useParentCreation) {
160185
this.client = Objects.requireNonNull(client, "client cannot be null");
161186
this.ttlMs = ttlMs;
162187
this.touchScheduleFactor = touchScheduleFactor;
@@ -168,7 +193,7 @@ protected void deleteNode() {
168193
// NOP
169194
}
170195
};
171-
this.executorService = Objects.requireNonNull(executorService, "executorService cannot be null");
196+
this.closeableExecutorService = closeableExecutorService;
172197
childPath = ZKPaths.makePath(Objects.requireNonNull(path, "path cannot be null"), childNodeName);
173198
}
174199

@@ -198,9 +223,8 @@ void touch() {
198223
*/
199224
public void start() {
200225
node.start();
201-
Future<?> future = executorService.scheduleAtFixedRate(
226+
closeableExecutorService.scheduleAtFixedRate(
202227
this::touch, ttlMs / touchScheduleFactor, ttlMs / touchScheduleFactor, TimeUnit.MILLISECONDS);
203-
futureRef.set(future);
204228
}
205229

206230
/**
@@ -238,17 +262,19 @@ public byte[] getData() {
238262
return node.getData();
239263
}
240264

265+
@VisibleForTesting
266+
CloseableScheduledExecutorService getCloseableScheduledExecutorService() {
267+
return closeableExecutorService;
268+
}
269+
241270
/**
242271
* Call when you are done with the PersistentTtlNode. Note: the ZNode is <em>not</em> immediately
243272
* deleted. However, if no other PersistentTtlNode with the same path is running the node will get deleted
244273
* based on the ttl.
245274
*/
246275
@Override
247276
public void close() {
248-
Future<?> future = futureRef.getAndSet(null);
249-
if (future != null) {
250-
future.cancel(true);
251-
}
277+
closeableExecutorService.close();
252278
try {
253279
node.close();
254280
} catch (IOException e) {

0 commit comments

Comments
 (0)