Skip to content

Commit 6d7b52f

Browse files
committed
Improve job scheduler
1 parent ee4040b commit 6d7b52f

File tree

17 files changed

+731
-1154
lines changed

17 files changed

+731
-1154
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,5 @@ spark-warehouse
4949
*.flattened-pom.xml
5050

5151
seatunnel-examples
52-
/lib/*
52+
/lib/*
53+
version.properties

seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ source {
3333
parallelism = ${fake_parallelism}
3434
username = ${username}
3535
password = ${password}
36+
partition= ${partition111}
3637
schema = {
3738
fields {
3839
name = "string"

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,6 @@ public void testSubmitJob() {
189189
.getNodeExtension()
190190
.createExtensionServices()
191191
.get(Constant.SEATUNNEL_SERVICE_NAME);
192-
JobStatus jobStatus =
193-
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
194-
Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
195192
Awaitility.await()
196193
.atMost(2, TimeUnit.MINUTES)
197194
.untilAsserted(

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
2222
rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
2323

2424
logger.zeta.name=org.apache.seatunnel.engine
25-
logger.zeta.level=WARN
25+
logger.zeta.level=INFO
2626

2727
appender.consoleStdout.name = consoleStdoutAppender
2828
appender.consoleStdout.type = CONSOLE

seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void testContentFormatUtil() throws InterruptedException {
4343
new JobStatusData(
4444
4352352414135L + i,
4545
"Testfdsafew" + i,
46-
JobStatus.CANCELLING,
46+
JobStatus.CANCELING,
4747
System.currentTimeMillis(),
4848
System.currentTimeMillis()));
4949
Thread.sleep(2L);
@@ -53,7 +53,7 @@ public void testContentFormatUtil() throws InterruptedException {
5353
new JobStatusData(
5454
4352352414135L + i,
5555
"fdsafsddfasfsdafasdf" + i,
56-
JobStatus.RECONCILING,
56+
JobStatus.UNKNOWABLE,
5757
System.currentTimeMillis(),
5858
null));
5959
Thread.sleep(2L);
Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,29 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.seatunnel.engine.server.scheduler;
18+
package org.apache.seatunnel.engine.common.exception;
1919

20-
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
20+
import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
21+
import com.hazelcast.core.HazelcastException;
2122

22-
import lombok.NonNull;
23+
public class TaskGroupDeployException extends HazelcastException
24+
implements ClientExceptionFactory.ExceptionFactory {
25+
public TaskGroupDeployException() {}
2326

24-
import java.util.concurrent.CompletableFuture;
27+
public TaskGroupDeployException(String message) {
28+
super(message);
29+
}
2530

26-
public interface JobScheduler {
27-
CompletableFuture<Void> reSchedulerPipeline(@NonNull SubPlan subPlan);
31+
public TaskGroupDeployException(String message, Throwable cause) {
32+
super(message, cause);
33+
}
2834

29-
void startScheduling();
35+
public TaskGroupDeployException(Throwable cause) {
36+
super(cause);
37+
}
38+
39+
@Override
40+
public Throwable createException(String s, Throwable throwable) {
41+
return new TaskGroupDeployException(s, throwable);
42+
}
3043
}

seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java

Lines changed: 3 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ public enum JobStatus {
2929
/** Job is newly created, no task has started to run. */
3030
CREATED(EndState.NOT_END),
3131

32-
/** Job is begin schedule but some task not deploy complete. */
32+
/** Job will scheduler every pipeline */
3333
SCHEDULED(EndState.NOT_END),
3434

35-
/** Some tasks are scheduled or running, some may be pending, some may be finished. */
35+
/** Job is running and begine to scheduler pipeline running. */
3636
RUNNING(EndState.NOT_END),
3737

3838
/** The job has failed and is currently waiting for the cleanup to complete. */
@@ -42,26 +42,14 @@ public enum JobStatus {
4242
FAILED(EndState.GLOBALLY),
4343

4444
/** Job is being cancelled. */
45-
CANCELLING(EndState.NOT_END),
45+
CANCELING(EndState.NOT_END),
4646

4747
/** Job has been cancelled. */
4848
CANCELED(EndState.GLOBALLY),
4949

5050
/** All of the job's tasks have successfully finished. */
5151
FINISHED(EndState.GLOBALLY),
5252

53-
/** The job is currently undergoing a reset and total restart. */
54-
RESTARTING(EndState.NOT_END),
55-
56-
/**
57-
* The job has been suspended which means that it has been stopped but not been removed from a
58-
* potential HA job store.
59-
*/
60-
SUSPENDED(EndState.LOCALLY),
61-
62-
/** The job is currently reconciling and waits for task execution report to recover state. */
63-
RECONCILING(EndState.NOT_END),
64-
6553
/** Cannot find the JobID or the job status has already been cleared. */
6654
UNKNOWABLE(EndState.GLOBALLY);
6755

@@ -79,30 +67,6 @@ private enum EndState {
7967
this.endState = endState;
8068
}
8169

82-
/**
83-
* Checks whether this state is <i>globally terminal</i>. A globally terminal job is complete
84-
* and cannot fail any more and will not be restarted or recovered by another standby master
85-
* node.
86-
*
87-
* <p>When a globally terminal state has been reached, all recovery data for the job is dropped
88-
* from the high-availability services.
89-
*
90-
* @return True, if this job status is globally terminal, false otherwise.
91-
*/
92-
public boolean isGloballyEndState() {
93-
return endState == EndState.GLOBALLY;
94-
}
95-
96-
/**
97-
* Checks whether this state is <i>locally terminal</i>. Locally terminal refers to the state of
98-
* a job's execution graph within an executing JobManager. If the execution graph is locally
99-
* terminal, the JobManager will not continue executing or recovering the job.
100-
*
101-
* <p>The only state that is locally terminal, but not globally terminal is {@link #SUSPENDED},
102-
* which is typically entered when the executing JobManager loses its leader status.
103-
*
104-
* @return True, if this job status is terminal, false otherwise.
105-
*/
10670
public boolean isEndState() {
10771
return endState != EndState.NOT_END;
10872
}

seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineStatus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ public enum PipelineStatus {
6666

6767
CANCELED,
6868

69-
FAILED,
69+
FAILING,
7070

71-
RECONCILING,
71+
FAILED,
7272

7373
/** Restoring last possible valid state of the pipeline if it has it. */
7474
INITIALIZING;

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java

Lines changed: 27 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -275,92 +275,37 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
275275
metricsImap,
276276
engineConfig);
277277

278-
// If Job Status is CANCELLING , set needRestore to false
279278
try {
280-
jobMaster.init(
281-
runningJobInfoIMap.get(jobId).getInitializationTimestamp(),
282-
true,
283-
!JobStatus.CANCELLING.equals(jobStatus));
279+
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp(), true);
284280
} catch (Exception e) {
285281
throw new SeaTunnelEngineException(String.format("Job id %s init failed", jobId), e);
286282
}
287283

288284
String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
289-
if (jobStatus.isEndState()) {
290-
logger.info(
291-
String.format(
292-
"The restore %s is in an end state %s, store the job info to JobHistory and clear the job running time info",
293-
jobFullName, jobStatus));
294-
jobMaster.cleanJob();
295-
return;
296-
}
297-
298-
if (jobStatus.ordinal() < JobStatus.RUNNING.ordinal()) {
299-
CompletableFuture.runAsync(
300-
() -> {
301-
logger.info(
302-
String.format(
303-
"The restore %s is state %s, cancel job and submit it again.",
304-
jobFullName, jobStatus));
305-
jobMaster.cancelJob();
306-
jobMaster.getJobMasterCompleteFuture().join();
307-
submitJob(jobId, jobInfo.getJobImmutableInformation()).join();
308-
},
309-
executorService);
310-
311-
return;
312-
}
313-
314285
runningJobMasterMap.put(jobId, jobMaster);
315-
jobMaster.markRestore();
316286

317-
if (JobStatus.CANCELLING.equals(jobStatus)) {
318-
logger.info(
319-
String.format(
320-
"The restore %s is in %s state, cancel the job",
321-
jobFullName, jobStatus));
322-
CompletableFuture.runAsync(
323-
() -> {
324-
try {
325-
jobMaster.cancelJob();
326-
jobMaster.run();
327-
} finally {
328-
// voidCompletableFuture will be cancelled when zeta master node
329-
// shutdown to simulate master failure,
330-
// don't update runningJobMasterMap is this case.
331-
if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
332-
runningJobMasterMap.remove(jobId);
333-
}
334-
}
335-
},
336-
executorService);
337-
return;
338-
}
339-
340-
if (JobStatus.RUNNING.equals(jobStatus)) {
341-
logger.info(
342-
String.format(
343-
"The restore %s is in %s state, restore pipeline and take over this job running",
344-
jobFullName, jobStatus));
345-
CompletableFuture.runAsync(
346-
() -> {
347-
try {
348-
jobMaster
349-
.getPhysicalPlan()
350-
.getPipelineList()
351-
.forEach(SubPlan::restorePipelineState);
352-
jobMaster.run();
353-
} finally {
354-
// voidCompletableFuture will be cancelled when zeta master node
355-
// shutdown to simulate master failure,
356-
// don't update runningJobMasterMap is this case.
357-
if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
358-
runningJobMasterMap.remove(jobId);
359-
}
287+
logger.info(
288+
String.format(
289+
"The restore %s is in %s state, restore pipeline and take over this job running",
290+
jobFullName, jobStatus));
291+
CompletableFuture.runAsync(
292+
() -> {
293+
try {
294+
jobMaster
295+
.getPhysicalPlan()
296+
.getPipelineList()
297+
.forEach(SubPlan::restorePipelineState);
298+
jobMaster.run();
299+
} finally {
300+
// voidCompletableFuture will be cancelled when zeta master node
301+
// shutdown to simulate master failure,
302+
// don't update runningJobMasterMap is this case.
303+
if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
304+
runningJobMasterMap.remove(jobId);
360305
}
361-
},
362-
executorService);
363-
}
306+
}
307+
},
308+
executorService);
364309
}
365310

366311
private void checkNewActiveMaster() {
@@ -390,10 +335,11 @@ private void checkNewActiveMaster() {
390335
}
391336
}
392337

393-
private void clearCoordinatorService() {
338+
public synchronized void clearCoordinatorService() {
394339
// interrupt all JobMaster
395340
runningJobMasterMap.values().forEach(JobMaster::interrupt);
396341
executorService.shutdownNow();
342+
runningJobMasterMap.clear();
397343

398344
try {
399345
executorService.awaitTermination(20, TimeUnit.SECONDS);
@@ -457,9 +403,7 @@ public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInf
457403
new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
458404
runningJobMasterMap.put(jobId, jobMaster);
459405
jobMaster.init(
460-
runningJobInfoIMap.get(jobId).getInitializationTimestamp(),
461-
false,
462-
true);
406+
runningJobInfoIMap.get(jobId).getInitializationTimestamp(), false);
463407
// We specify that when init is complete, the submitJob is complete
464408
jobSubmitFuture.complete(null);
465409
} catch (Throwable e) {
@@ -690,7 +634,7 @@ private void makeTasksFailed(
690634
|| executionState.equals(ExecutionState.RUNNING)
691635
|| executionState.equals(ExecutionState.CANCELING))) {
692636
TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
693-
physicalVertex.updateTaskExecutionState(
637+
physicalVertex.updateStateByExecutionService(
694638
new TaskExecutionState(
695639
taskGroupLocation,
696640
ExecutionState.FAILED,
@@ -759,9 +703,6 @@ public void printJobDetailInfo() {
759703
case CREATED:
760704
createdJobCount.addAndGet(1);
761705
break;
762-
case SCHEDULED:
763-
scheduledJobCount.addAndGet(1);
764-
break;
765706
case RUNNING:
766707
runningJobCount.addAndGet(1);
767708
break;
@@ -771,7 +712,7 @@ public void printJobDetailInfo() {
771712
case FAILED:
772713
failedJobCount.addAndGet(1);
773714
break;
774-
case CANCELLING:
715+
case CANCELING:
775716
cancellingJobCount.addAndGet(1);
776717
break;
777718
case CANCELED:
@@ -780,15 +721,6 @@ public void printJobDetailInfo() {
780721
case FINISHED:
781722
finishedJobCount.addAndGet(1);
782723
break;
783-
case RESTARTING:
784-
restartingJobCount.addAndGet(1);
785-
break;
786-
case SUSPENDED:
787-
suspendedJobCount.addAndGet(1);
788-
break;
789-
case RECONCILING:
790-
reconcilingJobCount.addAndGet(1);
791-
break;
792724
default:
793725
}
794726
}

0 commit comments

Comments
 (0)