Skip to content

Commit

Permalink
wait all process stop
Browse files Browse the repository at this point in the history
  • Loading branch information
skaic authored and linghengqian committed Sep 12, 2023
1 parent e0e2cba commit a69e297
Showing 1 changed file with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -141,14 +142,19 @@ private void execute(final JobConfiguration jobConfig, final ShardingContexts sh
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
Queue<Future<Boolean>> futures = new LinkedList<>();
CountDownLatch latch = new CountDownLatch(items.size());
for (int each : items) {
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
ExecutorService executorService = executorContext.get(ExecutorService.class);
if (executorService.isShutdown()) {
return;
}
Future<Boolean> future = executorService.submit(() -> {
process(jobConfig, shardingContexts, each, jobExecutionEvent);
try {
process(jobConfig, shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
return true;
});
futures.offer(future);
Expand Down Expand Up @@ -179,7 +185,12 @@ private void process(final JobConfiguration jobConfig, final ShardingContexts sh
} finally {
futures.poll();
}
}

try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}

Expand Down

0 comments on commit a69e297

Please sign in to comment.