Skip to content

Commit 9427640

Browse files
committed
wait all process stop
1 parent cdb0e3c commit 9427640

File tree

1 file changed

+12
-1
lines changed
  • elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor

1 file changed

+12
-1
lines changed

elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Queue;
3939
import java.util.concurrent.CancellationException;
4040
import java.util.concurrent.ConcurrentHashMap;
41+
import java.util.concurrent.CountDownLatch;
4142
import java.util.concurrent.ExecutionException;
4243
import java.util.concurrent.ExecutorService;
4344
import java.util.concurrent.Future;
@@ -140,14 +141,19 @@ private void execute(final JobConfiguration jobConfig, final ShardingContexts sh
140141
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
141142
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
142143
Queue<Future<Boolean>> futures = new LinkedList<>();
144+
CountDownLatch latch = new CountDownLatch(items.size());
143145
for (int each : items) {
144146
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
145147
ExecutorService executorService = executorContext.get(ExecutorService.class);
146148
if (executorService.isShutdown()) {
147149
return;
148150
}
149151
Future<Boolean> future = executorService.submit(() -> {
150-
process(jobConfig, shardingContexts, each, jobExecutionEvent);
152+
try {
153+
process(jobConfig, shardingContexts, each, jobExecutionEvent);
154+
} finally {
155+
latch.countDown();
156+
}
151157
return true;
152158
});
153159
futures.offer(future);
@@ -178,7 +184,12 @@ private void process(final JobConfiguration jobConfig, final ShardingContexts sh
178184
} finally {
179185
futures.poll();
180186
}
187+
}
181188

189+
try {
190+
latch.await();
191+
} catch (final InterruptedException ex) {
192+
Thread.currentThread().interrupt();
182193
}
183194
}
184195

0 commit comments

Comments
 (0)