From 592ee005875c0acd000c061ee23cf615578794b2 Mon Sep 17 00:00:00 2001 From: Eric Date: Wed, 25 Oct 2023 16:23:26 +0800 Subject: [PATCH] [Hotfix][Zeta] Fix the problem of unstable job status (#5450) --- .github/workflows/backend.yml | 8 +- .idea/vcs.xml | 4 +- config/log4j2.properties | 6 +- config/log4j2_client.properties | 6 +- .../engine/e2e/ClusterFaultToleranceIT.java | 42 +- .../ClusterFaultToleranceTwoPipelineIT.java | 20 +- .../seatunnel/engine/e2e/RestApiIT.java | 12 +- .../seatunnel/engine/client/TestUtils.java | 4 +- .../exception/TaskGroupDeployException.java} | 27 +- .../seatunnel/engine/core/job/JobStatus.java | 45 +- .../engine/core/job/PipelineStatus.java | 4 +- .../engine/server/CoordinatorService.java | 136 ++--- .../server/dag/physical/PhysicalPlan.java | 223 ++++----- .../server/dag/physical/PhysicalVertex.java | 359 +++++++------ .../server/dag/physical/ResourceUtils.java | 66 +++ .../engine/server/dag/physical/SubPlan.java | 470 +++++++++--------- .../server/execution/ExecutionState.java | 6 +- .../server/master/JobHistoryService.java | 3 +- .../engine/server/master/JobMaster.java | 113 ++--- .../scheduler/PipelineBaseScheduler.java | 401 --------------- .../engine/server/CoordinatorServiceTest.java | 27 +- .../checkpoint/CheckpointTimeOutTest.java | 13 +- .../src/test/resources/log4j2-test.properties | 4 +- 23 files changed, 763 insertions(+), 1236 deletions(-) rename seatunnel-engine/{seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java => seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/TaskGroupDeployException.java} (51%) create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java delete mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 2e590dd0a4b..5338e252e7e 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -50,7 +50,7 @@ jobs: dead-link: name: Dead links runs-on: ubuntu-latest - timeout-minutes: 30 + timeout-minutes: 90 steps: - uses: actions/checkout@v2 - run: sudo npm install -g markdown-link-check@3.8.7 @@ -183,6 +183,7 @@ jobs: - name: Make unit test modules id: ut-modules + timeout-minutes: 60 if: ${{ steps.filter.outputs.api == 'false' && (steps.engine-modules.outputs.modules != '' || steps.cv2-modules.outputs.modules != '') }} run: | modules='${{ steps.engine-modules.outputs.modules }}${{ steps.cv2-modules.outputs.modules }}' @@ -212,6 +213,7 @@ jobs: - name: Make integration test modules id: it-modules + timeout-minutes: 60 if: ${{ steps.filter.outputs.api == 'false' && (steps.engine-modules.outputs.modules != '' || steps.cv2-modules.outputs.modules != '' || steps.cv2-e2e-modules.outputs.modules != '' || steps.cv2-flink-e2e-modules.outputs.modules != '' || steps.cv2-spark-e2e-modules.outputs.modules != '' || steps.engine-e2e-modules.outputs.modules != '') }} run: | modules='${{ steps.cv2-e2e-modules.outputs.modules }}${{ steps.cv2-flink-e2e-modules.outputs.modules }}${{ steps.cv2-spark-e2e-modules.outputs.modules }}${{ steps.engine-e2e-modules.outputs.modules }}${{ steps.engine-modules.outputs.modules }}${{ steps.cv2-modules.outputs.modules }}' @@ -520,7 +522,7 @@ jobs: matrix: java: [ '8', '11' ] os: [ 'ubuntu-latest' ] - timeout-minutes: 30 + timeout-minutes: 90 steps: - uses: actions/checkout@v2 - name: Set up JDK ${{ matrix.java }} @@ -544,7 +546,7 @@ jobs: matrix: java: [ '8', '11' ] os: [ 'ubuntu-latest' ] - timeout-minutes: 30 + timeout-minutes: 90 steps: - uses: actions/checkout@v2 - name: Set up JDK ${{ matrix.java }} diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 81f2456ebc3..73b27813183 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -6,9 +6,9 @@ The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/config/log4j2.properties b/config/log4j2.properties index fb1a07c6d01..ca49814892c 100644 --- a/config/log4j2.properties +++ b/config/log4j2.properties @@ -42,7 +42,7 @@ appender.consoleStdout.name = consoleStdoutAppender appender.consoleStdout.type = CONSOLE appender.consoleStdout.target = SYSTEM_OUT appender.consoleStdout.layout.type = PatternLayout -appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n +appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n appender.consoleStdout.filter.acceptLtWarn.type = ThresholdFilter appender.consoleStdout.filter.acceptLtWarn.level = WARN appender.consoleStdout.filter.acceptLtWarn.onMatch = DENY @@ -52,7 +52,7 @@ appender.consoleStderr.name = consoleStderrAppender appender.consoleStderr.type = CONSOLE appender.consoleStderr.target = SYSTEM_ERR appender.consoleStderr.layout.type = PatternLayout -appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n +appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n appender.consoleStderr.filter.acceptGteWarn.type = ThresholdFilter appender.consoleStderr.filter.acceptGteWarn.level = WARN appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT @@ -64,7 +64,7 @@ appender.file.fileName = ${file_path}/${file_name}.log appender.file.filePattern = ${file_path}/${file_name}.log.%d{yyyy-MM-dd}-%i appender.file.append = true appender.file.layout.type = PatternLayout -appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n +appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n appender.file.policies.type = Policies appender.file.policies.time.type = TimeBasedTriggeringPolicy appender.file.policies.time.modulate = true diff --git a/config/log4j2_client.properties b/config/log4j2_client.properties index 10185592e19..9b491c24e82 100644 --- a/config/log4j2_client.properties +++ b/config/log4j2_client.properties @@ -36,7 +36,7 @@ appender.consoleStdout.name = consoleStdoutAppender appender.consoleStdout.type = CONSOLE appender.consoleStdout.target = SYSTEM_OUT appender.consoleStdout.layout.type = PatternLayout -appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n +appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n appender.consoleStdout.filter.acceptLtWarn.type = ThresholdFilter appender.consoleStdout.filter.acceptLtWarn.level = WARN appender.consoleStdout.filter.acceptLtWarn.onMatch = DENY @@ -46,7 +46,7 @@ appender.consoleStderr.name = consoleStderrAppender appender.consoleStderr.type = CONSOLE appender.consoleStderr.target = SYSTEM_ERR appender.consoleStderr.layout.type = PatternLayout -appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n +appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n appender.consoleStderr.filter.acceptGteWarn.type = ThresholdFilter appender.consoleStderr.filter.acceptGteWarn.level = WARN appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT @@ -58,7 +58,7 @@ appender.consoleStderr.filter.acceptGteWarn.onMismatch = DENY #appender.file.filePattern = ${file_path}/${file_name}.log.%d{yyyy-MM-dd}-%i #appender.file.append = true #appender.file.layout.type = PatternLayout -#appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n +#appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n #appender.file.policies.type = Policies #appender.file.policies.time.type = TimeBasedTriggeringPolicy #appender.file.policies.time.modulate = true diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java index 86dc52a92b4..0aa83151316 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java @@ -119,7 +119,7 @@ public void testBatchJobRunOkIn2Node() throws ExecutionException, InterruptedExc .untilAsserted( () -> { Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -134,7 +134,7 @@ public void testBatchJobRunOkIn2Node() throws ExecutionException, InterruptedExc FileUtils.getFileLineNumberFromDir(testResources.getLeft()); Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir); System.out.println(engineClient.getJobMetrics(clientJobProxy.getJobId())); - log.info("========================clean test resource===================="); + log.warn("========================clean test resource===================="); } finally { if (engineClient != null) { engineClient.shutdown(); @@ -242,7 +242,7 @@ public void testStreamJobRunOkIn2Node() throws ExecutionException, InterruptedEx .untilAsserted( () -> { Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -338,7 +338,7 @@ public void testBatchJobRestoreIn2NodeWorkerDown() () -> { // Wait some tasks commit finished Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -436,7 +436,7 @@ public void testStreamJobRestoreIn2NodeWorkerDown() // Wait some tasks commit finished, and we can get rows from the // sink target dir Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -458,9 +458,9 @@ public void testStreamJobRestoreIn2NodeWorkerDown() () -> { // Wait job write all rows in file Thread.sleep(2000); - System.out.println( - FileUtils.getFileLineNumberFromDir( - testResources.getLeft())); + log.warn( + FileUtils.getFileLineNumberFromDir(testResources.getLeft()) + .toString()); Assertions.assertTrue( JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) && testRowNumber * testParallelism @@ -553,7 +553,7 @@ public void testBatchJobRestoreIn2NodeMasterDown() () -> { // Wait some tasks commit finished Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -573,7 +573,7 @@ public void testBatchJobRestoreIn2NodeMasterDown() .untilAsserted( () -> { Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -656,7 +656,7 @@ public void testStreamJobRestoreIn2NodeMasterDown() // Wait some tasks commit finished, and we can get rows from the // sink target dir Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -677,7 +677,7 @@ public void testStreamJobRestoreIn2NodeMasterDown() () -> { // Wait job write all rows in file Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -823,7 +823,7 @@ public void testStreamJobRestoreInAllNodeDown() // Wait some tasks commit finished, and we can get rows from the // sink target dir Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -840,7 +840,7 @@ public void testStreamJobRestoreInAllNodeDown() node1.shutdown(); node2.shutdown(); - log.info( + log.warn( "==========================================All node is done========================================"); Thread.sleep(10000); @@ -848,7 +848,7 @@ public void testStreamJobRestoreInAllNodeDown() node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); - log.info( + log.warn( "==========================================All node is start, begin check node size ========================================"); // waiting all node added to cluster HazelcastInstanceImpl restoreFinalNode = node1; @@ -859,7 +859,7 @@ public void testStreamJobRestoreInAllNodeDown() Assertions.assertEquals( 2, restoreFinalNode.getCluster().getMembers().size())); - log.info( + log.warn( "==========================================All node is running========================================"); engineClient = new SeaTunnelClient(clientConfig); ClientJobProxy newClientJobProxy = engineClient.createJobClient().getJobProxy(jobId); @@ -874,7 +874,7 @@ public void testStreamJobRestoreInAllNodeDown() () -> { // Wait job write all rows in file Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -895,7 +895,7 @@ public void testStreamJobRestoreInAllNodeDown() // sleep 10s and expect the job don't write more rows. Thread.sleep(10000); - log.info( + log.warn( "==========================================Cancel Job========================================"); newClientJobProxy.cancelJob(); @@ -913,7 +913,7 @@ public void testStreamJobRestoreInAllNodeDown() Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir); } finally { - log.info( + log.warn( "==========================================Clean test resource ========================================"); if (engineClient != null) { engineClient.shutdown(); @@ -1041,7 +1041,7 @@ public void testStreamJobRestoreFromOssInAllNodeDown() // Wait some tasks commit finished, and we can get rows from the // sink target dir Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -1092,7 +1092,7 @@ public void testStreamJobRestoreFromOssInAllNodeDown() () -> { // Wait job write all rows in file Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java index 44087645ba1..28b13ea6484 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java @@ -127,7 +127,7 @@ public void testTwoPipelineBatchJobRunOkIn2Node() .untilAsserted( () -> { Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -255,7 +255,7 @@ public void testTwoPipelineStreamJobRunOkIn2Node() .untilAsserted( () -> { Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -354,7 +354,7 @@ public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown() () -> { // Wait some tasks commit finished Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -375,7 +375,7 @@ public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown() () -> { // Wait some tasks commit finished Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -473,7 +473,7 @@ public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown() // Wait some tasks commit finished, and we can get rows from the // sink target dir Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -495,7 +495,7 @@ public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown() () -> { // Wait job write all rows in file Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -599,7 +599,7 @@ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() () -> { // Wait some tasks commit finished Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -620,7 +620,7 @@ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() () -> { // Wait some tasks commit finished Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -710,7 +710,7 @@ public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown() // Wait some tasks commit finished, and we can get rows from the // sink target dir Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) @@ -731,7 +731,7 @@ public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown() () -> { // Wait job write all rows in file Thread.sleep(2000); - System.out.println( + log.warn( "\n=================================" + FileUtils.getFileLineNumberFromDir( testResources.getLeft()) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index 305ca7b3b27..fec5dc65f22 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -144,9 +144,15 @@ public void testSubmitJob() { .getNodeExtension() .createExtensionServices() .get(Constant.SEATUNNEL_SERVICE_NAME); - JobStatus jobStatus = - seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId)); - Assertions.assertEquals(JobStatus.RUNNING, jobStatus); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.RUNNING, + seaTunnelServer + .getCoordinatorService() + .getJobStatus(Long.parseLong(jobId)))); Awaitility.await() .atMost(2, TimeUnit.MINUTES) .untilAsserted( diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java index 21964799cb1..cb1270623d4 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java @@ -43,7 +43,7 @@ public void testContentFormatUtil() throws InterruptedException { new JobStatusData( 4352352414135L + i, "Testfdsafew" + i, - JobStatus.CANCELLING, + JobStatus.CANCELING, System.currentTimeMillis(), System.currentTimeMillis())); Thread.sleep(2L); @@ -53,7 +53,7 @@ public void testContentFormatUtil() throws InterruptedException { new JobStatusData( 4352352414135L + i, "fdsafsddfasfsdafasdf" + i, - JobStatus.RECONCILING, + JobStatus.UNKNOWABLE, System.currentTimeMillis(), null)); Thread.sleep(2L); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/TaskGroupDeployException.java similarity index 51% rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java rename to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/TaskGroupDeployException.java index dbfcc235ef3..cd655eec3e0 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/TaskGroupDeployException.java @@ -15,16 +15,29 @@ * limitations under the License. */ -package org.apache.seatunnel.engine.server.scheduler; +package org.apache.seatunnel.engine.common.exception; -import org.apache.seatunnel.engine.server.dag.physical.SubPlan; +import com.hazelcast.client.impl.protocol.ClientExceptionFactory; +import com.hazelcast.core.HazelcastException; -import lombok.NonNull; +public class TaskGroupDeployException extends HazelcastException + implements ClientExceptionFactory.ExceptionFactory { + public TaskGroupDeployException() {} -import java.util.concurrent.CompletableFuture; + public TaskGroupDeployException(String message) { + super(message); + } -public interface JobScheduler { - CompletableFuture reSchedulerPipeline(@NonNull SubPlan subPlan); + public TaskGroupDeployException(String message, Throwable cause) { + super(message, cause); + } - void startScheduling(); + public TaskGroupDeployException(Throwable cause) { + super(cause); + } + + @Override + public Throwable createException(String s, Throwable throwable) { + return new TaskGroupDeployException(s, throwable); + } } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java index 7c50744dba0..e9801629b10 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java @@ -29,10 +29,13 @@ public enum JobStatus { /** Job is newly created, no task has started to run. */ CREATED(EndState.NOT_END), - /** Job is begin schedule but some task not deploy complete. */ + /** + * Job will scheduler every pipeline, each PhysicalVertex in the pipeline will be scheduler and + * deploying + */ SCHEDULED(EndState.NOT_END), - /** Some tasks are scheduled or running, some may be pending, some may be finished. */ + /** The job is already running, and each pipeline is already running. */ RUNNING(EndState.NOT_END), /** The job has failed and is currently waiting for the cleanup to complete. */ @@ -42,7 +45,7 @@ public enum JobStatus { FAILED(EndState.GLOBALLY), /** Job is being cancelled. */ - CANCELLING(EndState.NOT_END), + CANCELING(EndState.NOT_END), /** Job has been cancelled. */ CANCELED(EndState.GLOBALLY), @@ -50,18 +53,6 @@ public enum JobStatus { /** All of the job's tasks have successfully finished. */ FINISHED(EndState.GLOBALLY), - /** The job is currently undergoing a reset and total restart. */ - RESTARTING(EndState.NOT_END), - - /** - * The job has been suspended which means that it has been stopped but not been removed from a - * potential HA job store. - */ - SUSPENDED(EndState.LOCALLY), - - /** The job is currently reconciling and waits for task execution report to recover state. */ - RECONCILING(EndState.NOT_END), - /** Cannot find the JobID or the job status has already been cleared. */ UNKNOWABLE(EndState.GLOBALLY); @@ -79,30 +70,6 @@ private enum EndState { this.endState = endState; } - /** - * Checks whether this state is globally terminal. A globally terminal job is complete - * and cannot fail any more and will not be restarted or recovered by another standby master - * node. - * - *

When a globally terminal state has been reached, all recovery data for the job is dropped - * from the high-availability services. - * - * @return True, if this job status is globally terminal, false otherwise. - */ - public boolean isGloballyEndState() { - return endState == EndState.GLOBALLY; - } - - /** - * Checks whether this state is locally terminal. Locally terminal refers to the state of - * a job's execution graph within an executing JobManager. If the execution graph is locally - * terminal, the JobManager will not continue executing or recovering the job. - * - *

The only state that is locally terminal, but not globally terminal is {@link #SUSPENDED}, - * which is typically entered when the executing JobManager loses its leader status. - * - * @return True, if this job status is terminal, false otherwise. - */ public boolean isEndState() { return endState != EndState.NOT_END; } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineStatus.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineStatus.java index 46128814db8..92d00e43bcd 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineStatus.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineStatus.java @@ -66,9 +66,9 @@ public enum PipelineStatus { CANCELED, - FAILED, + FAILING, - RECONCILING, + FAILED, /** Restoring last possible valid state of the pipeline if it has it. */ INITIALIZING; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 89a2258ce2d..fc244e696d3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -275,92 +275,37 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI metricsImap, engineConfig); - // If Job Status is CANCELLING , set needRestore to false try { - jobMaster.init( - runningJobInfoIMap.get(jobId).getInitializationTimestamp(), - true, - !JobStatus.CANCELLING.equals(jobStatus)); + jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp(), true); } catch (Exception e) { throw new SeaTunnelEngineException(String.format("Job id %s init failed", jobId), e); } String jobFullName = jobMaster.getPhysicalPlan().getJobFullName(); - if (jobStatus.isEndState()) { - logger.info( - String.format( - "The restore %s is in an end state %s, store the job info to JobHistory and clear the job running time info", - jobFullName, jobStatus)); - jobMaster.cleanJob(); - return; - } - - if (jobStatus.ordinal() < JobStatus.RUNNING.ordinal()) { - CompletableFuture.runAsync( - () -> { - logger.info( - String.format( - "The restore %s is state %s, cancel job and submit it again.", - jobFullName, jobStatus)); - jobMaster.cancelJob(); - jobMaster.getJobMasterCompleteFuture().join(); - submitJob(jobId, jobInfo.getJobImmutableInformation()).join(); - }, - executorService); - - return; - } - runningJobMasterMap.put(jobId, jobMaster); - jobMaster.markRestore(); - if (JobStatus.CANCELLING.equals(jobStatus)) { - logger.info( - String.format( - "The restore %s is in %s state, cancel the job", - jobFullName, jobStatus)); - CompletableFuture.runAsync( - () -> { - try { - jobMaster.cancelJob(); - jobMaster.run(); - } finally { - // voidCompletableFuture will be cancelled when zeta master node - // shutdown to simulate master failure, - // don't update runningJobMasterMap is this case. - if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) { - runningJobMasterMap.remove(jobId); - } - } - }, - executorService); - return; - } - - if (JobStatus.RUNNING.equals(jobStatus)) { - logger.info( - String.format( - "The restore %s is in %s state, restore pipeline and take over this job running", - jobFullName, jobStatus)); - CompletableFuture.runAsync( - () -> { - try { - jobMaster - .getPhysicalPlan() - .getPipelineList() - .forEach(SubPlan::restorePipelineState); - jobMaster.run(); - } finally { - // voidCompletableFuture will be cancelled when zeta master node - // shutdown to simulate master failure, - // don't update runningJobMasterMap is this case. - if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) { - runningJobMasterMap.remove(jobId); - } + logger.info( + String.format( + "The restore %s is in %s state, restore pipeline and take over this job running", + jobFullName, jobStatus)); + CompletableFuture.runAsync( + () -> { + try { + jobMaster + .getPhysicalPlan() + .getPipelineList() + .forEach(SubPlan::restorePipelineState); + jobMaster.run(); + } finally { + // voidCompletableFuture will be cancelled when zeta master node + // shutdown to simulate master failure, + // don't update runningJobMasterMap is this case. + if (!jobMaster.getJobMasterCompleteFuture().isCompletedExceptionally()) { + runningJobMasterMap.remove(jobId); } - }, - executorService); - } + } + }, + executorService); } private void checkNewActiveMaster() { @@ -390,10 +335,11 @@ private void checkNewActiveMaster() { } } - private void clearCoordinatorService() { + public synchronized void clearCoordinatorService() { // interrupt all JobMaster runningJobMasterMap.values().forEach(JobMaster::interrupt); executorService.shutdownNow(); + runningJobMasterMap.clear(); try { executorService.awaitTermination(20, TimeUnit.SECONDS); @@ -457,9 +403,7 @@ public PassiveCompletableFuture submitJob(long jobId, Data jobImmutableInf new JobInfo(System.currentTimeMillis(), jobImmutableInformation)); runningJobMasterMap.put(jobId, jobMaster); jobMaster.init( - runningJobInfoIMap.get(jobId).getInitializationTimestamp(), - false, - true); + runningJobInfoIMap.get(jobId).getInitializationTimestamp(), false); // We specify that when init is complete, the submitJob is complete jobSubmitFuture.complete(null); } catch (Throwable e) { @@ -552,7 +496,11 @@ public JobStatus getJobStatus(long jobId) { JobHistoryService.JobState jobDetailState = jobHistoryService.getJobDetailState(jobId); return null == jobDetailState ? JobStatus.UNKNOWABLE : jobDetailState.getJobStatus(); } - return runningJobMaster.getJobStatus(); + JobStatus jobStatus = runningJobMaster.getJobStatus(); + if (jobStatus == null) { + return jobHistoryService.getFinishedJobStateImap().get(jobId).getJobStatus(); + } + return jobStatus; } public JobMetrics getJobMetrics(long jobId) { @@ -690,7 +638,7 @@ private void makeTasksFailed( || executionState.equals(ExecutionState.RUNNING) || executionState.equals(ExecutionState.CANCELING))) { TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation(); - physicalVertex.updateTaskExecutionState( + physicalVertex.updateStateByExecutionService( new TaskExecutionState( taskGroupLocation, ExecutionState.FAILED, @@ -743,9 +691,6 @@ public void printJobDetailInfo() { AtomicLong cancellingJobCount = new AtomicLong(); AtomicLong canceledJobCount = new AtomicLong(); AtomicLong finishedJobCount = new AtomicLong(); - AtomicLong restartingJobCount = new AtomicLong(); - AtomicLong suspendedJobCount = new AtomicLong(); - AtomicLong reconcilingJobCount = new AtomicLong(); if (runningJobInfoIMap != null) { runningJobInfoIMap @@ -771,7 +716,7 @@ public void printJobDetailInfo() { case FAILED: failedJobCount.addAndGet(1); break; - case CANCELLING: + case CANCELING: cancellingJobCount.addAndGet(1); break; case CANCELED: @@ -780,15 +725,6 @@ public void printJobDetailInfo() { case FINISHED: finishedJobCount.addAndGet(1); break; - case RESTARTING: - restartingJobCount.addAndGet(1); - break; - case SUSPENDED: - suspendedJobCount.addAndGet(1); - break; - case RECONCILING: - reconcilingJobCount.addAndGet(1); - break; default: } } @@ -813,12 +749,6 @@ public void printJobDetailInfo() { "canceledJobCount", canceledJobCount, "finishedJobCount", - finishedJobCount, - "restartingJobCount", - restartingJobCount, - "suspendedJobCount", - suspendedJobCount, - "reconcilingJobCount", - reconcilingJobCount)); + finishedJobCount)); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java index ed3ec5c8c86..72de3f8658a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java @@ -18,6 +18,10 @@ package org.apache.seatunnel.engine.server.dag.physical; import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.RetryUtils; +import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.common.utils.ExceptionUtil; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.job.JobResult; @@ -26,22 +30,19 @@ import org.apache.seatunnel.engine.core.job.PipelineStatus; import org.apache.seatunnel.engine.server.master.JobMaster; -import com.hazelcast.logging.ILogger; -import com.hazelcast.logging.Logger; import com.hazelcast.map.IMap; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; +@Slf4j public class PhysicalPlan { - private static final ILogger LOGGER = Logger.getLogger(PhysicalPlan.class); - private final List pipelineList; private AtomicInteger finishedPipelineNum = new AtomicInteger(0); @@ -62,11 +63,7 @@ public class PhysicalPlan { */ private final IMap runningJobStateTimestampsIMap; - /** - * when job status turn to end, complete this future. And then the waitForCompleteByPhysicalPlan - * in {@link org.apache.seatunnel.engine.server.scheduler.JobScheduler} whenComplete method will - * be called. - */ + /** when job status turn to end, complete this future. */ private CompletableFuture jobEndFuture; /** The error throw by subPlan, should be set when subPlan throw error. */ @@ -81,6 +78,8 @@ public class PhysicalPlan { /** Whether we make the job end when pipeline turn to end state. */ private boolean makeJobEndWhenPipelineEnded = true; + private volatile boolean isRunning = false; + public PhysicalPlan( @NonNull List pipelineList, @NonNull ExecutorService executorService, @@ -139,126 +138,63 @@ public void addPipelineEndCallback(SubPlan subPlan) { future.thenAcceptAsync( pipelineState -> { try { + log.info( + "{} future complete with state {}", + subPlan.getPipelineFullName(), + pipelineState.getPipelineStatus()); if (PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) { canceledPipelineNum.incrementAndGet(); - if (makeJobEndWhenPipelineEnded) { - LOGGER.info( - String.format( - "cancel job %s because makeJobEndWhenPipelineEnded is true", - jobFullName)); - cancelJob(); - } } else if (PipelineStatus.FAILED.equals( pipelineState.getPipelineStatus())) { failedPipelineNum.incrementAndGet(); errorBySubPlan.compareAndSet(null, pipelineState.getThrowableMsg()); if (makeJobEndWhenPipelineEnded) { - LOGGER.info( + log.info( String.format( "cancel job %s because makeJobEndWhenPipelineEnded is true", jobFullName)); - cancelJob(); + updateJobState(JobStatus.FAILING); } } if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) { JobStatus jobStatus; if (failedPipelineNum.get() > 0) { - jobStatus = JobStatus.FAILING; + jobStatus = JobStatus.FAILED; updateJobState(jobStatus); } else if (canceledPipelineNum.get() > 0) { jobStatus = JobStatus.CANCELED; - turnToEndState(jobStatus); + updateJobState(jobStatus); } else { jobStatus = JobStatus.FINISHED; - turnToEndState(jobStatus); + updateJobState(jobStatus); } - jobEndFuture.complete(new JobResult(jobStatus, errorBySubPlan.get())); } } catch (Throwable e) { // Because only cancelJob or releasePipelineResource can throw exception, so // we only output log here - LOGGER.severe(ExceptionUtils.getMessage(e)); + log.error(ExceptionUtils.getMessage(e)); } }, jobMaster.getExecutorService()); } public void cancelJob() { - jobMaster.neverNeedRestore(); if (getJobStatus().isEndState()) { - LOGGER.warning( + log.warn( String.format( "%s is in end state %s, can not be cancel", jobFullName, getJobStatus())); return; } - // If an active Master Node done and another Master Node active, we can not know whether - // cancelRunningJob - // complete. So we need cancelRunningJob again. - if (JobStatus.CANCELLING.equals(getJobStatus())) { - cancelJobPipelines(); - return; - } - updateJobState((JobStatus) runningJobStateIMap.get(jobId), JobStatus.CANCELLING); - cancelJobPipelines(); - } - - private void cancelJobPipelines() { - List> collect = - pipelineList.stream() - .map( - pipeline -> - CompletableFuture.runAsync( - pipeline::cancelPipeline, - jobMaster.getExecutorService())) - .collect(Collectors.toList()); - - try { - CompletableFuture voidCompletableFuture = - CompletableFuture.allOf(collect.toArray(new CompletableFuture[0])); - voidCompletableFuture.join(); - } catch (Exception e) { - LOGGER.severe( - String.format( - "%s cancel error with exception: %s", - jobFullName, ExceptionUtils.getMessage(e))); - } + updateJobState(JobStatus.CANCELING); } public List getPipelineList() { return pipelineList; } - private void turnToEndState(@NonNull JobStatus endState) { - synchronized (this) { - // consistency check - JobStatus current = (JobStatus) runningJobStateIMap.get(jobId); - if (current.isEndState()) { - String message = "Job is trying to leave terminal state " + current; - LOGGER.severe(message); - throw new IllegalStateException(message); - } - - if (!endState.isEndState()) { - String message = "Need a end state, not " + endState; - LOGGER.severe(message); - throw new IllegalStateException(message); - } - - // notify checkpoint manager - jobMaster.getCheckpointManager().shutdown(endState); - - LOGGER.info(String.format("%s end with state %s", getJobFullName(), endState)); - // we must update runningJobStateTimestampsIMap first and then can update - // runningJobStateIMap - updateStateTimestamps(endState); - - runningJobStateIMap.put(jobId, endState); - } - } - private void updateStateTimestamps(@NonNull JobStatus targetState) { // we must update runningJobStateTimestampsIMap first and then can update // runningJobStateIMap @@ -267,39 +203,50 @@ private void updateStateTimestamps(@NonNull JobStatus targetState) { runningJobStateTimestampsIMap.set(jobId, stateTimestamps); } - public boolean updateJobState(@NonNull JobStatus targetState) { - synchronized (this) { - return updateJobState((JobStatus) runningJobStateIMap.get(jobId), targetState); - } - } + public synchronized void updateJobState(@NonNull JobStatus targetState) { + try { + JobStatus current = (JobStatus) runningJobStateIMap.get(jobId); + log.debug( + String.format( + "Try to update the %s state from %s to %s", + jobFullName, current, targetState)); + + if (current.equals(targetState)) { + log.info( + "{} current state equals target state: {}, skip", jobFullName, targetState); + return; + } - public boolean updateJobState(@NonNull JobStatus current, @NonNull JobStatus targetState) { - synchronized (this) { // consistency check if (current.isEndState()) { String message = "Job is trying to leave terminal state " + current; - LOGGER.severe(message); - throw new IllegalStateException(message); + throw new SeaTunnelEngineException(message); } // now do the actual state transition - if (current.equals(runningJobStateIMap.get(jobId))) { - LOGGER.info( - String.format( - "Job %s (%s) turn from state %s to %s.", - jobImmutableInformation.getJobConfig().getName(), - jobId, - current, - targetState)); - - // we must update runningJobStateTimestampsIMap first and then can update - // runningJobStateIMap - updateStateTimestamps(targetState); - - runningJobStateIMap.set(jobId, targetState); - return true; - } else { - return false; + // we must update runningJobStateTimestampsIMap first and then can update + // runningJobStateIMap + // we must update runningJobStateTimestampsIMap first and then can update + // runningJobStateIMap + RetryUtils.retryWithException( + () -> { + updateStateTimestamps(targetState); + runningJobStateIMap.set(jobId, targetState); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); + log.info( + String.format( + "%s turned from state %s to %s.", jobFullName, current, targetState)); + stateProcess(); + } catch (Exception e) { + log.error(ExceptionUtils.getMessage(e)); + if (!targetState.equals(JobStatus.FAILING)) { + makeJobFailing(e); } } } @@ -315,4 +262,58 @@ public JobStatus getJobStatus() { public String getJobFullName() { return jobFullName; } + + public void makeJobFailing(Throwable e) { + errorBySubPlan.compareAndSet(null, ExceptionUtils.getMessage(e)); + updateJobState(JobStatus.FAILING); + } + + public void startJob() { + isRunning = true; + log.info("{} state process is start", getJobFullName()); + stateProcess(); + } + + public void stopJobStateProcess() { + isRunning = false; + log.info("{} state process is stop", getJobFullName()); + } + + private synchronized void stateProcess() { + if (!isRunning) { + log.warn(String.format("%s state process is stopped", jobFullName)); + return; + } + switch (getJobStatus()) { + case CREATED: + updateJobState(JobStatus.SCHEDULED); + break; + case SCHEDULED: + getPipelineList() + .forEach( + subPlan -> { + if (PipelineStatus.CREATED.equals( + subPlan.getCurrPipelineStatus())) { + subPlan.startSubPlanStateProcess(); + } + }); + updateJobState(JobStatus.RUNNING); + break; + case RUNNING: + break; + case FAILING: + case CANCELING: + jobMaster.neverNeedRestore(); + getPipelineList().forEach(SubPlan::cancelPipeline); + break; + case FAILED: + case CANCELED: + case FINISHED: + stopJobStateProcess(); + jobEndFuture.complete(new JobResult(getJobStatus(), errorBySubPlan.get())); + return; + default: + throw new IllegalArgumentException("Unknown Job State: " + getJobStatus()); + } + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java index 85cc31850bf..e690500785b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.common.exception.TaskGroupDeployException; import org.apache.seatunnel.engine.common.utils.ExceptionUtil; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; @@ -38,17 +40,14 @@ import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; -import org.apache.commons.lang3.StringUtils; - import com.hazelcast.cluster.Address; import com.hazelcast.cluster.Member; import com.hazelcast.flakeidgen.FlakeIdGenerator; -import com.hazelcast.logging.ILogger; -import com.hazelcast.logging.Logger; import com.hazelcast.map.IMap; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import java.net.URL; import java.util.List; @@ -57,6 +56,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -66,10 +66,9 @@ * PhysicalVertex. And the number of PhysicalVertex equals the {@link * ExecutionVertex#getParallelism()}. */ +@Slf4j public class PhysicalVertex { - private static final ILogger LOGGER = Logger.getLogger(PhysicalVertex.class); - private final TaskGroupLocation taskGroupLocation; private final String taskFullName; @@ -104,6 +103,11 @@ public class PhysicalVertex { private volatile ExecutionState currExecutionState = ExecutionState.CREATED; + public volatile boolean isRunning = false; + + /** The error throw by physicalVertex, should be set when physicalVertex throw error. */ + private AtomicReference errorByPhysicalVertex = new AtomicReference<>(); + public PhysicalVertex( int subTaskGroupIndex, @NonNull ExecutorService executorService, @@ -142,7 +146,7 @@ public PhysicalVertex( this.currExecutionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation); this.nodeEngine = nodeEngine; - if (LOGGER.isFineEnabled() || LOGGER.isFinestEnabled()) { + if (log.isDebugEnabled() || log.isTraceEnabled()) { this.taskFullName = String.format( "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)], taskGroupLocation: [%s]", @@ -177,7 +181,7 @@ public PassiveCompletableFuture initStateFuture() { this.taskFuture = new CompletableFuture<>(); this.currExecutionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation); if (currExecutionState != null) { - LOGGER.info( + log.info( String.format( "The task %s is in state %s when init state future", taskFullName, currExecutionState)); @@ -188,20 +192,21 @@ public PassiveCompletableFuture initStateFuture() { // exists. if (ExecutionState.RUNNING.equals(currExecutionState)) { if (!checkTaskGroupIsExecuting(taskGroupLocation)) { - updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED); - this.taskFuture.complete( - new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED)); + updateTaskState(ExecutionState.FAILING); + } + } else if (ExecutionState.DEPLOYING.equals(currExecutionState)) { + if (!checkTaskGroupIsExecuting(taskGroupLocation)) { + updateTaskState(ExecutionState.RUNNING); } - } - // If the task state is CANCELING we need call noticeTaskExecutionServiceCancel(). - else if (ExecutionState.CANCELING.equals(currExecutionState)) { - noticeTaskExecutionServiceCancel(); - } else if (currExecutionState.isEndState()) { - this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, currExecutionState)); } return new PassiveCompletableFuture<>(this.taskFuture); } + public void restoreExecutionState() { + startPhysicalVertex(); + stateProcess(); + } + private boolean checkTaskGroupIsExecuting(TaskGroupLocation taskGroupLocation) { IMap> ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES); @@ -214,7 +219,7 @@ private boolean checkTaskGroupIsExecuting(TaskGroupLocation taskGroupLocation) { .map(Member::getAddress) .collect(Collectors.toList()); if (!members.contains(worker)) { - LOGGER.warning( + log.warn( "The node:" + worker.toString() + " running the taskGroup " @@ -233,7 +238,7 @@ private boolean checkTaskGroupIsExecuting(TaskGroupLocation taskGroupLocation) { try { return (Boolean) invoke.get(); } catch (InterruptedException | ExecutionException e) { - LOGGER.warning( + log.warn( "Execution of CheckTaskGroupIsExecutingOperation " + taskGroupLocation + " failed, checkTaskGroupIsExecuting return false. ", @@ -282,8 +287,8 @@ private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) { .get(); } catch (Exception e) { if (getExecutionState().isEndState()) { - LOGGER.warning(ExceptionUtils.getMessage(e)); - LOGGER.warning( + log.warn(ExceptionUtils.getMessage(e)); + log.warn( String.format( "%s deploy error, but the state is already in end state %s, skip this error", getTaskFullName(), currExecutionState)); @@ -295,6 +300,10 @@ private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) { }); } + public void makeTaskGroupDeploy() { + updateTaskState(ExecutionState.DEPLOYING); + } + // This method must not throw an exception public TaskDeployState deploy(@NonNull SlotProfile slotProfile) { try { @@ -304,7 +313,6 @@ public TaskDeployState deploy(@NonNull SlotProfile slotProfile) { return deployOnRemote(slotProfile); } } catch (Throwable th) { - failedByException(th); return TaskDeployState.failed(th); } } @@ -313,24 +321,9 @@ private TaskDeployState deployInternal( Function taskGroupConsumer) { TaskGroupImmutableInformation taskGroupImmutableInformation = getTaskGroupImmutableInformation(); - synchronized (this) { - if (ExecutionState.DEPLOYING.equals(currExecutionState)) { - TaskDeployState state = taskGroupConsumer.apply(taskGroupImmutableInformation); - updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING); - return state; - } - return TaskDeployState.success(); - } - } - - private void failedByException(Throwable th) { - LOGGER.severe( - String.format( - "%s deploy error with Exception: %s", - this.taskFullName, ExceptionUtils.getMessage(th))); - turnToEndState(ExecutionState.FAILED); - taskFuture.complete( - new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED, th)); + TaskDeployState state = taskGroupConsumer.apply(taskGroupImmutableInformation); + updateTaskState(ExecutionState.RUNNING); + return state; } private TaskGroupImmutableInformation getTaskGroupImmutableInformation() { @@ -340,141 +333,57 @@ private TaskGroupImmutableInformation getTaskGroupImmutableInformation() { this.pluginJarsUrls); } - private boolean turnToEndState(@NonNull ExecutionState endState) { - synchronized (this) { - if (!endState.isEndState()) { - String message = - String.format( - "Turn task %s state to end state need gave a end state, not %s", - taskFullName, endState); - LOGGER.warning(message); - return false; - } - // consistency check - if (currExecutionState.equals(endState)) { - return true; - } - if (currExecutionState.isEndState()) { - String message = - String.format( - "Task %s is already in terminal state %s", - taskFullName, currExecutionState); - LOGGER.warning(message); - return false; - } - - try { - RetryUtils.retryWithException( - () -> { - updateStateTimestamps(endState); - runningJobStateIMap.set(taskGroupLocation, endState); - return null; - }, - new RetryUtils.RetryMaterial( - Constant.OPERATION_RETRY_TIME, - true, - exception -> ExceptionUtil.isOperationNeedRetryException(exception), - Constant.OPERATION_RETRY_SLEEP)); - } catch (Exception e) { - LOGGER.warning(ExceptionUtils.getMessage(e)); - // If master/worker node done, The job will restore and fix the state from - // TaskExecutionService - LOGGER.warning( - String.format( - "Set %s state %s to Imap failed, skip.", - getTaskFullName(), endState)); - } - this.currExecutionState = endState; - LOGGER.info(String.format("%s turn to end state %s.", taskFullName, endState)); - return true; - } - } - - public boolean updateTaskState( - @NonNull ExecutionState current, @NonNull ExecutionState targetState) { - synchronized (this) { - LOGGER.info( + public synchronized void updateTaskState(@NonNull ExecutionState targetState) { + try { + ExecutionState current = (ExecutionState) runningJobStateIMap.get(taskGroupLocation); + log.debug( String.format( "Try to update the task %s state from %s to %s", taskFullName, current, targetState)); - // consistency check - if (current.isEndState()) { - String message = "Task is trying to leave terminal state " + current; - LOGGER.severe(message); - throw new IllegalStateException(message); - } - - if (ExecutionState.SCHEDULED.equals(targetState) - && !ExecutionState.CREATED.equals(current)) { - String message = "Only [CREATED] task can turn to [SCHEDULED]" + current; - LOGGER.severe(message); - throw new IllegalStateException(message); - } - if (ExecutionState.DEPLOYING.equals(targetState) - && !ExecutionState.SCHEDULED.equals(current)) { - String message = "Only [SCHEDULED] task can turn to [DEPLOYING]" + current; - LOGGER.severe(message); - throw new IllegalStateException(message); + if (current.equals(targetState)) { + log.info( + "{} current state equals target state: {}, skip", + taskFullName, + targetState); + return; } - if (ExecutionState.RUNNING.equals(targetState) - && !ExecutionState.DEPLOYING.equals(current)) { - String message = "Only [DEPLOYING] task can turn to [RUNNING]" + current; - LOGGER.severe(message); - throw new IllegalStateException(message); + // consistency check + if (current.isEndState()) { + String message = "Task is trying to leave terminal state " + current; + log.error(message); + return; } // now do the actual state transition - if (current.equals(currExecutionState)) { - try { - RetryUtils.retryWithException( - () -> { - updateStateTimestamps(targetState); - runningJobStateIMap.set(taskGroupLocation, targetState); - return null; - }, - new RetryUtils.RetryMaterial( - Constant.OPERATION_RETRY_TIME, - true, - exception -> - ExceptionUtil.isOperationNeedRetryException(exception), - Constant.OPERATION_RETRY_SLEEP)); - } catch (Exception e) { - LOGGER.warning(ExceptionUtils.getMessage(e)); - // If master/worker node done, The job will restore and fix the state from - // TaskExecutionService - LOGGER.warning( - String.format( - "Set %s state %s to Imap failed, skip.", - getTaskFullName(), targetState)); - } - this.currExecutionState = targetState; - LOGGER.info( - String.format( - "%s turn from state %s to %s.", - taskFullName, current, targetState)); - return true; - } else { - LOGGER.warning( - String.format( - "The task %s state in Imap is %s, not equals expected state %s", - taskFullName, currExecutionState, current)); - return false; + RetryUtils.retryWithException( + () -> { + updateStateTimestamps(targetState); + runningJobStateIMap.set(taskGroupLocation, targetState); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); + this.currExecutionState = targetState; + log.info( + String.format( + "%s turned from state %s to %s.", taskFullName, current, targetState)); + stateProcess(); + } catch (Exception e) { + log.error(ExceptionUtils.getMessage(e)); + if (!targetState.equals(ExecutionState.FAILING)) { + makeTaskGroupFailing(e); } } } - public void cancel() { - if (updateTaskState(ExecutionState.CREATED, ExecutionState.CANCELED) - || updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED) - || updateTaskState(ExecutionState.DEPLOYING, ExecutionState.CANCELED)) { - taskFuture.complete( - new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED)); - } else if (updateTaskState(ExecutionState.RUNNING, ExecutionState.CANCELING)) { - noticeTaskExecutionServiceCancel(); - } else if (ExecutionState.CANCELING.equals(runningJobStateIMap.get(taskGroupLocation))) { - noticeTaskExecutionServiceCancel(); + public synchronized void cancel() { + if (!getExecutionState().isEndState()) { + updateTaskState(ExecutionState.CANCELING); } } @@ -482,9 +391,7 @@ private void noticeTaskExecutionServiceCancel() { // Check whether the node exists, and whether the Task on the node exists. If there is no // direct update state if (!checkTaskGroupIsExecuting(taskGroupLocation)) { - updateTaskState(ExecutionState.CANCELING, ExecutionState.CANCELED); - taskFuture.complete( - new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED)); + updateTaskState(ExecutionState.CANCELED); return; } int i = 0; @@ -495,11 +402,10 @@ private void noticeTaskExecutionServiceCancel() { && nodeEngine .getClusterService() .getMember(executionAddress = getCurrentExecutionAddress()) - != null - && i < Constant.OPERATION_RETRY_TIME) { + != null) { try { i++; - LOGGER.info( + log.info( String.format( "Send cancel %s operator to member %s", taskFullName, executionAddress)); @@ -513,7 +419,7 @@ private void noticeTaskExecutionServiceCancel() { .get(); return; } catch (Exception e) { - LOGGER.warning( + log.warn( String.format( "%s cancel failed with Exception: %s, retry %s", this.getTaskFullName(), ExceptionUtils.getMessage(e), i)); @@ -546,7 +452,7 @@ private void resetExecutionState() { String.format( "%s reset state failed, only end state can be reset, current is %s", getTaskFullName(), executionState); - LOGGER.severe(message); + log.error(message); throw new IllegalStateException(message); } try { @@ -562,17 +468,16 @@ private void resetExecutionState() { exception -> ExceptionUtil.isOperationNeedRetryException(exception), Constant.OPERATION_RETRY_SLEEP)); } catch (Exception e) { - LOGGER.warning(ExceptionUtils.getMessage(e)); + log.warn(ExceptionUtils.getMessage(e)); // If master/worker node done, The job will restore and fix the state from // TaskExecutionService - LOGGER.warning( + log.warn( String.format( "Set %s state %s to Imap failed, skip.", getTaskFullName(), ExecutionState.CREATED)); } this.currExecutionState = ExecutionState.CREATED; - LOGGER.info( - String.format("%s turn to state %s.", taskFullName, ExecutionState.CREATED)); + log.info(String.format("%s turn to state %s.", taskFullName, ExecutionState.CREATED)); } } @@ -584,28 +489,23 @@ public String getTaskFullName() { return taskFullName; } - public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { - if (!turnToEndState(taskExecutionState.getExecutionState())) { - return; - } - if (StringUtils.isNotEmpty(taskExecutionState.getThrowableMsg())) { - LOGGER.severe( - String.format( - "%s end with state %s and Exception: %s", - this.taskFullName, - taskExecutionState.getExecutionState(), - taskExecutionState.getThrowableMsg())); - } else { - LOGGER.info( + public void updateStateByExecutionService(TaskExecutionState taskExecutionState) { + if (!taskExecutionState.getExecutionState().isEndState()) { + throw new SeaTunnelEngineException( String.format( - "%s end with state %s", - this.taskFullName, taskExecutionState.getExecutionState())); + "The state must be end state from ExecutionService, can not be %s", + taskExecutionState.getExecutionState())); } - taskFuture.complete(taskExecutionState); + errorByPhysicalVertex.compareAndSet(null, taskExecutionState.getThrowableMsg()); + updateTaskState(taskExecutionState.getExecutionState()); } public Address getCurrentExecutionAddress() { - return jobMaster.getOwnedSlotProfiles(taskGroupLocation).getWorker(); + SlotProfile ownedSlotProfiles = jobMaster.getOwnedSlotProfiles(taskGroupLocation); + if (ownedSlotProfiles == null) { + return null; + } + return ownedSlotProfiles.getWorker(); } public TaskGroupLocation getTaskGroupLocation() { @@ -615,4 +515,81 @@ public TaskGroupLocation getTaskGroupLocation() { public void setJobMaster(JobMaster jobMaster) { this.jobMaster = jobMaster; } + + public void startPhysicalVertex() { + isRunning = true; + log.info(String.format("%s state process is start", taskFullName)); + } + + public void stopPhysicalVertex() { + isRunning = false; + log.info(String.format("%s state process is stopped", taskFullName)); + } + + public synchronized void stateProcess() { + if (!isRunning) { + log.warn(String.format("%s state process is not start", taskFullName)); + return; + } + switch (getExecutionState()) { + case INITIALIZING: + case CREATED: + case RUNNING: + break; + case DEPLOYING: + TaskDeployState deployState = + deploy(jobMaster.getOwnedSlotProfiles(taskGroupLocation)); + if (!deployState.isSuccess()) { + makeTaskGroupFailing( + new TaskGroupDeployException(deployState.getThrowableMsg())); + } else { + updateTaskState(ExecutionState.RUNNING); + } + break; + case FAILING: + updateTaskState(ExecutionState.FAILED); + break; + case CANCELING: + noticeTaskExecutionServiceCancel(); + break; + case CANCELED: + stopPhysicalVertex(); + taskFuture.complete( + new TaskExecutionState( + taskGroupLocation, + ExecutionState.CANCELED, + errorByPhysicalVertex.get())); + return; + case FAILED: + stopPhysicalVertex(); + log.error( + String.format( + "%s end with state %s and Exception: %s", + this.taskFullName, + ExecutionState.FAILED, + errorByPhysicalVertex.get())); + taskFuture.complete( + new TaskExecutionState( + taskGroupLocation, + ExecutionState.FAILED, + errorByPhysicalVertex.get())); + return; + case FINISHED: + stopPhysicalVertex(); + taskFuture.complete( + new TaskExecutionState( + taskGroupLocation, + ExecutionState.FINISHED, + errorByPhysicalVertex.get())); + return; + default: + throw new IllegalArgumentException( + "Unknown TaskGroup State: " + getExecutionState()); + } + } + + public void makeTaskGroupFailing(Throwable err) { + errorByPhysicalVertex.compareAndSet(null, ExceptionUtils.getMessage(err)); + updateTaskState(ExecutionState.FAILING); + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java new file mode 100644 index 00000000000..1bacda03267 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.dag.physical; + +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager; +import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; + +import lombok.NonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class ResourceUtils { + + public static Map applyResourceForPipeline( + @NonNull ResourceManager resourceManager, @NonNull SubPlan subPlan) { + Map> futures = new HashMap<>(); + Map slotProfiles = new HashMap<>(); + // TODO If there is no enough resources for tasks, we need add some wait profile + subPlan.getCoordinatorVertexList() + .forEach( + coordinator -> + futures.put( + coordinator.getTaskGroupLocation(), + applyResourceForTask(resourceManager, coordinator))); + + subPlan.getPhysicalVertexList() + .forEach( + task -> + futures.put( + task.getTaskGroupLocation(), + applyResourceForTask(resourceManager, task))); + + for (Map.Entry> future : + futures.entrySet()) { + slotProfiles.put( + future.getKey(), future.getValue() == null ? null : future.getValue().join()); + } + return slotProfiles; + } + + public static CompletableFuture applyResourceForTask( + ResourceManager resourceManager, PhysicalVertex task) { + // TODO custom resource size + return resourceManager.applyResource( + task.getTaskGroupLocation().getJobId(), new ResourceProfile()); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java index 83dd4e9d0f2..0f9141ed00b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java @@ -29,28 +29,26 @@ import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinatorStatus; import org.apache.seatunnel.engine.server.execution.ExecutionState; import org.apache.seatunnel.engine.server.execution.TaskExecutionState; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; import org.apache.seatunnel.engine.server.master.JobMaster; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; -import com.hazelcast.core.HazelcastInstanceNotActiveException; -import com.hazelcast.core.OperationTimeoutException; -import com.hazelcast.logging.ILogger; -import com.hazelcast.logging.Logger; import com.hazelcast.map.IMap; import lombok.Data; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import java.util.List; -import java.util.Objects; -import java.util.Optional; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; @Data +@Slf4j public class SubPlan { - private static final ILogger LOGGER = Logger.getLogger(SubPlan.class); /** The max num pipeline can restore. */ public static final int PIPELINE_MAX_RESTORE_NUM = 3; // TODO should set by config @@ -102,6 +100,10 @@ public class SubPlan { private volatile PipelineStatus currPipelineStatus = PipelineStatus.INITIALIZING; + public volatile boolean isRunning = false; + + private Map slotProfiles; + public SubPlan( int pipelineId, int totalPipelineNum, @@ -154,28 +156,33 @@ public synchronized PassiveCompletableFuture initStateFu errorByPhysicalVertex = new AtomicReference<>(); physicalVertexList.forEach( physicalVertex -> { - addPhysicalVertexCallBack(physicalVertex.initStateFuture()); + addPhysicalVertexCallBack(physicalVertex.initStateFuture(), physicalVertex); }); coordinatorVertexList.forEach( coordinator -> { - addPhysicalVertexCallBack(coordinator.initStateFuture()); + addPhysicalVertexCallBack(coordinator.initStateFuture(), coordinator); }); this.pipelineFuture = new CompletableFuture<>(); return new PassiveCompletableFuture<>(pipelineFuture); } - private void addPhysicalVertexCallBack(PassiveCompletableFuture future) { + private void addPhysicalVertexCallBack( + PassiveCompletableFuture future, PhysicalVertex task) { future.thenAcceptAsync( executionState -> { try { + log.info( + "{} future complete with state {}", + task.getTaskFullName(), + executionState.getExecutionState()); // We need not handle t, Because we will not return t from PhysicalVertex if (ExecutionState.CANCELED.equals(executionState.getExecutionState())) { canceledTaskNum.incrementAndGet(); } else if (ExecutionState.FAILED.equals( executionState.getExecutionState())) { - LOGGER.severe( + log.error( String.format( "Task %s Failed in %s, Begin to cancel other tasks in this pipeline.", executionState.getTaskGroupLocation(), @@ -183,40 +190,20 @@ private void addPhysicalVertexCallBack(PassiveCompletableFuture { - jobMaster.savePipelineMetricsToHistory(getPipelineLocation()); - jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus); - jobMaster.releasePipelineResource(this); - notifyCheckpointManagerPipelineEnd(pipelineStatus); - return null; - }, - new RetryUtils.RetryMaterial( - Constant.OPERATION_RETRY_TIME, - true, - exception -> ExceptionUtil.isOperationNeedRetryException(exception), - Constant.OPERATION_RETRY_SLEEP)); + private void subPlanDone(PipelineStatus pipelineStatus) { + try { + RetryUtils.retryWithException( + () -> { + jobMaster.savePipelineMetricsToHistory(getPipelineLocation()); + jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus); + notifyCheckpointManagerPipelineEnd(pipelineStatus); + jobMaster.releasePipelineResource(this); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); + } catch (Exception e) { + log.warn( + "The cleaning operation before pipeline {} completion is not completed, with exception: {} ", + pipelineFullName, + ExceptionUtils.getMessage(e)); + } } public boolean canRestorePipeline() { return jobMaster.isNeedRestore() && getPipelineRestoreNum() < PIPELINE_MAX_RESTORE_NUM; } - private void turnToEndState(@NonNull PipelineStatus endState) throws Exception { - synchronized (this) { - // consistency check - if (this.currPipelineStatus.isEndState() && !endState.isEndState()) { - String message = - "Pipeline is trying to leave terminal state " + this.currPipelineStatus; - LOGGER.severe(message); - throw new IllegalStateException(message); + public synchronized void updatePipelineState(@NonNull PipelineStatus targetState) { + try { + PipelineStatus current = (PipelineStatus) runningJobStateIMap.get(pipelineLocation); + log.debug( + String.format( + "Try to update the %s state from %s to %s", + pipelineFullName, current, targetState)); + + if (current.equals(targetState)) { + log.info( + "{} current state equals target state: {}, skip", + pipelineFullName, + targetState); + return; } - if (!endState.isEndState()) { - String message = "Need a end state, not " + endState; - LOGGER.severe(message); - throw new IllegalStateException(message); + // consistency check + if (current.isEndState()) { + String message = "Pipeline is trying to leave terminal state " + current; + log.info(message); + return; } + // now do the actual state transition // we must update runningJobStateTimestampsIMap first and then can update // runningJobStateIMap RetryUtils.retryWithException( () -> { - updateStateTimestamps(endState); - runningJobStateIMap.set(pipelineLocation, endState); + updateStateTimestamps(targetState); + runningJobStateIMap.set(pipelineLocation, targetState); return null; }, new RetryUtils.RetryMaterial( @@ -331,106 +333,24 @@ private void turnToEndState(@NonNull PipelineStatus endState) throws Exception { true, exception -> ExceptionUtil.isOperationNeedRetryException(exception), Constant.OPERATION_RETRY_SLEEP)); - this.currPipelineStatus = endState; - LOGGER.info( + this.currPipelineStatus = targetState; + log.info( String.format( - "%s turn to end state %s.", pipelineFullName, currPipelineStatus)); - } - } - - public boolean updatePipelineState( - @NonNull PipelineStatus current, @NonNull PipelineStatus targetState) throws Exception { - synchronized (this) { - // consistency check - if (current.isEndState()) { - String message = "Pipeline is trying to leave terminal state " + current; - LOGGER.severe(message); - throw new IllegalStateException(message); - } - - if (PipelineStatus.SCHEDULED.equals(targetState) - && !PipelineStatus.CREATED.equals(current)) { - String message = "Only [CREATED] pipeline can turn to [SCHEDULED]" + current; - LOGGER.severe(message); - throw new IllegalStateException(message); - } - - if (PipelineStatus.DEPLOYING.equals(targetState) - && !PipelineStatus.SCHEDULED.equals(current)) { - String message = "Only [SCHEDULED] pipeline can turn to [DEPLOYING]" + current; - LOGGER.severe(message); - throw new IllegalStateException(message); - } - - if (PipelineStatus.RUNNING.equals(targetState) - && !PipelineStatus.DEPLOYING.equals(current)) { - String message = "Only [DEPLOYING] pipeline can turn to [RUNNING]" + current; - LOGGER.severe(message); - throw new IllegalStateException(message); - } - - // now do the actual state transition - if (current.equals(runningJobStateIMap.get(pipelineLocation))) { - LOGGER.info( - String.format( - "%s turn from state %s to %s.", - pipelineFullName, current, targetState)); - - // we must update runningJobStateTimestampsIMap first and then can update - // runningJobStateIMap - RetryUtils.retryWithException( - () -> { - updateStateTimestamps(targetState); - runningJobStateIMap.set(pipelineLocation, targetState); - return null; - }, - new RetryUtils.RetryMaterial( - Constant.OPERATION_RETRY_TIME, - true, - exception -> ExceptionUtil.isOperationNeedRetryException(exception), - Constant.OPERATION_RETRY_SLEEP)); - this.currPipelineStatus = targetState; - return true; - } else { - return false; + "%s turned from state %s to %s.", + pipelineFullName, current, targetState)); + stateProcess(); + } catch (Exception e) { + log.error(ExceptionUtils.getMessage(e)); + if (!targetState.equals(PipelineStatus.FAILING)) { + makePipelineFailing(e); } } } public synchronized void cancelPipeline() { - for (int i = 0; i < 10; i++) { - try { - LOGGER.warning("start cancel job " + pipelineFullName + " count = " + i); - if (getPipelineState().isEndState()) { - LOGGER.warning( - String.format( - "%s is in end state %s, can not be cancel", - pipelineFullName, getPipelineState())); - } - // If an active Master Node done and another Master Node active, we can - // not know whether canceled pipeline complete. - // So we need cancel running pipeline again. - if (!PipelineStatus.CANCELING.equals(runningJobStateIMap.get(pipelineLocation))) { - updatePipelineState(getPipelineState(), PipelineStatus.CANCELING); - } - cancelCheckpointCoordinator(); - Optional optionalException = cancelPipelineTasks(); - if (optionalException.isPresent()) { - throw optionalException.get(); - } - break; - } catch (OperationTimeoutException - | HazelcastInstanceNotActiveException - | InterruptedException e) { - try { - Thread.sleep(2000); - } catch (Exception ignored) { - } - LOGGER.warning(String.format("%s cancel error will retry", pipelineFullName), e); - } catch (Throwable e) { - LOGGER.warning(String.format("%s cancel error", pipelineFullName), e); - break; - } + cancelCheckpointCoordinator(); + if (!getPipelineState().isEndState()) { + updatePipelineState(PipelineStatus.CANCELING); } } @@ -440,48 +360,6 @@ private void cancelCheckpointCoordinator() { } } - private Optional cancelPipelineTasks() { - List> coordinatorCancelList = - coordinatorVertexList.stream() - .map(this::cancelTask) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - List> taskCancelList = - physicalVertexList.stream() - .map(this::cancelTask) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - try { - coordinatorCancelList.addAll(taskCancelList); - CompletableFuture voidCompletableFuture = - CompletableFuture.allOf( - coordinatorCancelList.toArray(new CompletableFuture[0])); - voidCompletableFuture.get(); - return Optional.empty(); - } catch (Exception e) { - LOGGER.severe( - String.format( - "%s cancel error with exception: %s", - pipelineFullName, ExceptionUtils.getMessage(e))); - return Optional.of(e); - } - } - - private CompletableFuture cancelTask(@NonNull PhysicalVertex task) { - if (!task.getExecutionState().isEndState() - && !ExecutionState.CANCELING.equals(task.getExecutionState())) { - return CompletableFuture.supplyAsync( - () -> { - task.cancel(); - return null; - }, - executorService); - } - return null; - } - /** Before restore a pipeline, the pipeline must do reset */ private synchronized void reset() throws Exception { resetPipelineState(); @@ -511,17 +389,17 @@ private void resetPipelineState() throws Exception { String.format( "%s reset state failed, only end state can be reset, current is %s", getPipelineFullName(), pipelineState); - LOGGER.severe(message); + log.error(message); throw new IllegalStateException(message); } - LOGGER.info( + log.info( String.format( "Reset pipeline %s state to %s", getPipelineFullName(), PipelineStatus.CREATED)); updateStateTimestamps(PipelineStatus.CREATED); runningJobStateIMap.set(pipelineLocation, PipelineStatus.CREATED); this.currPipelineStatus = PipelineStatus.CREATED; - LOGGER.info( + log.info( String.format( "Reset pipeline %s state to %s complete", getPipelineFullName(), PipelineStatus.CREATED)); @@ -543,19 +421,12 @@ private boolean prepareRestorePipeline() { synchronized (restoreLock) { try { pipelineRestoreNum++; - LOGGER.info( + log.info( String.format( "Restore time %s, pipeline %s", pipelineRestoreNum + "", pipelineFullName)); - // We must ensure the scheduler complete and then can handle pipeline state change. - if (jobMaster.getScheduleFuture() != null) { - jobMaster.getScheduleFuture().join(); - } - - if (reSchedulerPipelineFuture != null) { - reSchedulerPipelineFuture.join(); - } reset(); + jobMaster.getCheckpointManager().reportedPipelineRunning(pipelineId, false); jobMaster.getPhysicalPlan().addPipelineEndCallback(this); return true; } catch (Throwable e) { @@ -571,23 +442,17 @@ private boolean prepareRestorePipeline() { /** restore the pipeline when pipeline failed or canceled by error. */ public void restorePipeline() { - synchronized (restoreLock) { - try { - if (jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)) { - forcePipelineFinish(); - } - jobMaster.getCheckpointManager().reportedPipelineRunning(pipelineId, false); - reSchedulerPipelineFuture = jobMaster.reSchedulerPipeline(this); - if (reSchedulerPipelineFuture != null) { - reSchedulerPipelineFuture.join(); - } - } catch (Throwable e) { - LOGGER.severe( - String.format( - "Restore pipeline %s error with exception: ", pipelineFullName), - e); - cancelPipeline(); + try { + if (jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)) { + forcePipelineFinish(); } + startSubPlanStateProcess(); + } catch (Throwable e) { + log.error( + String.format("Restore pipeline %s error with exception: ", pipelineFullName), + e); + makePipelineFailing(e); + startSubPlanStateProcess(); } } @@ -595,31 +460,60 @@ public void restorePipeline() { private void forcePipelineFinish() { coordinatorVertexList.forEach( coordinator -> - coordinator.updateTaskExecutionState( + coordinator.updateStateByExecutionService( new TaskExecutionState( coordinator.getTaskGroupLocation(), ExecutionState.FINISHED))); physicalVertexList.forEach( task -> - task.updateTaskExecutionState( + task.updateStateByExecutionService( new TaskExecutionState( task.getTaskGroupLocation(), ExecutionState.FINISHED))); } /** restore the pipeline state after new Master Node active */ public synchronized void restorePipelineState() { - // if PipelineStatus is less than RUNNING or equals CANCELING, may some task is in state - // CREATED, we can not schedule this tasks because have no PipelineBaseScheduler instance. - // So, we need cancel the pipeline and restore it. + // if PipelineStatus is less than RUNNING, we need cancel it and reschedule. + getPhysicalVertexList() + .forEach( + task -> { + task.restoreExecutionState(); + }); + + getCoordinatorVertexList() + .forEach( + task -> { + task.restoreExecutionState(); + }); + if (getPipelineState().ordinal() < PipelineStatus.RUNNING.ordinal()) { - cancelPipelineTasks(); - } else if (PipelineStatus.CANCELING.equals(getPipelineState())) { - cancelPipelineTasks(); + updatePipelineState(PipelineStatus.CANCELING); } else if (PipelineStatus.RUNNING.equals(getPipelineState())) { + AtomicBoolean allTaskRunning = new AtomicBoolean(true); + getCoordinatorVertexList() + .forEach( + task -> { + if (!task.getExecutionState().equals(ExecutionState.RUNNING)) { + allTaskRunning.set(false); + return; + } + }); + + getPhysicalVertexList() + .forEach( + task -> { + if (!task.getExecutionState().equals(ExecutionState.RUNNING)) { + allTaskRunning.set(false); + return; + } + }); + jobMaster .getCheckpointManager() - .reportedPipelineRunning(this.getPipelineLocation().getPipelineId(), true); + .reportedPipelineRunning( + this.getPipelineLocation().getPipelineId(), allTaskRunning.get()); } + startSubPlanStateProcess(); } public List getPhysicalVertexList() { @@ -653,9 +547,107 @@ public int getPipelineRestoreNum() { } public void handleCheckpointError() { - LOGGER.warning( + log.warn( String.format( "%s checkpoint have error, cancel the pipeline", getPipelineFullName())); this.cancelPipeline(); } + + public void startSubPlanStateProcess() { + isRunning = true; + log.info("{} state process is start", getPipelineFullName()); + stateProcess(); + } + + public void stopSubPlanStateProcess() { + isRunning = false; + log.info("{} state process is stop", getPipelineFullName()); + } + + private synchronized void stateProcess() { + if (!isRunning) { + log.warn(String.format("%s state process not start", pipelineFullName)); + return; + } + PipelineStatus state = getCurrPipelineStatus(); + switch (state) { + case CREATED: + updatePipelineState(PipelineStatus.SCHEDULED); + break; + case SCHEDULED: + try { + slotProfiles = + ResourceUtils.applyResourceForPipeline( + jobMaster.getResourceManager(), this); + log.debug( + "slotProfiles: {}, PipelineLocation: {}", + slotProfiles, + this.getPipelineLocation()); + // sead slot information to JobMaster + jobMaster.setOwnedSlotProfiles(pipelineLocation, slotProfiles); + updatePipelineState(PipelineStatus.DEPLOYING); + } catch (Exception e) { + makePipelineFailing(e); + } + break; + case DEPLOYING: + coordinatorVertexList.forEach( + task -> { + if (task.getExecutionState().equals(ExecutionState.CREATED)) { + task.startPhysicalVertex(); + task.makeTaskGroupDeploy(); + } + }); + + physicalVertexList.forEach( + task -> { + if (task.getExecutionState().equals(ExecutionState.CREATED)) { + task.startPhysicalVertex(); + task.makeTaskGroupDeploy(); + } + }); + updatePipelineState(PipelineStatus.RUNNING); + break; + case RUNNING: + break; + case FAILING: + case CANCELING: + coordinatorVertexList.forEach( + task -> { + task.cancel(); + }); + + physicalVertexList.forEach( + task -> { + task.cancel(); + }); + break; + case FAILED: + case CANCELED: + if (checkNeedRestore(state) && prepareRestorePipeline()) { + jobMaster.releasePipelineResource(this); + restorePipeline(); + return; + } + subPlanDone(state); + stopSubPlanStateProcess(); + pipelineFuture.complete( + new PipelineExecutionState(pipelineId, state, errorByPhysicalVertex.get())); + return; + case FINISHED: + subPlanDone(state); + stopSubPlanStateProcess(); + pipelineFuture.complete( + new PipelineExecutionState( + pipelineId, getPipelineState(), errorByPhysicalVertex.get())); + return; + default: + throw new IllegalArgumentException("Unknown Pipeline State: " + getPipelineState()); + } + } + + public void makePipelineFailing(Throwable e) { + errorByPhysicalVertex.compareAndSet(null, ExceptionUtils.getMessage(e)); + updatePipelineState(PipelineStatus.FAILING); + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java index 46dc721362e..e1268a54646 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java @@ -48,8 +48,6 @@ public enum ExecutionState implements Serializable { CREATED, - SCHEDULED, - DEPLOYING, RUNNING, @@ -66,9 +64,9 @@ public enum ExecutionState implements Serializable { CANCELED, - FAILED, + FAILING, - RECONCILING, + FAILED, /** Restoring last possible valid state of the task if it has it. */ INITIALIZING; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java index 73474ad7761..83b5ab29e77 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java @@ -36,6 +36,7 @@ import com.hazelcast.map.IMap; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.Getter; import java.io.Serializable; import java.util.ArrayList; @@ -76,7 +77,7 @@ public class JobHistoryService { * finishedJobStateImap key is jobId and value is jobState(json) JobStateData Indicates the * status of the job, pipeline, and task */ - private final IMap finishedJobStateImap; + @Getter private final IMap finishedJobStateImap; private final IMap finishedJobMetricsImap; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index d8c5313c310..1aa406ffa4d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -30,6 +30,7 @@ import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader; +import org.apache.seatunnel.engine.common.utils.ExceptionUtil; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; import org.apache.seatunnel.engine.core.job.JobDAGInfo; @@ -53,8 +54,6 @@ import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext; import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager; import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; -import org.apache.seatunnel.engine.server.scheduler.JobScheduler; -import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler; import org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOperation; import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupMetricsOperation; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; @@ -109,8 +108,6 @@ public class JobMaster { private JobImmutableInformation jobImmutableInformation; - private JobScheduler jobScheduler; - private LogicalDag logicalDag; private JobDAGInfo jobDAGInfo; @@ -127,8 +124,6 @@ public class JobMaster { private CompletableFuture scheduleFuture; - private volatile boolean restore = false; - // TODO add config to change value private boolean isPhysicalDAGIInfo = true; @@ -182,8 +177,7 @@ public JobMaster( this.metricsImap = metricsImap; } - public void init(long initializationTimestamp, boolean restart, boolean canRestoreAgain) - throws Exception { + public void init(long initializationTimestamp, boolean restart) throws Exception { jobImmutableInformation = nodeEngine.getSerializationService().toObject(jobImmutableInformationData); jobCheckpointConfig = @@ -226,9 +220,6 @@ public void init(long initializationTimestamp, boolean restart, boolean canResto this.physicalPlan = planTuple.f0(); this.physicalPlan.setJobMaster(this); this.checkpointPlanMap = planTuple.f1(); - if (!canRestoreAgain) { - this.neverNeedRestore(); - } Exception initException = null; try { this.initCheckPointManager(); @@ -293,10 +284,6 @@ public void initStateFuture() { withTryCatch( LOGGER, (v, t) -> { - // We need not handle t, Because we will not return t from physicalPlan - if (JobStatus.FAILING.equals(v.getStatus())) { - physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED); - } JobMaster.this.errorMessage = v.getError(); JobResult jobResult = new JobResult(physicalPlan.getJobStatus(), v.getError()); @@ -307,18 +294,7 @@ public void initStateFuture() { public void run() { try { - if (!restore) { - jobScheduler = new PipelineBaseScheduler(physicalPlan, this); - scheduleFuture = - CompletableFuture.runAsync( - () -> jobScheduler.startScheduling(), executorService); - LOGGER.info( - String.format( - "Job %s waiting for scheduler finished", - physicalPlan.getJobFullName())); - scheduleFuture.join(); - LOGGER.info(String.format("%s scheduler finished", physicalPlan.getJobFullName())); - } + physicalPlan.startJob(); } catch (Throwable e) { LOGGER.severe( String.format( @@ -326,8 +302,6 @@ public void run() { physicalPlan.getJobImmutableInformation().getJobConfig().getName(), physicalPlan.getJobImmutableInformation().getJobId(), ExceptionUtils.getMessage(e))); - // try to cancel job - cancelJob(); } finally { jobMasterCompleteFuture.join(); } @@ -389,23 +363,40 @@ public JobDAGInfo getJobDAGInfo() { return jobDAGInfo; } - public PassiveCompletableFuture reSchedulerPipeline(SubPlan subPlan) { - if (jobScheduler == null) { - jobScheduler = new PipelineBaseScheduler(physicalPlan, this); - } - return new PassiveCompletableFuture<>(jobScheduler.reSchedulerPipeline(subPlan)); - } - public void releasePipelineResource(SubPlan subPlan) { - LOGGER.info( - String.format("release the pipeline %s resource", subPlan.getPipelineFullName())); - resourceManager - .releaseResources( - jobImmutableInformation.getJobId(), - Lists.newArrayList( - ownedSlotProfilesIMap.get(subPlan.getPipelineLocation()).values())) - .join(); - ownedSlotProfilesIMap.remove(subPlan.getPipelineLocation()); + try { + Map taskGroupLocationSlotProfileMap = + ownedSlotProfilesIMap.get(subPlan.getPipelineLocation()); + if (taskGroupLocationSlotProfileMap == null) { + return; + } + + RetryUtils.retryWithException( + () -> { + LOGGER.info( + String.format( + "release the pipeline %s resource", + subPlan.getPipelineFullName())); + resourceManager + .releaseResources( + jobImmutableInformation.getJobId(), + Lists.newArrayList( + taskGroupLocationSlotProfileMap.values())) + .join(); + ownedSlotProfilesIMap.remove(subPlan.getPipelineLocation()); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); + } catch (Exception e) { + LOGGER.warning( + String.format( + "release the pipeline %s resource failed, with exception: %s ", + subPlan.getPipelineFullName(), ExceptionUtils.getMessage(e))); + } } public void cleanJob() { @@ -438,7 +429,6 @@ public ClassLoader getClassLoader() { } public void cancelJob() { - neverNeedRestore(); physicalPlan.cancelJob(); } @@ -635,7 +625,8 @@ public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { return; } - task.updateTaskExecutionState(taskExecutionState); + task.updateStateByExecutionService( + taskExecutionState); }); pipeline.getPhysicalVertexList() @@ -648,7 +639,8 @@ public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { return; } - task.updateTaskExecutionState(taskExecutionState); + task.updateStateByExecutionService( + taskExecutionState); }); }); } @@ -665,11 +657,6 @@ public CompletableFuture savePoint() { return CompletableFuture.allOf(passiveCompletableFutures); } - public Map getOwnedSlotProfiles( - PipelineLocation pipelineLocation) { - return ownedSlotProfilesIMap.get(pipelineLocation); - } - public void setOwnedSlotProfiles( @NonNull PipelineLocation pipelineLocation, @NonNull Map pipelineOwnedSlotProfiles) { @@ -691,15 +678,15 @@ public void setOwnedSlotProfiles( } public SlotProfile getOwnedSlotProfiles(@NonNull TaskGroupLocation taskGroupLocation) { - return ownedSlotProfilesIMap - .get( + Map taskGroupLocationSlotProfileMap = + ownedSlotProfilesIMap.get( new PipelineLocation( - taskGroupLocation.getJobId(), taskGroupLocation.getPipelineId())) - .get(taskGroupLocation); - } + taskGroupLocation.getJobId(), taskGroupLocation.getPipelineId())); + if (taskGroupLocationSlotProfileMap == null) { + return null; + } - public CompletableFuture getScheduleFuture() { - return scheduleFuture; + return taskGroupLocationSlotProfileMap.get(taskGroupLocation); } public ExecutorService getExecutorService() { @@ -708,11 +695,7 @@ public ExecutorService getExecutorService() { public void interrupt() { isRunning = false; - jobMasterCompleteFuture.cancel(true); - } - - public void markRestore() { - restore = true; + jobMasterCompleteFuture.completeExceptionally(new InterruptedException()); } public void neverNeedRestore() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java deleted file mode 100644 index 177f17203ca..00000000000 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java +++ /dev/null @@ -1,401 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.engine.server.scheduler; - -import org.apache.seatunnel.common.utils.ExceptionUtils; -import org.apache.seatunnel.engine.common.exception.JobException; -import org.apache.seatunnel.engine.common.exception.SchedulerNotAllowException; -import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; -import org.apache.seatunnel.engine.core.job.JobStatus; -import org.apache.seatunnel.engine.core.job.PipelineStatus; -import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan; -import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex; -import org.apache.seatunnel.engine.server.dag.physical.SubPlan; -import org.apache.seatunnel.engine.server.execution.ExecutionState; -import org.apache.seatunnel.engine.server.execution.TaskDeployState; -import org.apache.seatunnel.engine.server.execution.TaskExecutionState; -import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; -import org.apache.seatunnel.engine.server.master.JobMaster; -import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager; -import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile; -import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; - -import lombok.NonNull; -import lombok.extern.slf4j.Slf4j; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -@Slf4j -public class PipelineBaseScheduler implements JobScheduler { - private final PhysicalPlan physicalPlan; - - private final long jobId; - private final JobMaster jobMaster; - private final ResourceManager resourceManager; - - public PipelineBaseScheduler(@NonNull PhysicalPlan physicalPlan, @NonNull JobMaster jobMaster) { - this.physicalPlan = physicalPlan; - this.jobMaster = jobMaster; - this.resourceManager = jobMaster.getResourceManager(); - this.jobId = physicalPlan.getJobImmutableInformation().getJobId(); - } - - @Override - public void startScheduling() { - if (physicalPlan.updateJobState(JobStatus.CREATED, JobStatus.SCHEDULED)) { - List> collect = - physicalPlan.getPipelineList().stream() - .map(this::schedulerPipeline) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - try { - CompletableFuture voidCompletableFuture = - CompletableFuture.allOf(collect.toArray(new CompletableFuture[0])); - voidCompletableFuture.get(); - physicalPlan.updateJobState(JobStatus.SCHEDULED, JobStatus.RUNNING); - } catch (Exception e) { - throw new RuntimeException(e); - } - } else if (!JobStatus.CANCELED.equals(physicalPlan.getJobStatus())) { - throw new JobException( - String.format( - "%s turn to a unexpected state: %s", - physicalPlan.getJobFullName(), physicalPlan.getJobStatus())); - } - } - - // This method cannot throw an exception - private CompletableFuture schedulerPipeline(SubPlan pipeline) { - try { - if (!pipeline.updatePipelineState(PipelineStatus.CREATED, PipelineStatus.SCHEDULED)) { - handlePipelineStateTurnError(pipeline, PipelineStatus.SCHEDULED); - } - - Map slotProfiles = - getOrApplyResourceForPipeline( - pipeline, - jobMaster.getOwnedSlotProfiles(pipeline.getPipelineLocation())); - - log.debug( - "slotProfiles: {}, PipelineLocation: {}", - slotProfiles, - pipeline.getPipelineLocation()); - - // To ensure release pipeline resource after new master node active, we need store - // slotProfiles first and then deploy tasks. - jobMaster.setOwnedSlotProfiles(pipeline.getPipelineLocation(), slotProfiles); - // deploy pipeline - return CompletableFuture.runAsync( - () -> { - deployPipeline(pipeline, slotProfiles); - }, - jobMaster.getExecutorService()); - } catch (SchedulerNotAllowException e) { - log.error( - String.format( - "scheduler %s stop. Because %s", - pipeline.getPipelineFullName(), ExceptionUtils.getMessage(e))); - CompletableFuture reSchedulerFuture = new CompletableFuture<>(); - reSchedulerFuture.complete(null); - return reSchedulerFuture; - } catch (Exception e) { - log.error( - String.format( - "scheduler %s error and cancel pipeline. The error is %s", - pipeline.getPipelineFullName(), ExceptionUtils.getMessage(e))); - pipeline.cancelPipeline(); - CompletableFuture reSchedulerFuture = new CompletableFuture<>(); - reSchedulerFuture.complete(null); - return reSchedulerFuture; - } - } - - private Map getOrApplyResourceForPipeline( - @NonNull SubPlan pipeline, Map ownedSlotProfiles) { - if (ownedSlotProfiles == null || ownedSlotProfiles.isEmpty()) { - return applyResourceForPipeline(pipeline); - } - - // TODO ensure the slots still exist and is owned by this pipeline - Map currentOwnedSlotProfiles = new ConcurrentHashMap<>(); - pipeline.getCoordinatorVertexList() - .forEach( - coordinator -> - currentOwnedSlotProfiles.put( - coordinator.getTaskGroupLocation(), - getOrApplyResourceForTask(coordinator, ownedSlotProfiles))); - - pipeline.getPhysicalVertexList() - .forEach( - task -> - currentOwnedSlotProfiles.put( - task.getTaskGroupLocation(), - getOrApplyResourceForTask(task, ownedSlotProfiles))); - - return currentOwnedSlotProfiles; - } - - private SlotProfile getOrApplyResourceForTask( - @NonNull PhysicalVertex task, Map ownedSlotProfiles) { - - SlotProfile oldProfile; - if (ownedSlotProfiles == null - || ownedSlotProfiles.isEmpty() - || ownedSlotProfiles.get(task.getTaskGroupLocation()) == null) { - oldProfile = null; - } else { - oldProfile = ownedSlotProfiles.get(task.getTaskGroupLocation()); - } - if (oldProfile == null || !resourceManager.slotActiveCheck(oldProfile)) { - SlotProfile newProfile; - CompletableFuture slotProfileCompletableFuture = - applyResourceForTask(task); - if (slotProfileCompletableFuture != null) { - newProfile = slotProfileCompletableFuture.join(); - } else { - throw new SchedulerNotAllowException( - String.format( - "The task [%s] state is [%s] and the resource can not be retrieved", - task.getTaskFullName(), task.getExecutionState())); - } - - log.info( - String.format( - "use new profile: %s to replace not active profile: %s for task %s", - newProfile, oldProfile, task.getTaskFullName())); - return newProfile; - } - log.info( - String.format( - "use active old profile: %s for task %s", - oldProfile, task.getTaskFullName())); - task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED); - return oldProfile; - } - - private Map applyResourceForPipeline(@NonNull SubPlan subPlan) { - Map> futures = new HashMap<>(); - Map slotProfiles = new HashMap<>(); - // TODO If there is no enough resources for tasks, we need add some wait profile - subPlan.getCoordinatorVertexList() - .forEach( - coordinator -> - futures.put( - coordinator.getTaskGroupLocation(), - applyResourceForTask(coordinator))); - - subPlan.getPhysicalVertexList() - .forEach( - task -> - futures.put( - task.getTaskGroupLocation(), applyResourceForTask(task))); - - for (Map.Entry> future : - futures.entrySet()) { - slotProfiles.put( - future.getKey(), future.getValue() == null ? null : future.getValue().join()); - } - return slotProfiles; - } - - private CompletableFuture applyResourceForTask(PhysicalVertex task) { - try { - if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) { - // TODO custom resource size - return resourceManager.applyResource(jobId, new ResourceProfile()); - } else if (ExecutionState.CANCELING.equals(task.getExecutionState()) - || ExecutionState.CANCELED.equals(task.getExecutionState())) { - log.info( - "{} be canceled, skip {} this task.", - task.getTaskFullName(), - ExecutionState.SCHEDULED); - return null; - } else { - makeTaskFailed( - task.getTaskGroupLocation(), - new JobException( - String.format( - "%s turn to a unexpected state: %s, stop scheduler job.", - task.getTaskFullName(), task.getExecutionState()))); - return null; - } - } catch (Throwable e) { - makeTaskFailed(task.getTaskGroupLocation(), e); - return null; - } - } - - private CompletableFuture deployTask(PhysicalVertex task, SlotProfile slotProfile) { - if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) { - // deploy is a time-consuming operation, so we do it async - return CompletableFuture.runAsync( - () -> { - try { - TaskDeployState state = task.deploy(slotProfile); - if (!state.isSuccess()) { - jobMaster.updateTaskExecutionState( - new TaskExecutionState( - task.getTaskGroupLocation(), - ExecutionState.FAILED, - state.getThrowableMsg())); - } - } catch (Exception e) { - throw new SeaTunnelEngineException(e); - } - }, - jobMaster.getExecutorService()); - } else if (ExecutionState.CANCELING.equals(task.getExecutionState()) - || ExecutionState.CANCELED.equals(task.getExecutionState())) { - log.info( - "{} be canceled, skip {} this task.", - task.getTaskFullName(), - ExecutionState.DEPLOYING); - return null; - } else { - jobMaster.updateTaskExecutionState( - new TaskExecutionState( - task.getTaskGroupLocation(), - ExecutionState.FAILED, - new JobException( - String.format( - "%s turn to a unexpected state: %s, stop scheduler job.", - task.getTaskFullName(), task.getExecutionState())))); - return null; - } - } - - private void deployPipeline( - @NonNull SubPlan pipeline, Map slotProfiles) { - boolean changeStateSuccess = false; - try { - changeStateSuccess = - pipeline.updatePipelineState( - PipelineStatus.SCHEDULED, PipelineStatus.DEPLOYING); - } catch (Exception e) { - log.warn( - "{} turn to state {} failed, cancel pipeline", - pipeline.getPipelineFullName(), - PipelineStatus.DEPLOYING); - pipeline.cancelPipeline(); - } - if (changeStateSuccess) { - try { - List> deployCoordinatorFuture = - pipeline.getCoordinatorVertexList().stream() - .map( - coordinator -> - deployTask( - coordinator, - slotProfiles.get( - coordinator - .getTaskGroupLocation()))) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - List> deployTaskFuture = - pipeline.getPhysicalVertexList().stream() - .map( - task -> - deployTask( - task, - slotProfiles.get( - task.getTaskGroupLocation()))) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - deployCoordinatorFuture.addAll(deployTaskFuture); - CompletableFuture voidCompletableFuture = - CompletableFuture.allOf( - deployCoordinatorFuture.toArray(new CompletableFuture[0])); - voidCompletableFuture.get(); - if (!pipeline.updatePipelineState( - PipelineStatus.DEPLOYING, PipelineStatus.RUNNING)) { - log.info( - "{} turn to state {}, skip the running state.", - pipeline.getPipelineFullName(), - pipeline.getPipelineState()); - } - } catch (Exception e) { - makePipelineFailed(pipeline, e); - } - } else if (PipelineStatus.CANCELING.equals(pipeline.getPipelineState()) - || PipelineStatus.CANCELED.equals(pipeline.getPipelineState())) { - // may be canceled - log.info( - "{} turn to state {}, skip {} this pipeline.", - pipeline.getPipelineFullName(), - pipeline.getPipelineState(), - PipelineStatus.DEPLOYING); - } else { - makePipelineFailed( - pipeline, - new JobException( - String.format( - "%s turn to a unexpected state: %s, stop scheduler job", - pipeline.getPipelineFullName(), pipeline.getPipelineState()))); - } - } - - @Override - public CompletableFuture reSchedulerPipeline(@NonNull SubPlan subPlan) { - return schedulerPipeline(subPlan); - } - - private void handlePipelineStateTurnError(SubPlan pipeline, PipelineStatus targetState) { - if (PipelineStatus.CANCELING.equals(pipeline.getPipelineState()) - || PipelineStatus.CANCELED.equals(pipeline.getPipelineState())) { - // may be canceled - throw new SchedulerNotAllowException( - String.format( - "%s turn to state %s, skip %s this pipeline.", - pipeline.getPipelineFullName(), - pipeline.getPipelineState(), - targetState)); - } else { - throw new JobException( - String.format( - "%s turn to a unexpected state: %s, stop scheduler job", - pipeline.getPipelineFullName(), pipeline.getPipelineState())); - } - } - - private void makePipelineFailed(@NonNull SubPlan pipeline, Throwable e) { - pipeline.getCoordinatorVertexList() - .forEach( - coordinator -> { - makeTaskFailed(coordinator.getTaskGroupLocation(), e); - }); - - pipeline.getPhysicalVertexList() - .forEach( - task -> { - makeTaskFailed(task.getTaskGroupLocation(), e); - }); - } - - private void makeTaskFailed(@NonNull TaskGroupLocation taskGroupLocation, Throwable e) { - jobMaster.updateTaskExecutionState( - new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, e)); - } -} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java index 320ce05e3e8..16be51aea7e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java @@ -31,9 +31,6 @@ import com.hazelcast.instance.impl.HazelcastInstanceImpl; import com.hazelcast.internal.serialization.Data; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.MalformedURLException; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -83,9 +80,7 @@ public void testMasterNodeActive() { } @Test - public void testClearCoordinatorService() - throws MalformedURLException, NoSuchMethodException, InvocationTargetException, - IllegalAccessException { + public void testClearCoordinatorService() { HazelcastInstanceImpl coordinatorServiceTest = SeaTunnelServerStarter.createHazelcastInstance( TestUtils.getClusterName( @@ -104,7 +99,7 @@ public void testClearCoordinatorService() .newId(); LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan( - "stream_fakesource_to_file.conf", "test_clear_coordinator_service", jobId); + "stream_fake_to_console.conf", "test_clear_coordinator_service", jobId); JobImmutableInformation jobImmutableInformation = new JobImmutableInformation( @@ -126,15 +121,17 @@ public void testClearCoordinatorService() Assertions.assertEquals( JobStatus.RUNNING, coordinatorService.getJobStatus(jobId))); - Class clazz = CoordinatorService.class; - Method clearCoordinatorServiceMethod = - clazz.getDeclaredMethod("clearCoordinatorService", null); - clearCoordinatorServiceMethod.setAccessible(true); - clearCoordinatorServiceMethod.invoke(coordinatorService, null); - clearCoordinatorServiceMethod.setAccessible(false); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - // because runningJobMasterMap is empty and we have no JobHistoryServer, so return finished. - Assertions.assertTrue(JobStatus.RUNNING.equals(coordinatorService.getJobStatus(jobId))); + coordinatorService.clearCoordinatorService(); + + // because runningJobMasterMap is empty and we have no JobHistoryServer, so return + // UNKNOWABLE. + Assertions.assertTrue(JobStatus.UNKNOWABLE.equals(coordinatorService.getJobStatus(jobId))); coordinatorServiceTest.shutdown(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java index bb9c0149025..b11e538514d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java @@ -25,7 +25,6 @@ import org.apache.seatunnel.engine.server.TestUtils; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import com.hazelcast.internal.serialization.Data; @@ -40,28 +39,24 @@ public class CheckpointTimeOutTest extends AbstractSeaTunnelServerTest { public static String CONF_PATH = "stream_fake_to_console_checkpointTimeOut.conf"; - public static long JOB_ID = System.currentTimeMillis(); @Test - @Disabled("Currently unstable tests, waiting for @EricJoy2048 to refactor state handling logic") public void testJobLevelCheckpointTimeOut() { - startJob(JOB_ID, CONF_PATH); + long jobId = System.currentTimeMillis(); + startJob(System.currentTimeMillis(), CONF_PATH); await().atMost(120000, TimeUnit.MILLISECONDS) .untilAsserted( () -> Assertions.assertEquals( - server.getCoordinatorService().getJobStatus(JOB_ID), + server.getCoordinatorService().getJobStatus(jobId), JobStatus.RUNNING)); await().atMost(360000, TimeUnit.MILLISECONDS) .untilAsserted( () -> { - log.info( - "Job status: {}", - server.getCoordinatorService().getJobStatus(JOB_ID)); Assertions.assertEquals( - server.getCoordinatorService().getJobStatus(JOB_ID), + server.getCoordinatorService().getJobStatus(jobId), JobStatus.FAILED); }); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties b/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties index 2dc1b8ca501..153d4d97c68 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties @@ -25,7 +25,7 @@ appender.consoleStdout.name = consoleStdoutAppender appender.consoleStdout.type = CONSOLE appender.consoleStdout.target = SYSTEM_OUT appender.consoleStdout.layout.type = PatternLayout -appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n +appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n appender.consoleStdout.filter.acceptLtWarn.type = ThresholdFilter appender.consoleStdout.filter.acceptLtWarn.level = WARN appender.consoleStdout.filter.acceptLtWarn.onMatch = DENY @@ -35,7 +35,7 @@ appender.consoleStderr.name = consoleStderrAppender appender.consoleStderr.type = CONSOLE appender.consoleStderr.target = SYSTEM_ERR appender.consoleStderr.layout.type = PatternLayout -appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n +appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n appender.consoleStderr.filter.acceptGteWarn.type = ThresholdFilter appender.consoleStderr.filter.acceptGteWarn.level = WARN appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT