Skip to content

Commit c41faff

Browse files
authored
perf: Replace Unsafe ScheduledExecutorService Initialization with Exception-Handling Wrapper (#2414)
* perf: pr-issue-2357 * perf: pr-issue-2357 * perf: pr-issue-2357 * perf: pr-issue-2357 * perf: pr-issue-2357 * perf: pr-issue-2357 * perf: pr-issue-2357 * perf: pr-issue-2357 * perf: pr-issue-2357
1 parent d91dbfc commit c41faff

File tree

8 files changed

+34
-22
lines changed

8 files changed

+34
-22
lines changed

Diff for: core/src/main/java/kafka/autobalancer/LoadRetriever.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.kafka.server.config.QuotaConfigs;
5353

5454
import com.automq.stream.utils.LogContext;
55+
import com.automq.stream.utils.Threads;
5556

5657
import java.time.Duration;
5758
import java.util.Collections;
@@ -65,7 +66,6 @@
6566
import java.util.Random;
6667
import java.util.Set;
6768
import java.util.concurrent.CompletableFuture;
68-
import java.util.concurrent.Executors;
6969
import java.util.concurrent.ScheduledExecutorService;
7070
import java.util.concurrent.TimeUnit;
7171
import java.util.concurrent.locks.Condition;
@@ -105,7 +105,9 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller,
105105
this.bootstrapServerMapInUse = new HashMap<>();
106106
this.lock = new ReentrantLock();
107107
this.cond = lock.newCondition();
108-
this.mainExecutorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("load-retriever-main"));
108+
this.mainExecutorService =
109+
Threads.newSingleThreadScheduledExecutor(
110+
new AutoBalancerThreadFactory("load-retriever-main"), logger);
109111
leaderEpochInitialized = false;
110112
staticConfig = new StaticAutoBalancerConfig(config.originals(), false);
111113
listenerName = staticConfig.getString(StaticAutoBalancerConfig.AUTO_BALANCER_CLIENT_LISTENER_NAME_CONFIG);

Diff for: core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.automq.stream.s3.operator.ObjectStorage.ReadOptions;
4040
import com.automq.stream.s3.streams.StreamMetadataListener;
4141
import com.automq.stream.utils.FutureUtil;
42+
import com.automq.stream.utils.Threads;
4243

4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
@@ -52,7 +53,6 @@
5253
import java.util.concurrent.CompletableFuture;
5354
import java.util.concurrent.ConcurrentHashMap;
5455
import java.util.concurrent.ExecutorService;
55-
import java.util.concurrent.Executors;
5656
import java.util.stream.Collectors;
5757

5858
import io.netty.buffer.ByteBuf;
@@ -77,7 +77,8 @@ public StreamMetadataManager(BrokerServer broker, int nodeId, ObjectReaderFactor
7777
this.pendingGetObjectsTasks = new LinkedList<>();
7878
this.objectReaderFactory = objectReaderFactory;
7979
this.indexCache = indexCache;
80-
this.pendingExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pending-get-objects-task-executor"));
80+
this.pendingExecutorService =
81+
Threads.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pending-get-objects-task-executor"), LOGGER);
8182
broker.metadataLoader().installPublishers(List.of(this)).join();
8283
}
8384

Diff for: core/src/main/scala/kafka/log/stream/s3/network/ControllerRequestSender.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.kafka.server.ControllerRequestCompletionHandler;
2424
import org.apache.kafka.server.NodeToControllerChannelManager;
2525

26+
import com.automq.stream.utils.Threads;
27+
2628
import org.slf4j.Logger;
2729
import org.slf4j.LoggerFactory;
2830

@@ -31,18 +33,18 @@
3133
import java.util.concurrent.BlockingQueue;
3234
import java.util.concurrent.CompletableFuture;
3335
import java.util.concurrent.ConcurrentHashMap;
34-
import java.util.concurrent.Executors;
3536
import java.util.concurrent.LinkedBlockingQueue;
3637
import java.util.concurrent.ScheduledExecutorService;
3738
import java.util.concurrent.TimeUnit;
3839
import java.util.concurrent.atomic.AtomicBoolean;
3940
import java.util.function.Function;
4041

41-
import io.netty.util.concurrent.DefaultThreadFactory;
42-
4342
public class ControllerRequestSender {
43+
4444
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRequestSender.class);
45+
4546
private static final long MAX_RETRY_DELAY_MS = 10 * 1000; // 10s
47+
4648
private final RetryPolicyContext retryPolicyContext;
4749

4850
private final NodeToControllerChannelManager channelManager;
@@ -55,7 +57,8 @@ public ControllerRequestSender(BrokerServer brokerServer, RetryPolicyContext ret
5557
this.retryPolicyContext = retryPolicyContext;
5658
this.channelManager = brokerServer.newNodeToControllerChannelManager("s3stream-to-controller", 60000);
5759
this.channelManager.start();
58-
this.retryService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("controller-request-retry-sender"));
60+
this.retryService =
61+
Threads.newSingleThreadScheduledExecutor("controller-request-retry-sender", false, LOGGER);
5962
this.requestAccumulatorMap = new ConcurrentHashMap<>();
6063
}
6164

Diff for: core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala

+7-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
package kafka.log.streamaspect
1313

1414
import com.automq.stream.api.Client
15-
import com.automq.stream.utils.FutureUtil
15+
import com.automq.stream.utils.{FutureUtil, Threads}
1616
import kafka.cluster.PartitionSnapshot
1717
import kafka.log._
1818
import kafka.log.streamaspect.ElasticUnifiedLog.{CheckpointExecutor, MaxCheckpointIntervalBytes, MinCheckpointIntervalMs}
@@ -21,7 +21,7 @@ import kafka.utils.Logging
2121
import org.apache.kafka.common.errors.OffsetOutOfRangeException
2222
import org.apache.kafka.common.errors.s3.StreamFencedException
2323
import org.apache.kafka.common.record.{MemoryRecords, RecordVersion}
24-
import org.apache.kafka.common.utils.{ThreadUtils, Time}
24+
import org.apache.kafka.common.utils.Time
2525
import org.apache.kafka.common.{TopicPartition, Uuid}
2626
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
2727
import org.apache.kafka.server.util.Scheduler
@@ -33,7 +33,7 @@ import java.nio.ByteBuffer
3333
import java.nio.file.Path
3434
import java.util
3535
import java.util.concurrent.atomic.LongAdder
36-
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, CopyOnWriteArrayList, Executors}
36+
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, CopyOnWriteArrayList}
3737
import scala.jdk.CollectionConverters.CollectionHasAsScala
3838
import scala.util.{Failure, Success, Try}
3939

@@ -291,8 +291,10 @@ class ElasticUnifiedLog(_logStartOffset: Long,
291291
}
292292

293293
object ElasticUnifiedLog extends Logging {
294-
private val CheckpointExecutor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("checkpoint-executor", true))
295-
private val MaxCheckpointIntervalBytes = 50 * 1024 * 1024
294+
private val CheckpointExecutor = {
295+
Threads.newSingleThreadScheduledExecutor("checkpoint-executor", true, logger.underlying)
296+
}
297+
private val MaxCheckpointIntervalBytes = 50 * 1024 * 1024
296298
private val MinCheckpointIntervalMs = 10 * 1000
297299
private val Logs = new ConcurrentHashMap[TopicPartition, ElasticUnifiedLog]()
298300
// fuzzy dirty bytes for checkpoint, it's ok not thread safe

Diff for: s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import com.automq.stream.s3.operator.ObjectStorage;
2323
import com.automq.stream.s3.operator.ObjectStorage.ReadOptions;
2424
import com.automq.stream.utils.Systems;
25-
import com.automq.stream.utils.ThreadUtils;
25+
import com.automq.stream.utils.Threads;
2626

2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
@@ -40,7 +40,6 @@
4040
import java.util.Set;
4141
import java.util.concurrent.Callable;
4242
import java.util.concurrent.CompletableFuture;
43-
import java.util.concurrent.Executors;
4443
import java.util.concurrent.ScheduledExecutorService;
4544
import java.util.concurrent.TimeUnit;
4645
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,8 +61,8 @@ public class LocalStreamRangeIndexCache implements S3StreamClient.StreamLifeCycl
6261
private final ReadWriteLock lock = new ReentrantReadWriteLock();
6362
private final Lock readLock = lock.readLock();
6463
private final Lock writeLock = lock.writeLock();
65-
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
66-
ThreadUtils.createThreadFactory("upload-index", true));
64+
private final ScheduledExecutorService executorService =
65+
Threads.newSingleThreadScheduledExecutor("upload-index", true, LOGGER);
6766
private final Queue<CompletableFuture<Void>> uploadQueue = new LinkedList<>();
6867
private final CompletableFuture<Void> initCf = new CompletableFuture<>();
6968
private final AtomicBoolean pruned = new AtomicBoolean(false);

Diff for: s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
1616
import com.automq.stream.s3.metrics.stats.NetworkStats;
1717
import com.automq.stream.utils.LogContext;
18+
import com.automq.stream.utils.Threads;
1819

1920
import org.slf4j.Logger;
2021

@@ -56,7 +57,8 @@ public AsyncNetworkBandwidthLimiter(Type type, long tokenSize, int refillInterva
5657
this.availableTokens = this.tokenSize;
5758
this.maxTokens = maxTokens;
5859
this.queuedCallbacks = new PriorityQueue<>();
59-
this.refillThreadPool = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("refill-bucket-thread"));
60+
this.refillThreadPool =
61+
Threads.newSingleThreadScheduledExecutor(new DefaultThreadFactory("refill-bucket-thread"), LOGGER);
6062
this.callbackThreadPool = Executors.newFixedThreadPool(1, new DefaultThreadFactory("callback-thread"));
6163
this.callbackThreadPool.execute(this::run);
6264
this.refillThreadPool.scheduleAtFixedRate(this::refillToken, refillIntervalMs, refillIntervalMs, TimeUnit.MILLISECONDS);

Diff for: s3stream/src/main/java/com/automq/stream/utils/threads/S3StreamThreadPoolMonitor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
package com.automq.stream.utils.threads;
1313

1414
import com.automq.stream.utils.ThreadUtils;
15+
import com.automq.stream.utils.Threads;
1516

1617
import org.slf4j.Logger;
1718
import org.slf4j.LoggerFactory;
@@ -20,7 +21,6 @@
2021
import java.util.Collections;
2122
import java.util.List;
2223
import java.util.concurrent.CopyOnWriteArrayList;
23-
import java.util.concurrent.Executors;
2424
import java.util.concurrent.LinkedBlockingQueue;
2525
import java.util.concurrent.ScheduledExecutorService;
2626
import java.util.concurrent.ThreadFactory;
@@ -30,9 +30,9 @@
3030

3131
public class S3StreamThreadPoolMonitor {
3232
private static final List<ThreadPoolWrapper> MONITOR_EXECUTOR = new CopyOnWriteArrayList<>();
33-
private static final ScheduledExecutorService MONITOR_SCHEDULED = Executors.newSingleThreadScheduledExecutor(
34-
ThreadUtils.createThreadFactory("ThreadPoolMonitor-%d", true));
3533
private static Logger waterMarkLogger = LoggerFactory.getLogger(S3StreamThreadPoolMonitor.class);
34+
private static final ScheduledExecutorService MONITOR_SCHEDULED =
35+
Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("ThreadPoolMonitor-%d", true), waterMarkLogger);
3636
private static volatile long threadPoolStatusPeriodTime = TimeUnit.SECONDS.toMillis(3);
3737

3838
public static void config(Logger waterMarkLoggerConfig, long threadPoolStatusPeriodTimeConfig) {

Diff for: tools/src/main/java/org/apache/kafka/tools/automq/perf/ProducerService.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.kafka.common.utils.ThreadUtils;
2323
import org.apache.kafka.tools.automq.perf.TopicService.Topic;
2424

25+
import com.automq.stream.utils.Threads;
26+
2527
import org.slf4j.Logger;
2628
import org.slf4j.LoggerFactory;
2729

@@ -56,7 +58,8 @@ public class ProducerService implements AutoCloseable {
5658
private static final Logger LOGGER = LoggerFactory.getLogger(ProducerService.class);
5759

5860
private final List<Producer> producers = new LinkedList<>();
59-
private final ScheduledExecutorService adjustRateExecutor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("perf-producer-rate-adjust", true));
61+
private final ScheduledExecutorService adjustRateExecutor =
62+
Threads.newSingleThreadScheduledExecutor("perf-producer-rate-adjust", true, LOGGER);
6063
private final ExecutorService executor = Executors.newCachedThreadPool(ThreadUtils.createThreadFactory("perf-producer", false));
6164

6265
/**

0 commit comments

Comments
 (0)