Skip to content

Commit e7fca27

Browse files
committed
Feat: Honor taskCount configs and ensure minimum threads are used to initialize
1 parent 0d1a939 commit e7fca27

File tree

1 file changed

+22
-7
lines changed

1 file changed

+22
-7
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
162162

163163
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
164164
private static final int MAX_INITIALIZATION_RETRIES = 20;
165-
private static final int DEFAULT_CORE_POOL_SIZE = 1;
165+
private static final int MIN_CORE_POOL_SIZE = 2;
166+
private static final int TASKS_PER_THREAD_FACTOR = 4;
167+
private static final int KEEPALIVE_TIME_SECONDS = 2;
166168

167169
private static final EmittingLogger log = new EmittingLogger(SeekableStreamSupervisor.class);
168170

@@ -941,8 +943,19 @@ public SeekableStreamSupervisor(
941943
spec.isSuspended()
942944
);
943945

946+
int workerThreads;
944947
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) {
945948
log.info("Running Task autoscaler for supervisor[%s] for datasource[%s]", supervisorId, dataSource);
949+
workerThreads = (this.tuningConfig.getWorkerThreads() != null
950+
? this.tuningConfig.getWorkerThreads() / TASKS_PER_THREAD_FACTOR
951+
: autoScalerConfig.getTaskCountMax() / TASKS_PER_THREAD_FACTOR);
952+
} else {
953+
workerThreads = (this.tuningConfig.getWorkerThreads() != null
954+
? this.tuningConfig.getWorkerThreads() / TASKS_PER_THREAD_FACTOR
955+
: this.ioConfig.getTaskCount() / TASKS_PER_THREAD_FACTOR);
956+
}
957+
if (workerThreads < MIN_CORE_POOL_SIZE) {
958+
workerThreads = MIN_CORE_POOL_SIZE;
946959
}
947960

948961
IdleConfig specIdleConfig = spec.getIoConfig().getIdleConfig();
@@ -962,14 +975,16 @@ public SeekableStreamSupervisor(
962975
);
963976
}
964977

965-
this.workerExec = MoreExecutors.listeningDecorator(
966-
new ScheduledThreadPoolExecutor(
967-
DEFAULT_CORE_POOL_SIZE,
968-
Execs.makeThreadFactory(StringUtils.encodeForFormat(supervisorTag) + "-Worker-%d")
969-
)
978+
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
979+
workerThreads,
980+
Execs.makeThreadFactory(StringUtils.encodeForFormat(supervisorTag) + "-Worker-%d")
970981
);
982+
executor.setKeepAliveTime(KEEPALIVE_TIME_SECONDS, TimeUnit.SECONDS);
971983

972-
log.info("Created worker pool for supervisor[%s] for dataSource[%s]", this.supervisorId, this.dataSource);
984+
this.workerExec = MoreExecutors.listeningDecorator(executor);
985+
log.info(
986+
"Created worker pool with [%d] threads for supervisor[%s] for dataSource[%s]",
987+
workerThreads, this.supervisorId, this.dataSource);
973988

974989
this.taskInfoProvider = new TaskInfoProvider()
975990
{

0 commit comments

Comments
 (0)