Skip to content

Commit

Permalink
[Hotfix][Zeta] Fix the problem of unstable job status (#5450)
Browse files (browse the repository at this point in the history)
  • Loading branch information
EricJoy2048 authored Oct 25, 2023
1 parent 5d4b319 commit 592ee00
Show file tree
Hide file tree
Showing 23 changed files with 763 additions and 1,236 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
dead-link:
name: Dead links
runs-on: ubuntu-latest
-timeout-minutes: 30
+timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- run: sudo npm install -g markdown-link-check@3.8.7
Expand Down Expand Up @@ -183,6 +183,7 @@ jobs:
- name: Make unit test modules
id: ut-modules
+timeout-minutes: 60
if: ${{ steps.filter.outputs.api == 'false' && (steps.engine-modules.outputs.modules != '' || steps.cv2-modules.outputs.modules != '') }}
run: |
modules='${{ steps.engine-modules.outputs.modules }}${{ steps.cv2-modules.outputs.modules }}'
Expand Down Expand Up @@ -212,6 +213,7 @@ jobs:
- name: Make integration test modules
id: it-modules
+timeout-minutes: 60
if: ${{ steps.filter.outputs.api == 'false' && (steps.engine-modules.outputs.modules != '' || steps.cv2-modules.outputs.modules != '' || steps.cv2-e2e-modules.outputs.modules != '' || steps.cv2-flink-e2e-modules.outputs.modules != '' || steps.cv2-spark-e2e-modules.outputs.modules != '' || steps.engine-e2e-modules.outputs.modules != '') }}
run: |
modules='${{ steps.cv2-e2e-modules.outputs.modules }}${{ steps.cv2-flink-e2e-modules.outputs.modules }}${{ steps.cv2-spark-e2e-modules.outputs.modules }}${{ steps.engine-e2e-modules.outputs.modules }}${{ steps.engine-modules.outputs.modules }}${{ steps.cv2-modules.outputs.modules }}'
Expand Down Expand Up @@ -520,7 +522,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
-timeout-minutes: 30
+timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand All @@ -544,7 +546,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
-timeout-minutes: 30
+timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down
4 changes: 2 additions & 2 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ appender.consoleStdout.name = consoleStdoutAppender
appender.consoleStdout.type = CONSOLE
appender.consoleStdout.target = SYSTEM_OUT
appender.consoleStdout.layout.type = PatternLayout
-appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
appender.consoleStdout.filter.acceptLtWarn.type = ThresholdFilter
appender.consoleStdout.filter.acceptLtWarn.level = WARN
appender.consoleStdout.filter.acceptLtWarn.onMatch = DENY
Expand All @@ -52,7 +52,7 @@ appender.consoleStderr.name = consoleStderrAppender
appender.consoleStderr.type = CONSOLE
appender.consoleStderr.target = SYSTEM_ERR
appender.consoleStderr.layout.type = PatternLayout
-appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
appender.consoleStderr.filter.acceptGteWarn.type = ThresholdFilter
appender.consoleStderr.filter.acceptGteWarn.level = WARN
appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT
Expand All @@ -64,7 +64,7 @@ appender.file.fileName = ${file_path}/${file_name}.log
appender.file.filePattern = ${file_path}/${file_name}.log.%d{yyyy-MM-dd}-%i
appender.file.append = true
appender.file.layout.type = PatternLayout
-appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
appender.file.policies.type = Policies
appender.file.policies.time.type = TimeBasedTriggeringPolicy
appender.file.policies.time.modulate = true
Expand Down
6 changes: 3 additions & 3 deletions config/log4j2_client.properties
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ appender.consoleStdout.name = consoleStdoutAppender
appender.consoleStdout.type = CONSOLE
appender.consoleStdout.target = SYSTEM_OUT
appender.consoleStdout.layout.type = PatternLayout
-appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
appender.consoleStdout.filter.acceptLtWarn.type = ThresholdFilter
appender.consoleStdout.filter.acceptLtWarn.level = WARN
appender.consoleStdout.filter.acceptLtWarn.onMatch = DENY
Expand All @@ -46,7 +46,7 @@ appender.consoleStderr.name = consoleStderrAppender
appender.consoleStderr.type = CONSOLE
appender.consoleStderr.target = SYSTEM_ERR
appender.consoleStderr.layout.type = PatternLayout
-appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
appender.consoleStderr.filter.acceptGteWarn.type = ThresholdFilter
appender.consoleStderr.filter.acceptGteWarn.level = WARN
appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT
Expand All @@ -58,7 +58,7 @@ appender.consoleStderr.filter.acceptGteWarn.onMismatch = DENY
#appender.file.filePattern = ${file_path}/${file_name}.log.%d{yyyy-MM-dd}-%i
#appender.file.append = true
#appender.file.layout.type = PatternLayout
-#appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+#appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
#appender.file.policies.type = Policies
#appender.file.policies.time.type = TimeBasedTriggeringPolicy
#appender.file.policies.time.modulate = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void testBatchJobRunOkIn2Node() throws ExecutionException, InterruptedExc
.untilAsserted(
() -> {
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand All @@ -134,7 +134,7 @@ public void testBatchJobRunOkIn2Node() throws ExecutionException, InterruptedExc
FileUtils.getFileLineNumberFromDir(testResources.getLeft());
Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
System.out.println(engineClient.getJobMetrics(clientJobProxy.getJobId()));
-log.info("========================clean test resource====================");
+log.warn("========================clean test resource====================");
} finally {
if (engineClient != null) {
engineClient.shutdown();
Expand Down Expand Up @@ -242,7 +242,7 @@ public void testStreamJobRunOkIn2Node() throws ExecutionException, InterruptedEx
.untilAsserted(
() -> {
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand Down Expand Up @@ -338,7 +338,7 @@ public void testBatchJobRestoreIn2NodeWorkerDown()
() -> {
// Wait some tasks commit finished
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand Down Expand Up @@ -436,7 +436,7 @@ public void testStreamJobRestoreIn2NodeWorkerDown()
// Wait some tasks commit finished, and we can get rows from the
// sink target dir
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand All @@ -458,9 +458,9 @@ public void testStreamJobRestoreIn2NodeWorkerDown()
() -> {
// Wait job write all rows in file
Thread.sleep(2000);
-System.out.println(
-FileUtils.getFileLineNumberFromDir(
-testResources.getLeft()));
+log.warn(
+FileUtils.getFileLineNumberFromDir(testResources.getLeft())
+.toString());
Assertions.assertTrue(
JobStatus.RUNNING.equals(clientJobProxy.getJobStatus())
&& testRowNumber * testParallelism
Expand Down Expand Up @@ -553,7 +553,7 @@ public void testBatchJobRestoreIn2NodeMasterDown()
() -> {
// Wait some tasks commit finished
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand All @@ -573,7 +573,7 @@ public void testBatchJobRestoreIn2NodeMasterDown()
.untilAsserted(
() -> {
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand Down Expand Up @@ -656,7 +656,7 @@ public void testStreamJobRestoreIn2NodeMasterDown()
// Wait some tasks commit finished, and we can get rows from the
// sink target dir
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand All @@ -677,7 +677,7 @@ public void testStreamJobRestoreIn2NodeMasterDown()
() -> {
// Wait job write all rows in file
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand Down Expand Up @@ -823,7 +823,7 @@ public void testStreamJobRestoreInAllNodeDown()
// Wait some tasks commit finished, and we can get rows from the
// sink target dir
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand All @@ -840,15 +840,15 @@ public void testStreamJobRestoreInAllNodeDown()
node1.shutdown();
node2.shutdown();

-log.info(
+log.warn(
"==========================================All node is done========================================");
Thread.sleep(10000);

node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

-log.info(
+log.warn(
"==========================================All node is start, begin check node size ========================================");
// waiting all node added to cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Expand All @@ -859,7 +859,7 @@ public void testStreamJobRestoreInAllNodeDown()
Assertions.assertEquals(
2, restoreFinalNode.getCluster().getMembers().size()));

-log.info(
+log.warn(
"==========================================All node is running========================================");
engineClient = new SeaTunnelClient(clientConfig);
ClientJobProxy newClientJobProxy = engineClient.createJobClient().getJobProxy(jobId);
Expand All @@ -874,7 +874,7 @@ public void testStreamJobRestoreInAllNodeDown()
() -> {
// Wait job write all rows in file
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand All @@ -895,7 +895,7 @@ public void testStreamJobRestoreInAllNodeDown()

// sleep 10s and expect the job don't write more rows.
Thread.sleep(10000);
-log.info(
+log.warn(
"==========================================Cancel Job========================================");
newClientJobProxy.cancelJob();

Expand All @@ -913,7 +913,7 @@ public void testStreamJobRestoreInAllNodeDown()
Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);

} finally {
-log.info(
+log.warn(
"==========================================Clean test resource ========================================");
if (engineClient != null) {
engineClient.shutdown();
Expand Down Expand Up @@ -1041,7 +1041,7 @@ public void testStreamJobRestoreFromOssInAllNodeDown()
// Wait some tasks commit finished, and we can get rows from the
// sink target dir
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand Down Expand Up @@ -1092,7 +1092,7 @@ public void testStreamJobRestoreFromOssInAllNodeDown()
() -> {
// Wait job write all rows in file
Thread.sleep(2000);
-System.out.println(
+log.warn(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
Expand Down
Loading

0 comments on commit 592ee00

Please sign in to comment.