Skip to content

Commit 39f0bd6

Browse files
committed
database update affected row
1 parent 06be4d3 commit 39f0bd6

File tree

2 files changed

+61
-29
lines changed

2 files changed

+61
-29
lines changed

disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/service/AbstractJobManager.java

+41-9
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import java.util.*;
4040
import java.util.function.Function;
41+
import java.util.function.Supplier;
4142
import java.util.stream.Collectors;
4243
import java.util.stream.IntStream;
4344

@@ -54,9 +55,9 @@ public abstract class AbstractJobManager {
5455

5556
private static final int MAX_SPLIT_TASK_SIZE = 1000;
5657
private static final List<TriggerType> FIXED_TYPES = ImmutableList.of(TriggerType.FIXED_RATE, TriggerType.FIXED_DELAY);
58+
private static final int AFFECTED_ONE_ROW = 1;
5759

5860
protected static final String TX_MANAGER_NAME = DB_NAME + TX_MANAGER_NAME_SUFFIX;
59-
protected static final int AFFECTED_ONE_ROW = 1;
6061

6162
protected final SchedJobMapper jobMapper;
6263
protected final SchedDependMapper dependMapper;
@@ -69,11 +70,11 @@ public abstract class AbstractJobManager {
6970
// ------------------------------------------------------------------database single operation without spring transactional
7071

7172
public boolean disableJob(SchedJob job) {
72-
return jobMapper.disable(job) == AFFECTED_ONE_ROW;
73+
return isOneAffectedRow(jobMapper.disable(job));
7374
}
7475

7576
public boolean changeJobState(long jobId, JobState to) {
76-
boolean flag = jobMapper.updateState(jobId, to.value(), 1 ^ to.value()) == AFFECTED_ONE_ROW;
77+
boolean flag = isOneAffectedRow(jobMapper.updateState(jobId, to.value(), 1 ^ to.value()));
7778
SchedJob job;
7879
if (flag && to == JobState.ENABLE && TriggerType.FIXED_DELAY.equals((job = jobMapper.get(jobId)).getTriggerType())) {
7980
Date date = null;
@@ -87,11 +88,11 @@ public boolean changeJobState(long jobId, JobState to) {
8788
}
8889

8990
public boolean updateJobNextTriggerTime(SchedJob job) {
90-
return jobMapper.updateNextTriggerTime(job) == AFFECTED_ONE_ROW;
91+
return isOneAffectedRow(jobMapper.updateNextTriggerTime(job));
9192
}
9293

9394
public boolean updateJobNextScanTime(SchedJob schedJob) {
94-
return jobMapper.updateNextScanTime(schedJob) == AFFECTED_ONE_ROW;
95+
return isOneAffectedRow(jobMapper.updateNextScanTime(schedJob));
9596
}
9697

9798
// ------------------------------------------------------------------database operation within spring transactional
@@ -132,7 +133,7 @@ public void updateJob(SchedJob job) throws JobCheckedException {
132133
}
133134

134135
job.setUpdatedAt(new Date());
135-
Assert.state(jobMapper.update(job) == AFFECTED_ONE_ROW, "Update sched job fail or conflict.");
136+
assertOneAffectedRow(jobMapper.update(job), "Update sched job fail or conflict.");
136137
}
137138

138139
@Transactional(transactionManager = TX_MANAGER_NAME, rollbackFor = Exception.class)
@@ -142,13 +143,45 @@ public void deleteJob(long jobId) {
142143
if (JobState.ENABLE.equals(job.getJobState())) {
143144
throw new IllegalStateException("Please disable job before delete this job.");
144145
}
145-
Assert.isTrue(jobMapper.softDelete(jobId) == AFFECTED_ONE_ROW, "Delete sched job fail or conflict.");
146+
assertOneAffectedRow(jobMapper.softDelete(jobId), "Delete sched job fail or conflict.");
146147
dependMapper.deleteByParentJobId(jobId);
147148
dependMapper.deleteByChildJobId(jobId);
148149
}
149150

150151
// ------------------------------------------------------------------others operation
151152

153+
protected boolean isOneAffectedRow(int totalAffectedRow) {
154+
return totalAffectedRow == AFFECTED_ONE_ROW;
155+
}
156+
157+
protected boolean isManyAffectedRow(int totalAffectedRow) {
158+
return totalAffectedRow >= AFFECTED_ONE_ROW;
159+
}
160+
161+
protected void assertOneAffectedRow(int totalAffectedRow, Supplier<String> errorMsgSupplier) {
162+
if (totalAffectedRow != AFFECTED_ONE_ROW) {
163+
throw new IllegalStateException(errorMsgSupplier.get());
164+
}
165+
}
166+
167+
protected void assertOneAffectedRow(int totalAffectedRow, String errorMsg) {
168+
if (totalAffectedRow != AFFECTED_ONE_ROW) {
169+
throw new IllegalStateException(errorMsg);
170+
}
171+
}
172+
173+
protected void assertManyAffectedRow(int totalAffectedRow, Supplier<String> errorMsgSupplier) {
174+
if (totalAffectedRow < AFFECTED_ONE_ROW) {
175+
throw new IllegalStateException(errorMsgSupplier.get());
176+
}
177+
}
178+
179+
protected void assertManyAffectedRow(int totalAffectedRow, String errorMsg) {
180+
if (totalAffectedRow < AFFECTED_ONE_ROW) {
181+
throw new IllegalStateException(errorMsg);
182+
}
183+
}
184+
152185
public long generateId() {
153186
return idGenerator.generateId();
154187
}
@@ -295,8 +328,7 @@ private void parseTriggerConfig(SchedJob job, boolean isUpdate) {
295328
} else {
296329
Date nextTriggerTime;
297330
if (FIXED_TYPES.contains(triggerType)) {
298-
// initial delay 30 seconds
299-
nextTriggerTime = Dates.max(Dates.plusSeconds(new Date(), 30), job.getStartTime());
331+
nextTriggerTime = Dates.max(new Date(), job.getStartTime());
300332
} else {
301333
Date baseTime = Dates.max(new Date(), job.getStartTime());
302334
nextTriggerTime = triggerType.computeNextFireTime(job.getTriggerValue(), baseTime);

disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/service/DistributedJobManager.java

+20-20
Original file line numberDiff line numberDiff line change
@@ -102,16 +102,16 @@ public DistributedJobManager(SchedJobMapper jobMapper,
102102
// ------------------------------------------------------------------database single operation without spring transactional
103103

104104
public boolean renewInstanceUpdateTime(SchedInstance instance, Date updateTime) {
105-
return instanceMapper.renewUpdateTime(instance.getInstanceId(), updateTime, instance.getVersion()) == AFFECTED_ONE_ROW;
105+
return isOneAffectedRow(instanceMapper.renewUpdateTime(instance.getInstanceId(), updateTime, instance.getVersion()));
106106
}
107107

108108
@Override
109109
protected boolean cancelWaitingTask(long taskId) {
110-
return taskMapper.terminate(taskId, null, ExecuteState.BROADCAST_ABORTED.value(), ExecuteState.WAITING.value(), null, null) == AFFECTED_ONE_ROW;
110+
return isOneAffectedRow(taskMapper.terminate(taskId, null, ExecuteState.BROADCAST_ABORTED.value(), ExecuteState.WAITING.value(), null, null));
111111
}
112112

113113
public void savepoint(long taskId, String executeSnapshot) {
114-
Assert.state(taskMapper.savepoint(taskId, executeSnapshot) == AFFECTED_ONE_ROW, () -> "Save point failed: " + taskId + " | " + executeSnapshot);
114+
assertOneAffectedRow(taskMapper.savepoint(taskId, executeSnapshot), () -> "Save point failed: " + taskId + " | " + executeSnapshot);
115115
}
116116

117117
// ------------------------------------------------------------------database operation within spring transactional
@@ -228,27 +228,27 @@ public void deleteInstance(long instanceId) {
228228

229229
// delete workflow lead instance
230230
int row = instanceMapper.deleteByInstanceId(instanceId);
231-
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Delete workflow lead instance conflict: " + instanceId);
231+
assertOneAffectedRow(row, () -> "Delete workflow lead instance conflict: " + instanceId);
232232

233233
// delete task
234234
for (SchedInstance e : instanceMapper.findWorkflowNode(instance.getWnstanceId())) {
235235
row = taskMapper.deleteByInstanceId(e.getInstanceId());
236-
Assert.isTrue(row >= AFFECTED_ONE_ROW, () -> "Delete sched task conflict: " + instanceId);
236+
assertManyAffectedRow(row, () -> "Delete sched task conflict: " + instanceId);
237237
}
238238

239239
// delete workflow node instance
240240
row = instanceMapper.deleteByWnstanceId(instanceId);
241-
Assert.isTrue(row >= AFFECTED_ONE_ROW, () -> "Delete workflow node instance conflict: " + instanceId);
241+
assertManyAffectedRow(row, () -> "Delete workflow node instance conflict: " + instanceId);
242242

243243
// delete workflow config
244244
row = workflowMapper.deleteByWnstanceId(instanceId);
245-
Assert.isTrue(row >= AFFECTED_ONE_ROW, () -> "Delete sched workflow conflict: " + instanceId);
245+
assertManyAffectedRow(row, () -> "Delete sched workflow conflict: " + instanceId);
246246
} else {
247247
int row = instanceMapper.deleteByInstanceId(instanceId);
248-
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Delete sched instance conflict: " + instanceId);
248+
assertOneAffectedRow(row, () -> "Delete sched instance conflict: " + instanceId);
249249

250250
row = taskMapper.deleteByInstanceId(instanceId);
251-
Assert.isTrue(row >= AFFECTED_ONE_ROW, () -> "Delete sched task conflict: " + instanceId);
251+
assertManyAffectedRow(row, () -> "Delete sched task conflict: " + instanceId);
252252
}
253253
LOG.info("Delete sched instance success {}", instanceId);
254254
});
@@ -277,7 +277,7 @@ public boolean terminateTask(TerminateTaskParam param) {
277277

278278
Date executeEndTime = toState.isTerminal() ? new Date() : null;
279279
int row = taskMapper.terminate(param.getTaskId(), param.getWorker(), toState.value(), ExecuteState.EXECUTING.value(), executeEndTime, param.getErrorMsg());
280-
if (row != AFFECTED_ONE_ROW) {
280+
if (!isOneAffectedRow(row)) {
281281
// usual is worker invoke http timeout, then retry
282282
LOG.warn("Conflict terminate executing task: {} | {}", param.getTaskId(), toState);
283283
return false;
@@ -335,7 +335,7 @@ public boolean purgeInstance(SchedInstance inst) {
335335
// cannot be paused
336336
Assert.isTrue(tuple.a.isTerminal(), () -> "Purge instance state must be terminal state: " + instance);
337337
}
338-
if (instanceMapper.terminate(instanceId, tuple.a.value(), RUN_STATE_TERMINABLE, tuple.b) != AFFECTED_ONE_ROW) {
338+
if (!isOneAffectedRow(instanceMapper.terminate(instanceId, tuple.a.value(), RUN_STATE_TERMINABLE, tuple.b))) {
339339
return false;
340340
}
341341

@@ -443,7 +443,7 @@ public boolean resumeInstance(long instanceId) {
443443
Assert.isTrue(instance.isWorkflowLead(), () -> "Cannot resume workflow node instance: " + instanceId);
444444
// update sched_instance paused lead to running state
445445
int row = instanceMapper.updateState(instanceId, RunState.RUNNING.value(), RunState.PAUSED.value());
446-
Assert.state(row == AFFECTED_ONE_ROW, () -> "Resume workflow lead instance failed: " + instanceId);
446+
assertOneAffectedRow(row, () -> "Resume workflow lead instance failed: " + instanceId);
447447
workflowMapper.resumeWaiting(instanceId);
448448
for (SchedInstance nodeInstance : instanceMapper.findWorkflowNode(instanceId)) {
449449
if (RunState.PAUSED.equals(nodeInstance.getRunState())) {
@@ -540,7 +540,7 @@ private void pauseInstance(SchedInstance instance) {
540540
// must be paused or terminate
541541
Assert.notNull(tuple, () -> "Pause instance failed: " + instanceId);
542542
int row = instanceMapper.terminate(instanceId, tuple.a.value(), RUN_STATE_PAUSABLE, tuple.b);
543-
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Pause instance failed: " + instance + " | " + tuple.a);
543+
assertOneAffectedRow(row, () -> "Pause instance failed: " + instance + " | " + tuple.a);
544544
if (instance.isWorkflowNode()) {
545545
updateWorkflowEdgeState(instance, tuple.a.value(), RUN_STATE_PAUSABLE);
546546
}
@@ -568,7 +568,7 @@ private void cancelInstance(SchedInstance instance, Operations ops) {
568568

569569
RunState toState = tuple.a;
570570
int row = instanceMapper.terminate(instanceId, toState.value(), RUN_STATE_TERMINABLE, tuple.b);
571-
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Cancel instance failed: " + instance + " | " + toState);
571+
assertOneAffectedRow(row, () -> "Cancel instance failed: " + instance + " | " + toState);
572572
if (instance.isWorkflowNode()) {
573573
updateWorkflowEdgeState(instance, tuple.a.value(), RUN_STATE_TERMINABLE);
574574
}
@@ -581,10 +581,10 @@ private void cancelInstance(SchedInstance instance, Operations ops) {
581581
private void resumeInstance(SchedInstance instance) {
582582
long instanceId = instance.getInstanceId();
583583
int row = instanceMapper.updateState(instanceId, RunState.WAITING.value(), RunState.PAUSED.value());
584-
Assert.state(row == AFFECTED_ONE_ROW, "Resume sched instance failed.");
584+
assertOneAffectedRow(row, "Resume sched instance failed.");
585585

586586
row = taskMapper.updateStateByInstanceId(instanceId, ExecuteState.WAITING.value(), EXECUTE_STATE_PAUSED, null);
587-
Assert.state(row >= AFFECTED_ONE_ROW, "Resume sched task failed.");
587+
assertManyAffectedRow(row, "Resume sched task failed.");
588588

589589
// dispatch task
590590
Tuple3<SchedJob, SchedInstance, List<SchedTask>> params = buildDispatchParams(instanceId, row);
@@ -602,11 +602,11 @@ private void updateWorkflowLeadState(SchedInstance instance) {
602602
if (graph.allMatch(e -> e.getValue().isTerminal())) {
603603
RunState state = graph.anyMatch(e -> e.getValue().isFailure()) ? RunState.CANCELED : RunState.FINISHED;
604604
int row = instanceMapper.terminate(instance.getWnstanceId(), state.value(), RUN_STATE_TERMINABLE, new Date());
605-
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
605+
assertOneAffectedRow(row, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
606606
} else if (workflows.stream().noneMatch(e -> RunState.RUNNING.equals(e.getRunState()))) {
607607
RunState state = RunState.PAUSED;
608608
int row = instanceMapper.updateState(instance.getWnstanceId(), state.value(), instance.getRunState());
609-
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
609+
assertOneAffectedRow(row, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
610610
}
611611
}
612612

@@ -739,7 +739,7 @@ private void processWorkflow(SchedInstance nodeInstance) {
739739
if (graph.allMatch(e -> e.getValue().isTerminal())) {
740740
RunState state = graph.anyMatch(e -> e.getValue().isFailure()) ? RunState.CANCELED : RunState.FINISHED;
741741
int row = instanceMapper.terminate(wnstanceId, state.value(), RUN_STATE_TERMINABLE, new Date());
742-
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Terminate workflow lead instance failed: " + nodeInstance + " | " + state);
742+
assertOneAffectedRow(row, () -> "Terminate workflow lead instance failed: " + nodeInstance + " | " + state);
743743
afterTerminateTask(instanceMapper.get(wnstanceId));
744744
return;
745745
}
@@ -887,7 +887,7 @@ private List<ExecuteTaskParam> loadExecutingTasks(SchedInstance instance, Operat
887887
// update dead task
888888
Date executeEndTime = ops.toState().isTerminal() ? new Date() : null;
889889
int row = taskMapper.terminate(task.getTaskId(), task.getWorker(), ops.toState().value(), ExecuteState.EXECUTING.value(), executeEndTime, null);
890-
if (row != AFFECTED_ONE_ROW) {
890+
if (!isOneAffectedRow(row)) {
891891
LOG.error("Cancel the dead task failed: {}", task);
892892
executingTasks.add(builder.build(ops, task.getTaskId(), triggerTime, worker));
893893
} else {

0 commit comments

Comments
 (0)