Skip to content

Commit

Permalink
refactor savepoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Nov 9, 2023
1 parent f9d9833 commit 762804f
Show file tree
Hide file tree
Showing 18 changed files with 83 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
@Hidden
@RequestMapping("supervisor/core/rpc/")
public interface SupervisorCoreRpcService extends Savepoint {
public interface SupervisorCoreRpcService {

@GetMapping("task/get")
SchedTask getTask(long taskId) throws Exception;
Expand Down Expand Up @@ -62,8 +62,7 @@ public interface SupervisorCoreRpcService extends Savepoint {

// ---------------------------------------------------------------------------savepoint

@Override
@PostMapping("task/savepoint")
void save(long taskId, String executeSnapshot) throws Exception;
void savepoint(long taskId, String executeSnapshot) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ public interface Savepoint {
/**
* Save the task execution snapshot
*
* @param taskId the task id
* @param executeSnapshot the execution snapshot data
* @param executeSnapshot the task execution snapshot data
* @throws Exception if saved occur exception
*/
void save(long taskId, String executeSnapshot) throws Exception;
void save(String executeSnapshot) throws Exception;

Savepoint DISCARD = (taskId, executeSnapshot) -> {};
Savepoint DISCARD = executeSnapshot -> {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,13 @@ public class SchedJob extends BaseEntity implements Serializable {
*/
private String createdBy;

public Long obtainNextTriggerTime() {
if (nextTriggerTime == null || endTime == null) {
return nextTriggerTime;
}
return nextTriggerTime > endTime.getTime() ? null : nextTriggerTime;
}

public void verifyBeforeAdd() {
TriggerType type = TriggerType.of(triggerType);
if (!type.validate(triggerValue)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ public abstract class DatabaseServerRegistry<R extends Server, D extends Server>

private static final String REMOVE_DEAD_SQL = "DELETE FROM " + TABLE_NAME + " WHERE namespace=? AND role=? AND heartbeat_time<?";

private static final String REGISTER_SQL = "INSERT INTO " + TABLE_NAME + " (namespace, role, server, heartbeat_time) VALUES (?, ?, ?, ?)";
private static final String REGISTER_SQL = "INSERT INTO " + TABLE_NAME + " (namespace, role, server, heartbeat_time) VALUES (?, ?, ?, ?)";

private static final String HEARTBEAT_SQL = "UPDATE " + TABLE_NAME + " SET heartbeat_time=? WHERE namespace=? AND role=? AND server=?";
private static final String HEARTBEAT_SQL = "UPDATE " + TABLE_NAME + " SET heartbeat_time=? WHERE namespace=? AND role=? AND server=?";

private static final String DEREGISTER_SQL = "DELETE FROM " + TABLE_NAME + " WHERE namespace=? AND role=? AND server=?";
private static final String DEREGISTER_SQL = "DELETE FROM " + TABLE_NAME + " WHERE namespace=? AND role=? AND server=?";

private static final String DISCOVER_SQL = "SELECT server FROM " + TABLE_NAME + " WHERE namespace=? AND role=? AND heartbeat_time>?";
private static final String DISCOVER_SQL = "SELECT server FROM " + TABLE_NAME + " WHERE namespace=? AND role=? AND heartbeat_time>?";

/**
* Registry namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public boolean cancelInstance(long instanceId, Operations ops) {
}

@Override
public void save(long taskId, String executeSnapshot) {
public void savepoint(long taskId, String executeSnapshot) {
jobManager.savepoint(taskId, executeSnapshot);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,9 @@ public boolean disableJob(SchedJob job) {

public boolean changeJobState(long jobId, JobState to) {
boolean flag = isOneAffectedRow(jobMapper.updateState(jobId, to.value(), 1 ^ to.value()));
SchedJob job;
if (flag && to == JobState.ENABLE && TriggerType.FIXED_DELAY.equals((job = jobMapper.get(jobId)).getTriggerType())) {
Date date = null;
if (job.getLastTriggerTime() != null) {
date = TriggerType.FIXED_DELAY.computeNextTriggerTime(job.getTriggerValue(), new Date(job.getLastTriggerTime()));
}
Date nextTriggerTime = Dates.max(new Date(), job.getStartTime(), date);
jobMapper.updateFixedDelayNextTriggerTime(jobId, nextTriggerTime.getTime());
if (flag && to == JobState.ENABLE) {
SchedJob job = jobMapper.get(jobId);
updateFixedDelayNextTriggerTime(job, Dates.ofTimeMillis(job.getLastTriggerTime()));
}
return flag;
}
Expand Down Expand Up @@ -151,6 +146,16 @@ public void deleteJob(long jobId) {

// ------------------------------------------------------------------others operation

protected boolean updateFixedDelayNextTriggerTime(SchedJob job, Date baseTime) {
TriggerType fixedDelay = TriggerType.FIXED_DELAY;
if (!fixedDelay.equals(job.getTriggerType())) {
return false;
}
Date date = baseTime == null ? null : fixedDelay.computeNextTriggerTime(job.getTriggerValue(), baseTime);
Date nextTriggerTime = Dates.max(new Date(), job.getStartTime(), date);
return isOneAffectedRow(jobMapper.updateFixedDelayNextTriggerTime(job.getJobId(), nextTriggerTime.getTime()));
}

protected boolean isOneAffectedRow(int totalAffectedRow) {
return totalAffectedRow == AFFECTED_ONE_ROW;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,14 +718,12 @@ private void updateFixedDelayNextTriggerTime(SchedInstance curr, LazyLoader<Sche

// 3、如果是可重试,则要等到最后的那次重试完时来计算下次的延时执行时间
SchedJob job = lazyJob.orElse(null);
if ( job == null
|| !TriggerType.FIXED_DELAY.equals(job.getTriggerType())
|| job.retryable(RunState.of(curr.getRunState()), curr.obtainRetriedCount())) {
if (job == null || job.retryable(RunState.of(curr.getRunState()), curr.obtainRetriedCount())) {
return;
}

Date nextTriggerTime = TriggerType.FIXED_DELAY.computeNextTriggerTime(job.getTriggerValue(), curr.getRunEndTime());
jobMapper.updateFixedDelayNextTriggerTime(job.getJobId(), nextTriggerTime.getTime());
// 4、do update nextTriggerTime
super.updateFixedDelayNextTriggerTime(job, curr.getRunEndTime());
}

private void processWorkflow(SchedInstance nodeInstance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private void processJob(SchedJob job, Date now, long maxNextTriggerTime) {
private Long reComputeNextTriggerTime(SchedJob job, Date now) {
if (TriggerType.FIXED_DELAY.equals(job.getTriggerType())) {
// 固定延时类型不重新计算nextTriggerTime
return job.getNextTriggerTime();
return job.obtainNextTriggerTime();
}
if (now.getTime() <= (job.getNextTriggerTime() + afterMilliseconds)) {
// 没有过期不重新计算nextTriggerTime
Expand All @@ -199,6 +199,7 @@ private Long reComputeNextTriggerTime(SchedJob job, Date now) {
private static Long doComputeNextTriggerTime(SchedJob job, Date now) {
if (TriggerType.FIXED_DELAY.equals(job.getTriggerType())) {
// 固定延时类型的nextTriggerTime:先更新为long最大值,当任务实例运行完成时去主动计算并更新
// null值已被用作表示没有下次触发时间
return Long.MAX_VALUE;
}
return TriggerTimeUtils.computeNextTriggerTime(job, now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public List<SplitTask> split(String jobParamString) {
public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception {
Thread.sleep(ThreadLocalRandom.current().nextInt(5000) + 1000);
LOG.info(this.getClass().getSimpleName() + " execution finished.");
savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName());
savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName());
return ExecuteResult.success();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public List<SplitTask> split(String jobParamString) {
public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception {
Thread.sleep(ThreadLocalRandom.current().nextInt(5000) + 1000);
LOG.info(this.getClass().getSimpleName() + " execution finished.");
savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName());
savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName());
return ExecuteResult.success();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public List<SplitTask> split(String jobParamString) {
public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception {
Thread.sleep(ThreadLocalRandom.current().nextInt(5000) + 1000);
LOG.info(this.getClass().getSimpleName() + " execution finished.");
savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName());
savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName());
return ExecuteResult.success();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public List<SplitTask> split(String jobParamString) {
public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception {
Thread.sleep(ThreadLocalRandom.current().nextInt(5000) + 1000);
LOG.info(this.getClass().getSimpleName() + " execution finished.");
savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName());
savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName());
return ExecuteResult.success();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public List<SplitTask> split(String jobParamString) {
public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception {
Thread.sleep(ThreadLocalRandom.current().nextInt(5000) + 1000);
LOG.info(this.getClass().getSimpleName() + " execution finished.");
savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName());
savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName());
return ExecuteResult.success();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) t
LOG.info("task execute start: {}", executingTask.getTaskId());
Thread.sleep(major + ThreadLocalRandom.current().nextLong(minor));
LOG.info("task execute done: {}", executingTask.getTaskId());
savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName());
savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName());
return ExecuteResult.success();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
package cn.ponfee.disjob.test.handler;

import cn.ponfee.disjob.common.util.Jsons;
import cn.ponfee.disjob.core.enums.RunState;
import cn.ponfee.disjob.core.handle.ExecuteResult;
import cn.ponfee.disjob.core.handle.JobHandler;
import cn.ponfee.disjob.core.handle.Savepoint;
import cn.ponfee.disjob.core.handle.execution.AbstractExecutionTask;
import cn.ponfee.disjob.core.handle.execution.ExecutingTask;
import org.springframework.util.Assert;

/**
* 质数计数后的累加器
Expand All @@ -26,12 +28,13 @@ public class PrimeAccumulateJobHandler extends JobHandler {
public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception {
long sum = executingTask.getWorkflowPredecessorNodes()
.stream()
.peek(e -> Assert.state(RunState.FINISHED.equals(e.getRunState()), "Previous instance unfinished: " + e.getInstanceId()))
.flatMap(e -> e.getExecutedTasks().stream())
.map(AbstractExecutionTask::getExecuteSnapshot)
.map(e -> Jsons.fromJson(e, PrimeCountJobHandler.ExecuteSnapshot.class))
.mapToLong(PrimeCountJobHandler.ExecuteSnapshot::getCount)
.sum();
savepoint.save(executingTask.getTaskId(), Long.toString(sum));
savepoint.save(Long.toString(sum));

return ExecuteResult.success();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) t
long nextSavepointTimeMillis = System.currentTimeMillis() + SAVEPOINT_INTERVAL_MS;
while (next <= n) {
if (super.isStopped() || Thread.currentThread().isInterrupted()) {
savepoint.save(executingTask.getTaskId(), Jsons.toJson(execution));
savepoint.save(Jsons.toJson(execution));
throw new PauseTaskException(JobCodeMsg.PAUSE_TASK_EXCEPTION);
}

Expand All @@ -124,7 +124,7 @@ public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) t
}

if (execution.isFinished() || nextSavepointTimeMillis < System.currentTimeMillis()) {
savepoint.save(executingTask.getTaskId(), Jsons.toJson(execution));
savepoint.save(Jsons.toJson(execution));
nextSavepointTimeMillis = System.currentTimeMillis() + SAVEPOINT_INTERVAL_MS;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void init(ExecutingTask executingTask) {
public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception {
Thread.sleep(5000 + ThreadLocalRandom.current().nextLong(10000));
LOG.info("Broadcast job execute done: {}", executingTask.getTaskId());
savepoint.save(executingTask.getTaskId(), Dates.format(new Date()) + ": " + getClass().getSimpleName());
savepoint.save(Dates.format(new Date()) + ": " + getClass().getSimpleName());
return ExecuteResult.success();
}

Expand Down
Loading

0 comments on commit 762804f

Please sign in to comment.