Skip to content

Commit

Permalink
Clean up unnecessary status statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 committed Oct 13, 2023
1 parent 08f847d commit 945cdbc
Show file tree
Hide file tree
Showing 16 changed files with 109 additions and 150 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
dead-link:
name: Dead links
runs-on: ubuntu-latest
timeout-minutes: 30
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- run: sudo npm install -g [email protected]
Expand Down Expand Up @@ -183,6 +183,7 @@ jobs:
- name: Make unit test modules
id: ut-modules
timeout-minutes: 60
if: ${{ steps.filter.outputs.api == 'false' && (steps.engine-modules.outputs.modules != '' || steps.cv2-modules.outputs.modules != '') }}
run: |
modules='${{ steps.engine-modules.outputs.modules }}${{ steps.cv2-modules.outputs.modules }}'
Expand Down Expand Up @@ -212,6 +213,7 @@ jobs:
- name: Make integration test modules
id: it-modules
timeout-minutes: 60
if: ${{ steps.filter.outputs.api == 'false' && (steps.engine-modules.outputs.modules != '' || steps.cv2-modules.outputs.modules != '' || steps.cv2-e2e-modules.outputs.modules != '' || steps.cv2-flink-e2e-modules.outputs.modules != '' || steps.cv2-spark-e2e-modules.outputs.modules != '' || steps.engine-e2e-modules.outputs.modules != '') }}
run: |
modules='${{ steps.cv2-e2e-modules.outputs.modules }}${{ steps.cv2-flink-e2e-modules.outputs.modules }}${{ steps.cv2-spark-e2e-modules.outputs.modules }}${{ steps.engine-e2e-modules.outputs.modules }}${{ steps.engine-modules.outputs.modules }}${{ steps.cv2-modules.outputs.modules }}'
Expand Down Expand Up @@ -520,7 +522,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 30
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand All @@ -544,7 +546,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 30
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ spark-warehouse

seatunnel-examples
/lib/*
version.properties
version.properties
4 changes: 2 additions & 2 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ appender.consoleStdout.name = consoleStdoutAppender
appender.consoleStdout.type = CONSOLE
appender.consoleStdout.target = SYSTEM_OUT
appender.consoleStdout.layout.type = PatternLayout
appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
appender.consoleStdout.filter.acceptLtWarn.type = ThresholdFilter
appender.consoleStdout.filter.acceptLtWarn.level = WARN
appender.consoleStdout.filter.acceptLtWarn.onMatch = DENY
Expand All @@ -52,7 +52,7 @@ appender.consoleStderr.name = consoleStderrAppender
appender.consoleStderr.type = CONSOLE
appender.consoleStderr.target = SYSTEM_ERR
appender.consoleStderr.layout.type = PatternLayout
appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
appender.consoleStderr.filter.acceptGteWarn.type = ThresholdFilter
appender.consoleStderr.filter.acceptGteWarn.level = WARN
appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT
Expand All @@ -64,7 +64,7 @@ appender.file.fileName = ${file_path}/${file_name}.log
appender.file.filePattern = ${file_path}/${file_name}.log.%d{yyyy-MM-dd}-%i
appender.file.append = true
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
appender.file.policies.type = Policies
appender.file.policies.time.type = TimeBasedTriggeringPolicy
appender.file.policies.time.modulate = true
Expand Down
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,8 @@
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- disable spotless check during release -->
<!--
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
Expand All @@ -746,6 +748,7 @@
</dependency>
</dependencies>
</plugin>
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ source {
parallelism = ${fake_parallelism}
username = ${username}
password = ${password}
partition= ${partition111}
schema = {
fields {
name = "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,15 +840,15 @@ public void testStreamJobRestoreInAllNodeDown()
node1.shutdown();
node2.shutdown();

log.info(
System.out.println(
"==========================================All node is done========================================");
Thread.sleep(10000);

node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

log.info(
System.out.println(
"==========================================All node is start, begin check node size ========================================");
// waiting all node added to cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Expand All @@ -859,7 +859,7 @@ public void testStreamJobRestoreInAllNodeDown()
Assertions.assertEquals(
2, restoreFinalNode.getCluster().getMembers().size()));

log.info(
System.out.println(
"==========================================All node is running========================================");
engineClient = new SeaTunnelClient(clientConfig);
ClientJobProxy newClientJobProxy = engineClient.createJobClient().getJobProxy(jobId);
Expand Down Expand Up @@ -895,7 +895,7 @@ public void testStreamJobRestoreInAllNodeDown()

// sleep 10s and expect the job don't write more rows.
Thread.sleep(10000);
log.info(
System.out.println(
"==========================================Cancel Job========================================");
newClientJobProxy.cancelJob();

Expand All @@ -913,7 +913,7 @@ public void testStreamJobRestoreInAllNodeDown()
Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);

} finally {
log.info(
System.out.println(
"==========================================Clean test resource ========================================");
if (engineClient != null) {
engineClient.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public enum JobStatus {
/** Job will scheduler every pipeline */
SCHEDULED(EndState.NOT_END),

/** Job is running and begine to scheduler pipeline running. */
/** The job is already running, and each pipeline is already running. */
RUNNING(EndState.NOT_END),

/** The job has failed and is currently waiting for the cleanup to complete. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
// voidCompletableFuture will be cancelled when zeta master node
// shutdown to simulate master failure,
// don't update runningJobMasterMap is this case.
if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
if (!jobMaster.getJobMasterCompleteFuture().isCompletedExceptionally()) {
runningJobMasterMap.remove(jobId);
}
}
Expand Down Expand Up @@ -496,7 +496,11 @@ public JobStatus getJobStatus(long jobId) {
JobHistoryService.JobState jobDetailState = jobHistoryService.getJobDetailState(jobId);
return null == jobDetailState ? JobStatus.UNKNOWABLE : jobDetailState.getJobStatus();
}
return runningJobMaster.getJobStatus();
JobStatus jobStatus = runningJobMaster.getJobStatus();
if (jobStatus == null) {
return jobHistoryService.getFinishedJobStateImap().get(jobId).getJobStatus();
}
return jobStatus;
}

public JobMetrics getJobMetrics(long jobId) {
Expand Down Expand Up @@ -687,9 +691,6 @@ public void printJobDetailInfo() {
AtomicLong cancellingJobCount = new AtomicLong();
AtomicLong canceledJobCount = new AtomicLong();
AtomicLong finishedJobCount = new AtomicLong();
AtomicLong restartingJobCount = new AtomicLong();
AtomicLong suspendedJobCount = new AtomicLong();
AtomicLong reconcilingJobCount = new AtomicLong();

if (runningJobInfoIMap != null) {
runningJobInfoIMap
Expand All @@ -703,6 +704,9 @@ public void printJobDetailInfo() {
case CREATED:
createdJobCount.addAndGet(1);
break;
case SCHEDULED:
scheduledJobCount.addAndGet(1);
break;
case RUNNING:
runningJobCount.addAndGet(1);
break;
Expand Down Expand Up @@ -745,12 +749,6 @@ public void printJobDetailInfo() {
"canceledJobCount",
canceledJobCount,
"finishedJobCount",
finishedJobCount,
"restartingJobCount",
restartingJobCount,
"suspendedJobCount",
suspendedJobCount,
"reconcilingJobCount",
reconcilingJobCount));
finishedJobCount));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public void addPipelineEndCallback(SubPlan subPlan) {
future.thenAcceptAsync(
pipelineState -> {
try {
log.info(
"{} future complete with state {}",
subPlan.getPipelineFullName(),
pipelineState.getPipelineStatus());
if (PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) {
canceledPipelineNum.incrementAndGet();
} else if (PipelineStatus.FAILED.equals(
Expand All @@ -149,7 +153,7 @@ public void addPipelineEndCallback(SubPlan subPlan) {
String.format(
"cancel job %s because makeJobEndWhenPipelineEnded is true",
jobFullName));
updateJobState(getJobStatus(), JobStatus.FAILING);
updateJobState(JobStatus.FAILING);
}
}

Expand Down Expand Up @@ -184,7 +188,7 @@ public void cancelJob() {
return;
}

updateJobState(getJobStatus(), JobStatus.CANCELING);
updateJobState(JobStatus.CANCELING);
}

public List<SubPlan> getPipelineList() {
Expand All @@ -199,15 +203,9 @@ private void updateStateTimestamps(@NonNull JobStatus targetState) {
runningJobStateTimestampsIMap.set(jobId, stateTimestamps);
}

public void updateJobState(@NonNull JobStatus targetState) {
synchronized (this) {
updateJobState((JobStatus) runningJobStateIMap.get(jobId), targetState);
}
}

public synchronized void updateJobState(
@NonNull JobStatus current, @NonNull JobStatus targetState) {
public synchronized void updateJobState(@NonNull JobStatus targetState) {
try {
JobStatus current = (JobStatus) runningJobStateIMap.get(jobId);
log.debug(
String.format(
"Try to update the %s state from %s to %s",
Expand All @@ -225,23 +223,6 @@ public synchronized void updateJobState(
throw new SeaTunnelEngineException(message);
}

JobStatus stateInMap = (JobStatus) runningJobStateIMap.get(jobId);
if (!current.equals(stateInMap)) {
if (JobStatus.FAILING.equals(stateInMap)
|| JobStatus.CANCELING.equals(stateInMap)) {
log.debug(
String.format(
"%s state is %s, can not be turn to %s.",
jobFullName, targetState));
return;
} else {
throw new SeaTunnelEngineException(
String.format(
"%s have error state: %s, Never come here.",
jobFullName, stateInMap));
}
}

// now do the actual state transition
// we must update runningJobStateTimestampsIMap first and then can update
// runningJobStateIMap
Expand Down Expand Up @@ -289,28 +270,34 @@ public void makeJobFailing(Throwable e) {

public void startJob() {
isRunning = true;
log.info("{} state process is start", getJobFullName());
stateProcess();
}

public void stopJobStateProcess() {
isRunning = false;
log.info("{} state process is stop", getJobFullName());
}

private synchronized void stateProcess() {
if (!isRunning) {
log.warn(String.format("%s state process is stopped", jobFullName));
return;
}
switch (getJobStatus()) {
case CREATED:
updateJobState(JobStatus.CREATED, JobStatus.SCHEDULED);
updateJobState(JobStatus.SCHEDULED);
break;
case SCHEDULED:
getPipelineList()
.forEach(
subPlan -> {
if (PipelineStatus.CREATED.equals(
subPlan.getCurrPipelineStatus())) {
subPlan.startSubPlan();
subPlan.startSubPlanStateProcess();
}
});
updateJobState(JobStatus.SCHEDULED, JobStatus.RUNNING);
updateJobState(JobStatus.RUNNING);
break;
case RUNNING:
try {
Expand All @@ -327,7 +314,7 @@ private synchronized void stateProcess() {
case FAILED:
case CANCELED:
case FINISHED:
isRunning = false;
stopJobStateProcess();
jobEndFuture.complete(new JobResult(getJobStatus(), errorBySubPlan.get()));
return;
default:
Expand Down
Loading

0 comments on commit 945cdbc

Please sign in to comment.