From 762804f30b6a45c87d7288a367c27179283d4411 Mon Sep 17 00:00:00 2001 From: Ponfee Date: Thu, 9 Nov 2023 22:24:57 +0800 Subject: [PATCH] refactor savepoint --- .../core/base/SupervisorCoreRpcService.java | 5 +- .../ponfee/disjob/core/handle/Savepoint.java | 7 ++- .../cn/ponfee/disjob/core/model/SchedJob.java | 7 +++ .../database/DatabaseServerRegistry.java | 8 +-- .../provider/SupervisorCoreRpcProvider.java | 2 +- .../service/AbstractJobManager.java | 21 +++++--- .../service/DistributedJobManager.java | 8 ++- .../thread/TriggeringJobScanner.java | 3 +- .../disjob/test/handler/AJobHandler.java | 2 +- .../disjob/test/handler/BJobHandler.java | 2 +- .../disjob/test/handler/CJobHandler.java | 2 +- .../disjob/test/handler/DJobHandler.java | 2 +- .../disjob/test/handler/EJobHandler.java | 2 +- .../disjob/test/handler/NoopJobHandler.java | 2 +- .../handler/PrimeAccumulateJobHandler.java | 5 +- .../test/handler/PrimeCountJobHandler.java | 4 +- .../test/handler/TestBroadcastJobHandler.java | 2 +- .../disjob/worker/base/WorkerThreadPool.java | 53 ++++++++++++------- 18 files changed, 83 insertions(+), 54 deletions(-) diff --git a/disjob-core/src/main/java/cn/ponfee/disjob/core/base/SupervisorCoreRpcService.java b/disjob-core/src/main/java/cn/ponfee/disjob/core/base/SupervisorCoreRpcService.java index 348bb8f1a..c363a88ae 100644 --- a/disjob-core/src/main/java/cn/ponfee/disjob/core/base/SupervisorCoreRpcService.java +++ b/disjob-core/src/main/java/cn/ponfee/disjob/core/base/SupervisorCoreRpcService.java @@ -29,7 +29,7 @@ */ @Hidden @RequestMapping("supervisor/core/rpc/") -public interface SupervisorCoreRpcService extends Savepoint { +public interface SupervisorCoreRpcService { @GetMapping("task/get") SchedTask getTask(long taskId) throws Exception; @@ -62,8 +62,7 @@ public interface SupervisorCoreRpcService extends Savepoint { // ---------------------------------------------------------------------------savepoint - @Override @PostMapping("task/savepoint") - void save(long taskId, String executeSnapshot) throws Exception; + void savepoint(long taskId, String executeSnapshot) throws Exception; } diff --git a/disjob-core/src/main/java/cn/ponfee/disjob/core/handle/Savepoint.java b/disjob-core/src/main/java/cn/ponfee/disjob/core/handle/Savepoint.java index 4eea4836a..bcfd2dc08 100644 --- a/disjob-core/src/main/java/cn/ponfee/disjob/core/handle/Savepoint.java +++ b/disjob-core/src/main/java/cn/ponfee/disjob/core/handle/Savepoint.java @@ -19,11 +19,10 @@ public interface Savepoint { /** * Save the task execution snapshot * - * @param taskId the task id - * @param executeSnapshot the execution snapshot data + * @param executeSnapshot the task execution snapshot data * @throws Exception if saved occur exception */ - void save(long taskId, String executeSnapshot) throws Exception; + void save(String executeSnapshot) throws Exception; - Savepoint DISCARD = (taskId, executeSnapshot) -> {}; + Savepoint DISCARD = executeSnapshot -> {}; } diff --git a/disjob-core/src/main/java/cn/ponfee/disjob/core/model/SchedJob.java b/disjob-core/src/main/java/cn/ponfee/disjob/core/model/SchedJob.java index f79029625..e844c7626 100644 --- a/disjob-core/src/main/java/cn/ponfee/disjob/core/model/SchedJob.java +++ b/disjob-core/src/main/java/cn/ponfee/disjob/core/model/SchedJob.java @@ -182,6 +182,13 @@ public class SchedJob extends BaseEntity implements Serializable { */ private String createdBy; + public Long obtainNextTriggerTime() { + if (nextTriggerTime == null || endTime == null) { + return nextTriggerTime; + } + return nextTriggerTime > endTime.getTime() ? null : nextTriggerTime; + } + public void verifyBeforeAdd() { TriggerType type = TriggerType.of(triggerType); if (!type.validate(triggerValue)) { diff --git a/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/DatabaseServerRegistry.java b/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/DatabaseServerRegistry.java index 342b3aab1..797e4631d 100644 --- a/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/DatabaseServerRegistry.java +++ b/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/DatabaseServerRegistry.java @@ -51,13 +51,13 @@ public abstract class DatabaseServerRegistry private static final String REMOVE_DEAD_SQL = "DELETE FROM " + TABLE_NAME + " WHERE namespace=? AND role=? AND heartbeat_time?"; + private static final String DISCOVER_SQL = "SELECT server FROM " + TABLE_NAME + " WHERE namespace=? AND role=? AND heartbeat_time>?"; /** * Registry namespace diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/provider/SupervisorCoreRpcProvider.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/provider/SupervisorCoreRpcProvider.java index efc721af0..ccc11cbcb 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/provider/SupervisorCoreRpcProvider.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/provider/SupervisorCoreRpcProvider.java @@ -73,7 +73,7 @@ public boolean cancelInstance(long instanceId, Operations ops) { } @Override - public void save(long taskId, String executeSnapshot) { + public void savepoint(long taskId, String executeSnapshot) { jobManager.savepoint(taskId, executeSnapshot); } diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/service/AbstractJobManager.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/service/AbstractJobManager.java index a0398614b..803320ecd 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/service/AbstractJobManager.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/service/AbstractJobManager.java @@ -76,14 +76,9 @@ public boolean disableJob(SchedJob job) { public boolean changeJobState(long jobId, JobState to) { boolean flag = isOneAffectedRow(jobMapper.updateState(jobId, to.value(), 1 ^ to.value())); - SchedJob job; - if (flag && to == JobState.ENABLE && TriggerType.FIXED_DELAY.equals((job = jobMapper.get(jobId)).getTriggerType())) { - Date date = null; - if (job.getLastTriggerTime() != null) { - date = TriggerType.FIXED_DELAY.computeNextTriggerTime(job.getTriggerValue(), new Date(job.getLastTriggerTime())); - } - Date nextTriggerTime = Dates.max(new Date(), job.getStartTime(), date); - jobMapper.updateFixedDelayNextTriggerTime(jobId, nextTriggerTime.getTime()); + if (flag && to == JobState.ENABLE) { + SchedJob job = jobMapper.get(jobId); + updateFixedDelayNextTriggerTime(job, Dates.ofTimeMillis(job.getLastTriggerTime())); } return flag; } @@ -151,6 +146,16 @@ public void deleteJob(long jobId) { // ------------------------------------------------------------------others operation + protected boolean updateFixedDelayNextTriggerTime(SchedJob job, Date baseTime) { + TriggerType fixedDelay = TriggerType.FIXED_DELAY; + if (!fixedDelay.equals(job.getTriggerType())) { + return false; + } + Date date = baseTime == null ? null : fixedDelay.computeNextTriggerTime(job.getTriggerValue(), baseTime); + Date nextTriggerTime = Dates.max(new Date(), job.getStartTime(), date); + return isOneAffectedRow(jobMapper.updateFixedDelayNextTriggerTime(job.getJobId(), nextTriggerTime.getTime())); + } + protected boolean isOneAffectedRow(int totalAffectedRow) { return totalAffectedRow == AFFECTED_ONE_ROW; } diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/service/DistributedJobManager.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/service/DistributedJobManager.java index b17d3a5c1..523089d02 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/service/DistributedJobManager.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/service/DistributedJobManager.java @@ -718,14 +718,12 @@ private void updateFixedDelayNextTriggerTime(SchedInstance curr, LazyLoader split(String jobParamString) { public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception { Thread.sleep(ThreadLocalRandom.current().nextInt(5000) + 1000); LOG.info(this.getClass().getSimpleName() + " execution finished."); - savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName()); + savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName()); return ExecuteResult.success(); } diff --git a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/BJobHandler.java b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/BJobHandler.java index 34930e8a9..ed123b276 100644 --- a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/BJobHandler.java +++ b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/BJobHandler.java @@ -50,7 +50,7 @@ public List split(String jobParamString) { public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception { Thread.sleep(ThreadLocalRandom.current().nextInt(5000) + 1000); LOG.info(this.getClass().getSimpleName() + " execution finished."); - savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName()); + savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName()); return ExecuteResult.success(); } diff --git a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/CJobHandler.java b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/CJobHandler.java index 609a025c6..3f3aa7be4 100644 --- a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/CJobHandler.java +++ b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/CJobHandler.java @@ -50,7 +50,7 @@ public List split(String jobParamString) { public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception { Thread.sleep(ThreadLocalRandom.current().nextInt(5000) + 1000); LOG.info(this.getClass().getSimpleName() + " execution finished."); - savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName()); + savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName()); return ExecuteResult.success(); } diff --git a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/DJobHandler.java b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/DJobHandler.java index ac4d07eff..679bec1a5 100644 --- a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/DJobHandler.java +++ b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/DJobHandler.java @@ -50,7 +50,7 @@ public List split(String jobParamString) { public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception { Thread.sleep(ThreadLocalRandom.current().nextInt(5000) + 1000); LOG.info(this.getClass().getSimpleName() + " execution finished."); - savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName()); + savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName()); return ExecuteResult.success(); } diff --git a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/EJobHandler.java b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/EJobHandler.java index 534a16576..12daf425e 100644 --- a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/EJobHandler.java +++ b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/EJobHandler.java @@ -50,7 +50,7 @@ public List split(String jobParamString) { public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception { Thread.sleep(ThreadLocalRandom.current().nextInt(5000) + 1000); LOG.info(this.getClass().getSimpleName() + " execution finished."); - savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName()); + savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName()); return ExecuteResult.success(); } diff --git a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/NoopJobHandler.java b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/NoopJobHandler.java index 9fb5bd81b..9d7b864ed 100644 --- a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/NoopJobHandler.java +++ b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/NoopJobHandler.java @@ -52,7 +52,7 @@ public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) t LOG.info("task execute start: {}", executingTask.getTaskId()); Thread.sleep(major + ThreadLocalRandom.current().nextLong(minor)); LOG.info("task execute done: {}", executingTask.getTaskId()); - savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName()); + savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName()); return ExecuteResult.success(); } diff --git a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeAccumulateJobHandler.java b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeAccumulateJobHandler.java index 5bc0e8775..ced367963 100644 --- a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeAccumulateJobHandler.java +++ b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeAccumulateJobHandler.java @@ -9,11 +9,13 @@ package cn.ponfee.disjob.test.handler; import cn.ponfee.disjob.common.util.Jsons; +import cn.ponfee.disjob.core.enums.RunState; import cn.ponfee.disjob.core.handle.ExecuteResult; import cn.ponfee.disjob.core.handle.JobHandler; import cn.ponfee.disjob.core.handle.Savepoint; import cn.ponfee.disjob.core.handle.execution.AbstractExecutionTask; import cn.ponfee.disjob.core.handle.execution.ExecutingTask; +import org.springframework.util.Assert; /** * 质数计数后的累加器 @@ -26,12 +28,13 @@ public class PrimeAccumulateJobHandler extends JobHandler { public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception { long sum = executingTask.getWorkflowPredecessorNodes() .stream() + .peek(e -> Assert.state(RunState.FINISHED.equals(e.getRunState()), "Previous instance unfinished: " + e.getInstanceId())) .flatMap(e -> e.getExecutedTasks().stream()) .map(AbstractExecutionTask::getExecuteSnapshot) .map(e -> Jsons.fromJson(e, PrimeCountJobHandler.ExecuteSnapshot.class)) .mapToLong(PrimeCountJobHandler.ExecuteSnapshot::getCount) .sum(); - savepoint.save(executingTask.getTaskId(), Long.toString(sum)); + savepoint.save(Long.toString(sum)); return ExecuteResult.success(); } diff --git a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeCountJobHandler.java b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeCountJobHandler.java index 92fddf099..c9c40ff52 100644 --- a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeCountJobHandler.java +++ b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeCountJobHandler.java @@ -108,7 +108,7 @@ public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) t long nextSavepointTimeMillis = System.currentTimeMillis() + SAVEPOINT_INTERVAL_MS; while (next <= n) { if (super.isStopped() || Thread.currentThread().isInterrupted()) { - savepoint.save(executingTask.getTaskId(), Jsons.toJson(execution)); + savepoint.save(Jsons.toJson(execution)); throw new PauseTaskException(JobCodeMsg.PAUSE_TASK_EXCEPTION); } @@ -124,7 +124,7 @@ public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) t } if (execution.isFinished() || nextSavepointTimeMillis < System.currentTimeMillis()) { - savepoint.save(executingTask.getTaskId(), Jsons.toJson(execution)); + savepoint.save(Jsons.toJson(execution)); nextSavepointTimeMillis = System.currentTimeMillis() + SAVEPOINT_INTERVAL_MS; } } diff --git a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/TestBroadcastJobHandler.java b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/TestBroadcastJobHandler.java index 8c818b95f..9c84485a2 100644 --- a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/TestBroadcastJobHandler.java +++ b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/TestBroadcastJobHandler.java @@ -37,7 +37,7 @@ public void init(ExecutingTask executingTask) { public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception { Thread.sleep(5000 + ThreadLocalRandom.current().nextLong(10000)); LOG.info("Broadcast job execute done: {}", executingTask.getTaskId()); - savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName()); + savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName()); return ExecuteResult.success(); } diff --git a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java index f15ea96e4..ab08e3154 100644 --- a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java +++ b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java @@ -25,6 +25,7 @@ import cn.ponfee.disjob.core.exception.PauseTaskException; import cn.ponfee.disjob.core.handle.ExecuteResult; import cn.ponfee.disjob.core.handle.JobHandlerUtils; +import cn.ponfee.disjob.core.handle.Savepoint; import cn.ponfee.disjob.core.handle.TaskExecutor; import cn.ponfee.disjob.core.handle.execution.ExecutingTask; import cn.ponfee.disjob.core.handle.execution.WorkflowPredecessorNode; @@ -604,6 +605,21 @@ public BrokenThreadException(String message) { } } + private static class TaskSavepoint implements Savepoint { + private final SupervisorCoreRpcService client; + private final long taskId; + + public TaskSavepoint(SupervisorCoreRpcService client, long taskId) { + this.client = client; + this.taskId = taskId; + } + + @Override + public void save(String executeSnapshot) throws Exception { + client.savepoint(taskId, executeSnapshot); + } + } + /** * Worker thread */ @@ -619,7 +635,7 @@ private static class WorkerThread extends Thread { /** * Supervisor core rpc client */ - private final SupervisorCoreRpcService supervisorCoreRpcClient; + private final SupervisorCoreRpcService client; /** * Thread keep alive time @@ -647,10 +663,10 @@ private static class WorkerThread extends Thread { private final AtomicReference executingParam = new AtomicReference<>(); private WorkerThread(WorkerThreadPool threadPool, - SupervisorCoreRpcService supervisorCoreRpcClient, + SupervisorCoreRpcService client, long keepAliveTimeSeconds) { this.threadPool = threadPool; - this.supervisorCoreRpcClient = supervisorCoreRpcClient; + this.client = client; this.keepAliveTime = TimeUnit.SECONDS.toNanos(keepAliveTimeSeconds); super.setDaemon(true); @@ -733,7 +749,7 @@ public void run() { runTask(param); } catch (Throwable t) { LOG.error("Worker thread execute failed: " + param, t); - terminateTask(supervisorCoreRpcClient, param, Operations.TRIGGER, EXECUTE_EXCEPTION, toErrorMsg(t)); + terminateTask(client, param, Operations.TRIGGER, EXECUTE_EXCEPTION, toErrorMsg(t)); } // return this to idle thread pool @@ -749,7 +765,7 @@ private void runTask(ExecuteTaskParam param) { SchedTask task; ExecutingTask executingTask; try { - if ((task = supervisorCoreRpcClient.getTask(param.getTaskId())) == null) { + if ((task = client.getTask(param.getTaskId())) == null) { LOG.error("Sched task not found {}", param); return; } @@ -763,12 +779,12 @@ private void runTask(ExecuteTaskParam param) { // build executing task List nodes = null; if (param.getJobType() == JobType.WORKFLOW) { - nodes = supervisorCoreRpcClient.findWorkflowPredecessorNodes(param.getWnstanceId(), param.getInstanceId()); + nodes = client.findWorkflowPredecessorNodes(param.getWnstanceId(), param.getInstanceId()); } executingTask = ExecutingTask.of(param.getJobId(), param.getWnstanceId(), task, nodes); // update database records start state(sched_instance, sched_task) - if (!supervisorCoreRpcClient.startTask(StartTaskParam.from(param))) { + if (!client.startTask(StartTaskParam.from(param))) { LOG.warn("Task start conflicted {}", param); return; } @@ -777,7 +793,7 @@ private void runTask(ExecuteTaskParam param) { if (param.getRouteStrategy() != RouteStrategy.BROADCAST) { // reset task worker final List list = Collections.singletonList(new TaskWorkerParam(param.getTaskId(), "")); - ThrowingRunnable.execute(() -> supervisorCoreRpcClient.updateTaskWorker(list), () -> "Reset task worker occur error: " + param); + ThrowingRunnable.execute(() -> client.updateTaskWorker(list), () -> "Reset task worker occur error: " + param); } Threads.interruptIfNecessary(t); // discard task @@ -791,7 +807,7 @@ private void runTask(ExecuteTaskParam param) { param.taskExecutor(taskExecutor); } catch (Throwable t) { LOG.error("Load job handler error: " + param, t); - terminateTask(supervisorCoreRpcClient, param, Operations.TRIGGER, INSTANCE_FAILED, toErrorMsg(t)); + terminateTask(client, param, Operations.TRIGGER, INSTANCE_FAILED, toErrorMsg(t)); return; } @@ -801,7 +817,7 @@ private void runTask(ExecuteTaskParam param) { LOG.info("Initiated sched task {}", param.getTaskId()); } catch (Throwable t) { LOG.error("Task init error: " + param, t); - terminateTask(supervisorCoreRpcClient, param, Operations.TRIGGER, INIT_EXCEPTION, toErrorMsg(t)); + terminateTask(client, param, Operations.TRIGGER, INIT_EXCEPTION, toErrorMsg(t)); Threads.interruptIfNecessary(t); return; } @@ -809,8 +825,9 @@ private void runTask(ExecuteTaskParam param) { // 3、execute try { ExecuteResult result; + Savepoint savepoint = new TaskSavepoint(client, executingTask.getTaskId()); if (param.getExecuteTimeout() > 0) { - FutureTask futureTask = new FutureTask<>(() -> taskExecutor.execute(executingTask, supervisorCoreRpcClient)); + FutureTask futureTask = new FutureTask<>(() -> taskExecutor.execute(executingTask, savepoint)); String threadName = getClass().getSimpleName() + "#FutureTaskThread" + "-" + FUTURE_TASK_NAMED_SEQ.getAndIncrement(); Thread futureTaskThread = Threads.newThread(threadName, true, Thread.NORM_PRIORITY, futureTask); futureTaskThread.start(); @@ -820,27 +837,27 @@ private void runTask(ExecuteTaskParam param) { Threads.stopThread(futureTaskThread, 0); } } else { - result = taskExecutor.execute(executingTask, supervisorCoreRpcClient); + result = taskExecutor.execute(executingTask, savepoint); } // 4、execute end if (result != null && result.isSuccess()) { LOG.info("Task execute finished: {} | {}", param.getTaskId(), result.getMsg()); - terminateTask(supervisorCoreRpcClient, param, Operations.TRIGGER, FINISHED, null); + terminateTask(client, param, Operations.TRIGGER, FINISHED, null); } else { LOG.error("Task execute failed: {} | {}", param, result); String msg = (result == null) ? "null result" : result.getMsg(); - terminateTask(supervisorCoreRpcClient, param, Operations.TRIGGER, EXECUTE_FAILED, msg); + terminateTask(client, param, Operations.TRIGGER, EXECUTE_FAILED, msg); } } catch (TimeoutException e) { LOG.error("Task execute timeout: " + param, e); - terminateTask(supervisorCoreRpcClient, param, Operations.TRIGGER, EXECUTE_TIMEOUT, toErrorMsg(e)); + terminateTask(client, param, Operations.TRIGGER, EXECUTE_TIMEOUT, toErrorMsg(e)); } catch (PauseTaskException e) { LOG.error("Pause task exception: {} | {}", param, e.getMessage()); - stopInstance(supervisorCoreRpcClient, param, Operations.PAUSE, toErrorMsg(e)); + stopInstance(client, param, Operations.PAUSE, toErrorMsg(e)); } catch (CancelTaskException e) { LOG.error("Cancel task exception: {} | {}", param, e.getMessage()); - stopInstance(supervisorCoreRpcClient, param, Operations.EXCEPTION_CANCEL, toErrorMsg(e)); + stopInstance(client, param, Operations.EXCEPTION_CANCEL, toErrorMsg(e)); } catch (Throwable t) { if (t instanceof java.lang.ThreadDeath) { LOG.warn("Task execute thread death: {} | {}", param, t.getMessage()); @@ -849,7 +866,7 @@ private void runTask(ExecuteTaskParam param) { } else { LOG.error("Task execute occur error: " + param, t); } - terminateTask(supervisorCoreRpcClient, param, Operations.TRIGGER, EXECUTE_EXCEPTION, toErrorMsg(t)); + terminateTask(client, param, Operations.TRIGGER, EXECUTE_EXCEPTION, toErrorMsg(t)); Threads.interruptIfNecessary(t); } finally { // 5、destroy