From ccd6a2a11e8d8198258301c81d71e8ed306e917a Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Fri, 8 Nov 2024 09:48:24 +0800 Subject: [PATCH 01/10] [Feature][rest] support hocon style to submit job --- .../engine/e2e/ClusterSeaTunnelContainer.java | 112 ++++++++++++++++++ .../engine/server/rest/RestConstant.java | 4 + .../server/rest/service/JobInfoService.java | 15 ++- .../server/rest/servlet/BaseServlet.java | 14 +++ .../server/rest/servlet/SubmitJobServlet.java | 10 +- 5 files changed, 150 insertions(+), 5 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java index 804c77f340c..e97a1e59f2e 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java @@ -215,6 +215,30 @@ public void testStartWithSavePointWithoutJobId() { }); } + @Test + public void testStartWithSavePointWithoutJobIdAndHoconStyle() { + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(0); + Response response = + submitJobWithHoconStyle( + "BATCH", + container, + task._1(), + task._2(), + true, + jobName, + paramJobName); + response.then() + .statusCode(400) + .body( + "message", + equalTo( + "Please provide jobId when start with save point.")); + }); + } + @Test public void testStartWithSavePointWithoutJobIdV2() { Arrays.asList(server, secondServer) @@ -239,6 +263,30 @@ public void testStartWithSavePointWithoutJobIdV2() { }); } + @Test + public void testStartWithSavePointWithoutJobIdV2AndHoconStyle() { + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(1); + Response response = + submitJobWithHoconStyle( + "BATCH", + container, + task._1(), + task._2(), + true, + jobName, + paramJobName); + response.then() + .statusCode(400) + .body( + "message", + equalTo( + "Please provide jobId when start with save point.")); + }); + } + @Test public void testStopJob() { AtomicInteger i = new AtomicInteger(); @@ -798,6 +846,70 @@ private void submitJobs( return JsonUtil.toJson(jobList); } + private Response submitJobWithHoconStyle( + String jobMode, + GenericContainer container, + int port, + String contextPath, + boolean isStartWithSavePoint, + String jobName, + String paramJobName) { + String requestBody = + String.format( + "env {\n" + + " job.name = \"%s\"\n" + + " job.mode = \"%s\"\n" + + "}\n\n" + + "source {\n" + + " FakeSource {\n" + + " result_table_name = \"fake\"\n" + + " schema = {\n" + + " fields {\n" + + " name = \"string\"\n" + + " age = \"int\"\n" + + " card = \"int\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n\n" + + "transform {\n" + + "}\n\n" + + "sink {\n" + + " Console {\n" + + " source_table_name = \"fake\"\n" + + " }\n" + + "}\n", + jobName, jobMode); + String parameters = null; + if (paramJobName != null) { + parameters = "jobName=" + paramJobName; + } + if (isStartWithSavePoint) { + parameters = parameters + "&isStartWithSavePoint=true"; + } + parameters = parameters + "&configStyle=hocon"; + Response response = + given().body(requestBody) + .header("Content-Type", "text/plain; charset=utf-8") + .post( + parameters == null + ? http + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + : http + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + + "?" + + parameters); + return response; + } + private Response submitJob( String jobMode, GenericContainer container, diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java index e8c39da7f28..e32d0894ad7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java @@ -27,6 +27,8 @@ public class RestConstant { public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint"; + public static final String ENV_CONFIG_STYLE = "configStyle"; + public static final String JOB_STATUS = "jobStatus"; public static final String CREATE_TIME = "createTime"; @@ -45,6 +47,8 @@ public class RestConstant { public static final String METRICS = "metrics"; + public static final String HOCON = "hocon"; + public static final String TABLE_SOURCE_RECEIVED_COUNT = "TableSourceReceivedCount"; public static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount"; public static final String TABLE_SOURCE_RECEIVED_QPS = "TableSourceReceivedQPS"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java index 2c497510ca3..87cde32d891 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.engine.server.rest.service; import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; import org.apache.seatunnel.api.common.metrics.JobMetrics; import org.apache.seatunnel.common.utils.JsonUtils; @@ -37,11 +38,15 @@ import com.hazelcast.spi.impl.NodeEngineImpl; import scala.Tuple2; +import java.nio.charset.StandardCharsets; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.seatunnel.engine.server.rest.RestConstant.ENV_CONFIG_STYLE; +import static org.apache.seatunnel.engine.server.rest.RestConstant.HOCON; + public class JobInfoService extends BaseService { public JobInfoService(NodeEngineImpl nodeEngine) { @@ -157,9 +162,13 @@ public JsonObject submitJob(Map requestParams, byte[] requestBod && requestParams.get(RestConstant.JOB_ID) == null) { throw new IllegalArgumentException("Please provide jobId when start with save point."); } - - Config config = RestUtil.buildConfig(requestHandle(requestBody), false); - + Config config; + if (HOCON.equalsIgnoreCase(requestParams.get(ENV_CONFIG_STYLE))) { + String requestBodyStr = new String(requestBody, StandardCharsets.UTF_8); + config = ConfigFactory.parseString(requestBodyStr); + } else { + config = RestUtil.buildConfig(requestHandle(requestBody), false); + } SeaTunnelServer seaTunnelServer = getSeaTunnelServer(false); return submitJobInternal(config, requestParams, seaTunnelServer, nodeEngine.getNode()); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java index b37883f7c3a..ba9727dab74 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java @@ -116,6 +116,20 @@ protected byte[] requestBody(HttpServletRequest req) throws IOException { return requestBody.getBytes(StandardCharsets.UTF_8); } + public byte[] requestHoconBody(HttpServletRequest req) throws IOException { + StringBuilder stringBuilder = new StringBuilder(); + String line; + + try (BufferedReader reader = req.getReader()) { + while ((line = reader.readLine()) != null) { + stringBuilder.append(line).append("\n"); + } + } + + String requestBody = stringBuilder.toString(); + return requestBody.getBytes(StandardCharsets.UTF_8); + } + protected Map getParameterMap(HttpServletRequest req) { Map reqParameterMap = new HashMap<>(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java index b4e61c959c5..711040a182e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java @@ -27,6 +27,9 @@ import java.io.IOException; import java.util.Map; +import static org.apache.seatunnel.engine.server.rest.RestConstant.ENV_CONFIG_STYLE; +import static org.apache.seatunnel.engine.server.rest.RestConstant.HOCON; + public class SubmitJobServlet extends BaseServlet { private final JobInfoService jobInfoService; @@ -39,7 +42,10 @@ public SubmitJobServlet(NodeEngineImpl nodeEngine) { public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { Map requestParams = getParameterMap(req); - - writeJson(resp, jobInfoService.submitJob(requestParams, requestBody(req))); + if (HOCON.equalsIgnoreCase(requestParams.get(ENV_CONFIG_STYLE))) { + writeJson(resp, jobInfoService.submitJob(requestParams, requestHoconBody(req))); + } else { + writeJson(resp, jobInfoService.submitJob(requestParams, requestBody(req))); + } } } From b258110a1a837c98c3452a582874e5018d11376f Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Fri, 8 Nov 2024 18:00:40 +0800 Subject: [PATCH 02/10] [Feature][RestAPI] Support submit job with seatunnel style hocon format config --- .../engine/e2e/ClusterSeaTunnelContainer.java | 67 ++++++++++++++++++- 1 file changed, 64 insertions(+), 3 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java index e97a1e59f2e..5fc62cb5228 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java @@ -137,6 +137,24 @@ public void testSubmitJobWithCustomJobId() { }); } + @Test + public void testSubmitHoconJobWithCustomJobId() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(0); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + paramJobName + "&jobId=" + task._3(), + true, + task._3().toString()); + }); + } + @Test public void testSubmitJobWithCustomJobIdV2() { AtomicInteger i = new AtomicInteger(); @@ -222,7 +240,7 @@ public void testStartWithSavePointWithoutJobIdAndHoconStyle() { container -> { Tuple3 task = tasks.get(0); Response response = - submitJobWithHoconStyle( + submitHoconJob( "BATCH", container, task._1(), @@ -270,7 +288,7 @@ public void testStartWithSavePointWithoutJobIdV2AndHoconStyle() { container -> { Tuple3 task = tasks.get(1); Response response = - submitJobWithHoconStyle( + submitHoconJob( "BATCH", container, task._1(), @@ -590,6 +608,16 @@ private Response submitJob( return submitJob(jobMode, container, port, contextPath, false, jobName, paramJobName); } + private Response submitHoconJob( + GenericContainer container, + int port, + String contextPath, + String jobMode, + String jobName, + String paramJobName) { + return submitHoconJob(jobMode, container, port, contextPath, false, jobName, paramJobName); + } + @Test public void testStopJobs() { Arrays.asList(server) @@ -846,7 +874,7 @@ private void submitJobs( return JsonUtil.toJson(jobList); } - private Response submitJobWithHoconStyle( + private Response submitHoconJob( String jobMode, GenericContainer container, int port, @@ -1039,6 +1067,20 @@ private void submitJobAndAssertResponse( i.getAndIncrement(); } + private void submitHoconJobAndAssertResponse( + GenericContainer> container, + int port, + String contextPath, + AtomicInteger i, + String customParam, + boolean isCustomJobId, + String customJobId) { + Response response = submitHoconJobAndResponse(container, port, contextPath, i, customParam); + String jobId = response.getBody().jsonPath().getString("jobId"); + assertResponse(container, port, contextPath, i, jobId, customJobId, isCustomJobId); + i.getAndIncrement(); + } + private Response submitJobAndResponse( GenericContainer> container, int port, @@ -1057,6 +1099,25 @@ private Response submitJobAndResponse( return response; } + private Response submitHoconJobAndResponse( + GenericContainer> container, + int port, + String contextPath, + AtomicInteger i, + String customParam) { + Response response = + i.get() == 0 + ? submitHoconJob( + container, port, contextPath, "BATCH", jobName, customParam) + : submitHoconJob(container, port, contextPath, "BATCH", jobName, null); + if (i.get() == 0) { + response.then().statusCode(200).body("jobName", equalTo(paramJobName)); + } else { + response.then().statusCode(200).body("jobName", equalTo(jobName)); + } + return response; + } + private void assertResponse( GenericContainer> container, int port, From c3c6a59113061d2a16964b1bcd30e8411d5cc3e2 Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Mon, 11 Nov 2024 17:29:03 +0800 Subject: [PATCH 03/10] [Feature][rest] support hocon style to submit job --- .../java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java index 4afbfacb71d..ad83d0b17cc 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java @@ -282,7 +282,7 @@ private GenericContainer createServer(String networkAlias) new GenericContainer<>(getDockerImage()) .withNetwork(NETWORK) .withEnv("TZ", "UTC") - .withCommand(ContainerUtil.adaptPathForWin(BIN_PATH.toString())) + .withCommand("sh " + ContainerUtil.adaptPathForWin(BIN_PATH.toString())) .withNetworkAliases(networkAlias) .withExposedPorts() .withLogConsumer( From 672c488523970e25ea132ebd0a1e3290e4cf1f63 Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Tue, 12 Nov 2024 21:27:26 +0800 Subject: [PATCH 04/10] [Feature][rest] Support submit job with seatunnel style hocon format config --- docs/en/seatunnel-engine/rest-api-v2.md | 35 +++++++++++++++++-- docs/zh/seatunnel-engine/rest-api-v2.md | 33 +++++++++++++++-- .../seatunnel/engine/e2e/joblog/JobLogIT.java | 2 +- .../engine/server/rest/RestConstant.java | 2 +- 4 files changed, 66 insertions(+), 6 deletions(-) diff --git a/docs/en/seatunnel-engine/rest-api-v2.md b/docs/en/seatunnel-engine/rest-api-v2.md index 2c642dd8fbe..77e0a7fb158 100644 --- a/docs/en/seatunnel-engine/rest-api-v2.md +++ b/docs/en/seatunnel-engine/rest-api-v2.md @@ -384,11 +384,12 @@ When we can't get the job info, the response will be: #### Parameters -> | name | type | data type | description | -> |----------------------|----------|-----------|-----------------------------------| +> | name | type | data type | description | +----------------------|----------------------|----------|-----------|-----------------------------------| > | jobId | optional | string | job id | > | jobName | optional | string | job name | > | isStartWithSavePoint | optional | string | if job is started with save point | +> | format | optional | string | config style,json/hocon,default json | #### Body @@ -421,6 +422,36 @@ When we can't get the job info, the response will be: ] } ``` +```hocon +env { + job.mode = "batch" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + name = "string" + age = "int" + card = "int" + } + } + } +} + +transform { +} + +sink { + Console { + source_table_name = "fake" + } +} + +``` + #### Responses diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md b/docs/zh/seatunnel-engine/rest-api-v2.md index 0ec9741d401..c40cdb1448c 100644 --- a/docs/zh/seatunnel-engine/rest-api-v2.md +++ b/docs/zh/seatunnel-engine/rest-api-v2.md @@ -380,11 +380,12 @@ seatunnel: #### 参数 -> | 参数名称 | 是否必传 | 参数类型 | 参数描述 | -> |----------------------|----------|--------|-----------------------------------| +> | 参数名称 | 是否必传 | 参数类型 | 参数描述 | +> |----------------------|----------|-----------------------------------|-----------------------------------| > | jobId | optional | string | job id | > | jobName | optional | string | job name | > | isStartWithSavePoint | optional | string | if job is started with save point | +> | format | optional | string | 配置风格,json/hocon,default json | #### 请求体 @@ -417,7 +418,35 @@ seatunnel: ] } ``` +```hocon +env { + job.mode = "batch" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + name = "string" + age = "int" + card = "int" + } + } + } +} + +transform { +} +sink { + Console { + source_table_name = "fake" + } +} + +``` #### 响应 ```json diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java index 57f31299ad7..7d2b8ff0068 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java @@ -287,7 +287,7 @@ private GenericContainer createServer(String networkAlias) new GenericContainer<>(getDockerImage()) .withNetwork(NETWORK) .withEnv("TZ", "UTC") - .withCommand("sh " + ContainerUtil.adaptPathForWin(BIN_PATH.toString())) + .withCommand(ContainerUtil.adaptPathForWin(BIN_PATH.toString())) .withNetworkAliases(networkAlias) .withExposedPorts() .withLogConsumer( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java index e32d0894ad7..15359b59730 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java @@ -27,7 +27,7 @@ public class RestConstant { public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint"; - public static final String ENV_CONFIG_STYLE = "configStyle"; + public static final String ENV_CONFIG_STYLE = "format"; public static final String JOB_STATUS = "jobStatus"; From 4b3a037ee031b7481d317bf72c089c53640cbb52 Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Thu, 14 Nov 2024 10:51:50 +0800 Subject: [PATCH 05/10] [Feature][rest] support hocon style to submit job --- .../apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java index 5fc62cb5228..47cf11f95ee 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java @@ -915,7 +915,7 @@ private Response submitHoconJob( if (isStartWithSavePoint) { parameters = parameters + "&isStartWithSavePoint=true"; } - parameters = parameters + "&configStyle=hocon"; + parameters = parameters + "&format=hocon"; Response response = given().body(requestBody) .header("Content-Type", "text/plain; charset=utf-8") From 50b6a7e0870f759092f170b322b5f3c5d55adaa2 Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Thu, 14 Nov 2024 11:08:25 +0800 Subject: [PATCH 06/10] [Feature][rest] support hocon style to submit job --- .../e2e/ClusterHoconSeaTunnelContainer.java | 752 ++++++++++++++++++ .../engine/e2e/ClusterSeaTunnelContainer.java | 173 ---- 2 files changed, 752 insertions(+), 173 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterHoconSeaTunnelContainer.java diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterHoconSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterHoconSeaTunnelContainer.java new file mode 100644 index 00000000000..39dfa7993db --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterHoconSeaTunnelContainer.java @@ -0,0 +1,752 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.apache.seatunnel.e2e.common.util.ContainerUtil; +import org.apache.seatunnel.engine.server.rest.RestConstant; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; + +import io.restassured.response.Response; +import scala.Tuple3; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.restassured.RestAssured.given; +import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH; +import static org.hamcrest.Matchers.equalTo; + +public class ClusterHoconSeaTunnelContainer extends SeaTunnelContainer { + + private GenericContainer secondServer; + + private final Network NETWORK = Network.newNetwork(); + + private static final String jobName = "test_hocon测试"; + private static final String paramJobName = "param_test_hocon测试"; + + private static final String http = "http://"; + + private static final String colon = ":"; + + private static final String confFile = "/fakesource_to_console.conf"; + + private static final Path binPath = Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL); + private static final Path config = Paths.get(SEATUNNEL_HOME, "config"); + private static final Path hadoopJar = + Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar"); + + private static final long CUSTOM_JOB_ID_1 = 862969647010611203L; + + private static final long CUSTOM_JOB_ID_2 = 862969647010611204L; + + private static List> tasks; + + @Override + @BeforeEach + public void startUp() throws Exception { + + server = createServer("server"); + secondServer = createServer("secondServer"); + + // check cluster + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + Response response = + given().get( + http + + server.getHost() + + colon + + server.getFirstMappedPort() + + "/hazelcast/rest/cluster"); + response.then().statusCode(200); + Assertions.assertEquals( + 2, response.jsonPath().getList("members").size()); + }); + + tasks = new ArrayList<>(); + tasks.add( + new Tuple3<>( + server.getMappedPort(5801), RestConstant.CONTEXT_PATH, CUSTOM_JOB_ID_1)); + tasks.add(new Tuple3<>(server.getMappedPort(8080), "", CUSTOM_JOB_ID_2)); + } + + @Override + @AfterEach + public void tearDown() throws Exception { + super.tearDown(); + if (secondServer != null) { + secondServer.close(); + } + } + + @Test + public void testSubmitJobWithCustomJobId() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(0); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + paramJobName + "&jobId=" + task._3(), + true, + task._3().toString()); + }); + } + + @Test + public void testSubmitJobWithCustomJobIdV2() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(1); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + paramJobName + "&jobId=" + task._3(), + true, + task._3().toString()); + }); + } + + @Test + public void testSubmitJobWithoutCustomJobId() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(0); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + paramJobName, + false, + task._3().toString()); + }); + } + + @Test + public void testSubmitJobWithoutCustomJobIdV2() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(1); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + paramJobName, + false, + task._3().toString()); + }); + } + + @Test + public void testStartWithSavePointWithoutJobId() { + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(0); + Response response = + submitHoconJob( + "BATCH", + container, + task._1(), + task._2(), + true, + jobName, + paramJobName); + response.then() + .statusCode(400) + .body( + "message", + equalTo( + "Please provide jobId when start with save point.")); + }); + } + + @Test + public void testStartWithSavePointWithoutJobIdV2() { + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(1); + Response response = + submitHoconJob( + "BATCH", + container, + task._1(), + task._2(), + true, + jobName, + paramJobName); + response.then() + .statusCode(400) + .body( + "message", + equalTo( + "Please provide jobId when start with save point.")); + }); + } + + @Test + public void testStopJob() { + AtomicInteger i = new AtomicInteger(); + + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(0); + String jobId = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + + String parameters = + "{" + + "\"jobId\":" + + jobId + + "," + + "\"isStopWithSavePoint\":true}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId)); + + Awaitility.await() + .atMost(6, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/SAVEPOINT_DONE") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId))); + + String jobId2 = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId2) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + parameters = + "{" + + "\"jobId\":" + + jobId2 + + "," + + "\"isStopWithSavePoint\":false}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId2)); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/CANCELED") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId2))); + i.getAndIncrement(); + }); + } + + @Test + public void testStopJobV2() { + AtomicInteger i = new AtomicInteger(); + + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(1); + String jobId = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + + String parameters = + "{" + + "\"jobId\":" + + jobId + + "," + + "\"isStopWithSavePoint\":true}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId)); + + Awaitility.await() + .atMost(6, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/SAVEPOINT_DONE") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId))); + + String jobId2 = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId2) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + parameters = + "{" + + "\"jobId\":" + + jobId2 + + "," + + "\"isStopWithSavePoint\":false}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId2)); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/CANCELED") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId2))); + + i.getAndIncrement(); + }); + } + + private Response submitHoconJob( + GenericContainer container, + int port, + String contextPath, + String jobMode, + String jobName, + String paramJobName) { + return submitHoconJob(jobMode, container, port, contextPath, false, jobName, paramJobName); + } + + private Response submitHoconJob( + String jobMode, + GenericContainer container, + int port, + String contextPath, + boolean isStartWithSavePoint, + String jobName, + String paramJobName) { + String requestBody = + String.format( + "env {\n" + + " job.name = \"%s\"\n" + + " job.mode = \"%s\"\n" + + "}\n\n" + + "source {\n" + + " FakeSource {\n" + + " result_table_name = \"fake\"\n" + + " schema = {\n" + + " fields {\n" + + " name = \"string\"\n" + + " age = \"int\"\n" + + " card = \"int\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n\n" + + "transform {\n" + + "}\n\n" + + "sink {\n" + + " Console {\n" + + " source_table_name = \"fake\"\n" + + " }\n" + + "}\n", + jobName, jobMode); + String parameters = null; + if (paramJobName != null) { + parameters = "jobName=" + paramJobName; + } + if (isStartWithSavePoint) { + parameters = parameters + "&isStartWithSavePoint=true"; + } + parameters = parameters + "&format=hocon"; + Response response = + given().body(requestBody) + .header("Content-Type", "text/plain; charset=utf-8") + .post( + parameters == null + ? http + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + : http + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + + "?" + + parameters); + return response; + } + + private GenericContainer createServer(String networkAlias) + throws IOException, InterruptedException { + GenericContainer server = + new GenericContainer<>(getDockerImage()) + .withNetwork(NETWORK) + .withEnv("TZ", "UTC") + .withCommand(ContainerUtil.adaptPathForWin(binPath.toString())) + .withNetworkAliases(networkAlias) + .withExposedPorts() + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger( + "seatunnel-engine:" + JDK_DOCKER_IMAGE))) + .waitingFor(Wait.forListeningPort()); + copySeaTunnelStarterToContainer(server); + server.setExposedPorts(Arrays.asList(5801, 8080)); + server.withCopyFileToContainer( + MountableFile.forHostPath( + PROJECT_ROOT_PATH + + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"), + config.toString()); + server.withCopyFileToContainer( + MountableFile.forHostPath( + PROJECT_ROOT_PATH + + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/"), + config.toString()); + server.withCopyFileToContainer( + MountableFile.forHostPath( + PROJECT_ROOT_PATH + + "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"), + hadoopJar.toString()); + server.start(); + // execute extra commands + executeExtraCommands(server); + ContainerUtil.copyConnectorJarToContainer( + server, + confFile, + getConnectorModulePath(), + getConnectorNamePrefix(), + getConnectorType(), + SEATUNNEL_HOME); + + return server; + } + + private void submitHoconJobAndAssertResponse( + GenericContainer> container, + int port, + String contextPath, + AtomicInteger i, + String customParam, + boolean isCustomJobId, + String customJobId) { + Response response = submitHoconJobAndResponse(container, port, contextPath, i, customParam); + String jobId = response.getBody().jsonPath().getString("jobId"); + assertResponse(container, port, contextPath, i, jobId, customJobId, isCustomJobId); + i.getAndIncrement(); + } + + private Response submitHoconJobAndResponse( + GenericContainer> container, + int port, + String contextPath, + AtomicInteger i, + String customParam) { + Response response = + i.get() == 0 + ? submitHoconJob( + container, port, contextPath, "BATCH", jobName, customParam) + : submitHoconJob(container, port, contextPath, "BATCH", jobName, null); + if (i.get() == 0) { + response.then().statusCode(200).body("jobName", equalTo(paramJobName)); + } else { + response.then().statusCode(200).body("jobName", equalTo(jobName)); + } + return response; + } + + private void assertResponse( + GenericContainer> container, + int port, + String contextPath, + AtomicInteger i, + String jobId, + String customJobId, + boolean isCustomJobId) { + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + assertWithStatusParameterOrNot( + container, + port, + contextPath, + i, + jobId, + customJobId, + isCustomJobId, + true); + + // test for without status parameter. + assertWithStatusParameterOrNot( + container, + port, + contextPath, + i, + jobId, + customJobId, + isCustomJobId, + false); + }); + } + + private void assertWithStatusParameterOrNot( + GenericContainer> container, + int port, + String contextPath, + AtomicInteger i, + String jobId, + String customJobId, + boolean isCustomJobId, + boolean isStatusWithSubmitJob) { + String baseRestUrl = getBaseRestUrl(container, port, contextPath); + String restUrl = isStatusWithSubmitJob ? baseRestUrl + "/FINISHED" : baseRestUrl; + given().get(restUrl) + .then() + .statusCode(200) + .body("[" + i.get() + "].jobName", equalTo(i.get() == 0 ? paramJobName : jobName)) + .body("[" + i.get() + "].errorMsg", equalTo(null)) + .body( + "[" + i.get() + "].jobId", + equalTo(i.get() == 0 && isCustomJobId ? customJobId : jobId)) + .body("[" + i.get() + "].metrics.SourceReceivedCount", equalTo("100")) + .body("[" + i.get() + "].metrics.SinkWriteCount", equalTo("100")) + .body("[" + i.get() + "].jobStatus", equalTo("FINISHED")); + } + + private String getBaseRestUrl( + GenericContainer> container, + int port, + String contextPath) { + return http + + container.getHost() + + colon + + port + + contextPath + + RestConstant.FINISHED_JOBS_INFO; + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java index 47cf11f95ee..804c77f340c 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java @@ -137,24 +137,6 @@ public void testSubmitJobWithCustomJobId() { }); } - @Test - public void testSubmitHoconJobWithCustomJobId() { - AtomicInteger i = new AtomicInteger(); - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(0); - submitHoconJobAndAssertResponse( - container, - task._1(), - task._2(), - i, - paramJobName + "&jobId=" + task._3(), - true, - task._3().toString()); - }); - } - @Test public void testSubmitJobWithCustomJobIdV2() { AtomicInteger i = new AtomicInteger(); @@ -233,30 +215,6 @@ public void testStartWithSavePointWithoutJobId() { }); } - @Test - public void testStartWithSavePointWithoutJobIdAndHoconStyle() { - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(0); - Response response = - submitHoconJob( - "BATCH", - container, - task._1(), - task._2(), - true, - jobName, - paramJobName); - response.then() - .statusCode(400) - .body( - "message", - equalTo( - "Please provide jobId when start with save point.")); - }); - } - @Test public void testStartWithSavePointWithoutJobIdV2() { Arrays.asList(server, secondServer) @@ -281,30 +239,6 @@ public void testStartWithSavePointWithoutJobIdV2() { }); } - @Test - public void testStartWithSavePointWithoutJobIdV2AndHoconStyle() { - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(1); - Response response = - submitHoconJob( - "BATCH", - container, - task._1(), - task._2(), - true, - jobName, - paramJobName); - response.then() - .statusCode(400) - .body( - "message", - equalTo( - "Please provide jobId when start with save point.")); - }); - } - @Test public void testStopJob() { AtomicInteger i = new AtomicInteger(); @@ -608,16 +542,6 @@ private Response submitJob( return submitJob(jobMode, container, port, contextPath, false, jobName, paramJobName); } - private Response submitHoconJob( - GenericContainer container, - int port, - String contextPath, - String jobMode, - String jobName, - String paramJobName) { - return submitHoconJob(jobMode, container, port, contextPath, false, jobName, paramJobName); - } - @Test public void testStopJobs() { Arrays.asList(server) @@ -874,70 +798,6 @@ private void submitJobs( return JsonUtil.toJson(jobList); } - private Response submitHoconJob( - String jobMode, - GenericContainer container, - int port, - String contextPath, - boolean isStartWithSavePoint, - String jobName, - String paramJobName) { - String requestBody = - String.format( - "env {\n" - + " job.name = \"%s\"\n" - + " job.mode = \"%s\"\n" - + "}\n\n" - + "source {\n" - + " FakeSource {\n" - + " result_table_name = \"fake\"\n" - + " schema = {\n" - + " fields {\n" - + " name = \"string\"\n" - + " age = \"int\"\n" - + " card = \"int\"\n" - + " }\n" - + " }\n" - + " }\n" - + "}\n\n" - + "transform {\n" - + "}\n\n" - + "sink {\n" - + " Console {\n" - + " source_table_name = \"fake\"\n" - + " }\n" - + "}\n", - jobName, jobMode); - String parameters = null; - if (paramJobName != null) { - parameters = "jobName=" + paramJobName; - } - if (isStartWithSavePoint) { - parameters = parameters + "&isStartWithSavePoint=true"; - } - parameters = parameters + "&format=hocon"; - Response response = - given().body(requestBody) - .header("Content-Type", "text/plain; charset=utf-8") - .post( - parameters == null - ? http - + container.getHost() - + colon - + port - + contextPath - + RestConstant.SUBMIT_JOB_URL - : http - + container.getHost() - + colon - + port - + contextPath - + RestConstant.SUBMIT_JOB_URL - + "?" - + parameters); - return response; - } - private Response submitJob( String jobMode, GenericContainer container, @@ -1067,20 +927,6 @@ private void submitJobAndAssertResponse( i.getAndIncrement(); } - private void submitHoconJobAndAssertResponse( - GenericContainer> container, - int port, - String contextPath, - AtomicInteger i, - String customParam, - boolean isCustomJobId, - String customJobId) { - Response response = submitHoconJobAndResponse(container, port, contextPath, i, customParam); - String jobId = response.getBody().jsonPath().getString("jobId"); - assertResponse(container, port, contextPath, i, jobId, customJobId, isCustomJobId); - i.getAndIncrement(); - } - private Response submitJobAndResponse( GenericContainer> container, int port, @@ -1099,25 +945,6 @@ private Response submitJobAndResponse( return response; } - private Response submitHoconJobAndResponse( - GenericContainer> container, - int port, - String contextPath, - AtomicInteger i, - String customParam) { - Response response = - i.get() == 0 - ? submitHoconJob( - container, port, contextPath, "BATCH", jobName, customParam) - : submitHoconJob(container, port, contextPath, "BATCH", jobName, null); - if (i.get() == 0) { - response.then().statusCode(200).body("jobName", equalTo(paramJobName)); - } else { - response.then().statusCode(200).body("jobName", equalTo(jobName)); - } - return response; - } - private void assertResponse( GenericContainer> container, int port, From 5986c81b60c5b1e8eaf3dee29a544d2231b7d2ad Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Thu, 14 Nov 2024 11:44:28 +0800 Subject: [PATCH 07/10] [Feature][rest] support hocon style to submit job --- .../e2e/ClusterHoconSeaTunnelContainer.java | 752 ------------------ .../engine/e2e/ClusterSeaTunnelContainer.java | 534 +++++++++++++ 2 files changed, 534 insertions(+), 752 deletions(-) delete mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterHoconSeaTunnelContainer.java diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterHoconSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterHoconSeaTunnelContainer.java deleted file mode 100644 index 39dfa7993db..00000000000 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterHoconSeaTunnelContainer.java +++ /dev/null @@ -1,752 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.engine.e2e; - -import org.apache.seatunnel.e2e.common.util.ContainerUtil; -import org.apache.seatunnel.engine.server.rest.RestConstant; - -import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.utility.DockerLoggerFactory; -import org.testcontainers.utility.MountableFile; - -import io.restassured.response.Response; -import scala.Tuple3; - -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static io.restassured.RestAssured.given; -import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH; -import static org.hamcrest.Matchers.equalTo; - -public class ClusterHoconSeaTunnelContainer extends SeaTunnelContainer { - - private GenericContainer secondServer; - - private final Network NETWORK = Network.newNetwork(); - - private static final String jobName = "test_hocon测试"; - private static final String paramJobName = "param_test_hocon测试"; - - private static final String http = "http://"; - - private static final String colon = ":"; - - private static final String confFile = "/fakesource_to_console.conf"; - - private static final Path binPath = Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL); - private static final Path config = Paths.get(SEATUNNEL_HOME, "config"); - private static final Path hadoopJar = - Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar"); - - private static final long CUSTOM_JOB_ID_1 = 862969647010611203L; - - private static final long CUSTOM_JOB_ID_2 = 862969647010611204L; - - private static List> tasks; - - @Override - @BeforeEach - public void startUp() throws Exception { - - server = createServer("server"); - secondServer = createServer("secondServer"); - - // check cluster - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> { - Response response = - given().get( - http - + server.getHost() - + colon - + server.getFirstMappedPort() - + "/hazelcast/rest/cluster"); - response.then().statusCode(200); - Assertions.assertEquals( - 2, response.jsonPath().getList("members").size()); - }); - - tasks = new ArrayList<>(); - tasks.add( - new Tuple3<>( - server.getMappedPort(5801), RestConstant.CONTEXT_PATH, CUSTOM_JOB_ID_1)); - tasks.add(new Tuple3<>(server.getMappedPort(8080), "", CUSTOM_JOB_ID_2)); - } - - @Override - @AfterEach - public void tearDown() throws Exception { - super.tearDown(); - if (secondServer != null) { - secondServer.close(); - } - } - - @Test - public void testSubmitJobWithCustomJobId() { - AtomicInteger i = new AtomicInteger(); - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(0); - submitHoconJobAndAssertResponse( - container, - task._1(), - task._2(), - i, - paramJobName + "&jobId=" + task._3(), - true, - task._3().toString()); - }); - } - - @Test - public void testSubmitJobWithCustomJobIdV2() { - AtomicInteger i = new AtomicInteger(); - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(1); - submitHoconJobAndAssertResponse( - container, - task._1(), - task._2(), - i, - paramJobName + "&jobId=" + task._3(), - true, - task._3().toString()); - }); - } - - @Test - public void testSubmitJobWithoutCustomJobId() { - AtomicInteger i = new AtomicInteger(); - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(0); - submitHoconJobAndAssertResponse( - container, - task._1(), - task._2(), - i, - paramJobName, - false, - task._3().toString()); - }); - } - - @Test - public void testSubmitJobWithoutCustomJobIdV2() { - AtomicInteger i = new AtomicInteger(); - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(1); - submitHoconJobAndAssertResponse( - container, - task._1(), - task._2(), - i, - paramJobName, - false, - task._3().toString()); - }); - } - - @Test - public void testStartWithSavePointWithoutJobId() { - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(0); - Response response = - submitHoconJob( - "BATCH", - container, - task._1(), - task._2(), - true, - jobName, - paramJobName); - response.then() - .statusCode(400) - .body( - "message", - equalTo( - "Please provide jobId when start with save point.")); - }); - } - - @Test - public void testStartWithSavePointWithoutJobIdV2() { - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(1); - Response response = - submitHoconJob( - "BATCH", - container, - task._1(), - task._2(), - true, - jobName, - paramJobName); - response.then() - .statusCode(400) - .body( - "message", - equalTo( - "Please provide jobId when start with save point.")); - }); - } - - @Test - public void testStopJob() { - AtomicInteger i = new AtomicInteger(); - - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(0); - String jobId = - submitHoconJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) - .getBody() - .jsonPath() - .getString("jobId"); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .RUNNING_JOB_URL - + "/" - + jobId) - .then() - .statusCode(200) - .body("jobStatus", equalTo("RUNNING"))); - - String parameters = - "{" - + "\"jobId\":" - + jobId - + "," - + "\"isStopWithSavePoint\":true}"; - - given().body(parameters) - .post( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant.STOP_JOB_URL) - .then() - .statusCode(200) - .body("jobId", equalTo(jobId)); - - Awaitility.await() - .atMost(6, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .FINISHED_JOBS_INFO - + "/SAVEPOINT_DONE") - .then() - .statusCode(200) - .body( - "[" + i.get() + "].jobId", - equalTo(jobId))); - - String jobId2 = - submitHoconJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) - .getBody() - .jsonPath() - .getString("jobId"); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .RUNNING_JOB_URL - + "/" - + jobId2) - .then() - .statusCode(200) - .body("jobStatus", equalTo("RUNNING"))); - parameters = - "{" - + "\"jobId\":" - + jobId2 - + "," - + "\"isStopWithSavePoint\":false}"; - - given().body(parameters) - .post( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant.STOP_JOB_URL) - .then() - .statusCode(200) - .body("jobId", equalTo(jobId2)); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .FINISHED_JOBS_INFO - + "/CANCELED") - .then() - .statusCode(200) - .body( - "[" + i.get() + "].jobId", - equalTo(jobId2))); - i.getAndIncrement(); - }); - } - - @Test - public void testStopJobV2() { - AtomicInteger i = new AtomicInteger(); - - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(1); - String jobId = - submitHoconJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) - .getBody() - .jsonPath() - .getString("jobId"); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .RUNNING_JOB_URL - + "/" - + jobId) - .then() - .statusCode(200) - .body("jobStatus", equalTo("RUNNING"))); - - String parameters = - "{" - + "\"jobId\":" - + jobId - + "," - + "\"isStopWithSavePoint\":true}"; - - given().body(parameters) - .post( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant.STOP_JOB_URL) - .then() - .statusCode(200) - .body("jobId", equalTo(jobId)); - - Awaitility.await() - .atMost(6, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .FINISHED_JOBS_INFO - + "/SAVEPOINT_DONE") - .then() - .statusCode(200) - .body( - "[" + i.get() + "].jobId", - equalTo(jobId))); - - String jobId2 = - submitHoconJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) - .getBody() - .jsonPath() - .getString("jobId"); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .RUNNING_JOB_URL - + "/" - + jobId2) - .then() - .statusCode(200) - .body("jobStatus", equalTo("RUNNING"))); - parameters = - "{" - + "\"jobId\":" - + jobId2 - + "," - + "\"isStopWithSavePoint\":false}"; - - given().body(parameters) - .post( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant.STOP_JOB_URL) - .then() - .statusCode(200) - .body("jobId", equalTo(jobId2)); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .FINISHED_JOBS_INFO - + "/CANCELED") - .then() - .statusCode(200) - .body( - "[" + i.get() + "].jobId", - equalTo(jobId2))); - - i.getAndIncrement(); - }); - } - - private Response submitHoconJob( - GenericContainer container, - int port, - String contextPath, - String jobMode, - String jobName, - String paramJobName) { - return submitHoconJob(jobMode, container, port, contextPath, false, jobName, paramJobName); - } - - private Response submitHoconJob( - String jobMode, - GenericContainer container, - int port, - String contextPath, - boolean isStartWithSavePoint, - String jobName, - String paramJobName) { - String requestBody = - String.format( - "env {\n" - + " job.name = \"%s\"\n" - + " job.mode = \"%s\"\n" - + "}\n\n" - + "source {\n" - + " FakeSource {\n" - + " result_table_name = \"fake\"\n" - + " schema = {\n" - + " fields {\n" - + " name = \"string\"\n" - + " age = \"int\"\n" - + " card = \"int\"\n" - + " }\n" - + " }\n" - + " }\n" - + "}\n\n" - + "transform {\n" - + "}\n\n" - + "sink {\n" - + " Console {\n" - + " source_table_name = \"fake\"\n" - + " }\n" - + "}\n", - jobName, jobMode); - String parameters = null; - if (paramJobName != null) { - parameters = "jobName=" + paramJobName; - } - if (isStartWithSavePoint) { - parameters = parameters + "&isStartWithSavePoint=true"; - } - parameters = parameters + "&format=hocon"; - Response response = - given().body(requestBody) - .header("Content-Type", "text/plain; charset=utf-8") - .post( - parameters == null - ? http - + container.getHost() - + colon - + port - + contextPath - + RestConstant.SUBMIT_JOB_URL - : http - + container.getHost() - + colon - + port - + contextPath - + RestConstant.SUBMIT_JOB_URL - + "?" - + parameters); - return response; - } - - private GenericContainer createServer(String networkAlias) - throws IOException, InterruptedException { - GenericContainer server = - new GenericContainer<>(getDockerImage()) - .withNetwork(NETWORK) - .withEnv("TZ", "UTC") - .withCommand(ContainerUtil.adaptPathForWin(binPath.toString())) - .withNetworkAliases(networkAlias) - .withExposedPorts() - .withLogConsumer( - new Slf4jLogConsumer( - DockerLoggerFactory.getLogger( - "seatunnel-engine:" + JDK_DOCKER_IMAGE))) - .waitingFor(Wait.forListeningPort()); - copySeaTunnelStarterToContainer(server); - server.setExposedPorts(Arrays.asList(5801, 8080)); - server.withCopyFileToContainer( - MountableFile.forHostPath( - PROJECT_ROOT_PATH - + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"), - config.toString()); - server.withCopyFileToContainer( - MountableFile.forHostPath( - PROJECT_ROOT_PATH - + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/"), - config.toString()); - server.withCopyFileToContainer( - MountableFile.forHostPath( - PROJECT_ROOT_PATH - + "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"), - hadoopJar.toString()); - server.start(); - // execute extra commands - executeExtraCommands(server); - ContainerUtil.copyConnectorJarToContainer( - server, - confFile, - getConnectorModulePath(), - getConnectorNamePrefix(), - getConnectorType(), - SEATUNNEL_HOME); - - return server; - } - - private void submitHoconJobAndAssertResponse( - GenericContainer> container, - int port, - String contextPath, - AtomicInteger i, - String customParam, - boolean isCustomJobId, - String customJobId) { - Response response = submitHoconJobAndResponse(container, port, contextPath, i, customParam); - String jobId = response.getBody().jsonPath().getString("jobId"); - assertResponse(container, port, contextPath, i, jobId, customJobId, isCustomJobId); - i.getAndIncrement(); - } - - private Response submitHoconJobAndResponse( - GenericContainer> container, - int port, - String contextPath, - AtomicInteger i, - String customParam) { - Response response = - i.get() == 0 - ? submitHoconJob( - container, port, contextPath, "BATCH", jobName, customParam) - : submitHoconJob(container, port, contextPath, "BATCH", jobName, null); - if (i.get() == 0) { - response.then().statusCode(200).body("jobName", equalTo(paramJobName)); - } else { - response.then().statusCode(200).body("jobName", equalTo(jobName)); - } - return response; - } - - private void assertResponse( - GenericContainer> container, - int port, - String contextPath, - AtomicInteger i, - String jobId, - String customJobId, - boolean isCustomJobId) { - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> { - assertWithStatusParameterOrNot( - container, - port, - contextPath, - i, - jobId, - customJobId, - isCustomJobId, - true); - - // test for without status parameter. - assertWithStatusParameterOrNot( - container, - port, - contextPath, - i, - jobId, - customJobId, - isCustomJobId, - false); - }); - } - - private void assertWithStatusParameterOrNot( - GenericContainer> container, - int port, - String contextPath, - AtomicInteger i, - String jobId, - String customJobId, - boolean isCustomJobId, - boolean isStatusWithSubmitJob) { - String baseRestUrl = getBaseRestUrl(container, port, contextPath); - String restUrl = isStatusWithSubmitJob ? baseRestUrl + "/FINISHED" : baseRestUrl; - given().get(restUrl) - .then() - .statusCode(200) - .body("[" + i.get() + "].jobName", equalTo(i.get() == 0 ? paramJobName : jobName)) - .body("[" + i.get() + "].errorMsg", equalTo(null)) - .body( - "[" + i.get() + "].jobId", - equalTo(i.get() == 0 && isCustomJobId ? customJobId : jobId)) - .body("[" + i.get() + "].metrics.SourceReceivedCount", equalTo("100")) - .body("[" + i.get() + "].metrics.SinkWriteCount", equalTo("100")) - .body("[" + i.get() + "].jobStatus", equalTo("FINISHED")); - } - - private String getBaseRestUrl( - GenericContainer> container, - int port, - String contextPath) { - return http - + container.getHost() - + colon - + port - + contextPath - + RestConstant.FINISHED_JOBS_INFO; - } -} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java index 804c77f340c..8dd6cd5321b 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java @@ -61,6 +61,8 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer { private static final String jobName = "test测试"; private static final String paramJobName = "param_test测试"; + private static final String hoconJobName = "test_hocon测试"; + private static final String hoconParamJobName = "param_test_hocon测试"; private static final String http = "http://"; @@ -77,6 +79,10 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer { private static final long CUSTOM_JOB_ID_2 = 862969647010611202L; + private static final long HOCON_CUSTOM_JOB_ID_1 = 862969647010611203L; + + private static final long HOCON_CUSTOM_JOB_ID_2 = 862969647010611204L; + private static List> tasks; @Override @@ -108,6 +114,14 @@ public void startUp() throws Exception { new Tuple3<>( server.getMappedPort(5801), RestConstant.CONTEXT_PATH, CUSTOM_JOB_ID_1)); tasks.add(new Tuple3<>(server.getMappedPort(8080), "", CUSTOM_JOB_ID_2)); + + tasks.add( + new Tuple3<>( + server.getMappedPort(5801), + RestConstant.CONTEXT_PATH, + HOCON_CUSTOM_JOB_ID_1)); + + tasks.add(new Tuple3<>(server.getMappedPort(8080), "", HOCON_CUSTOM_JOB_ID_2)); } @Override @@ -706,6 +720,419 @@ public void testSubmitJobsV2() { }); } + @Test + public void testHoconSubmitJobWithCustomJobId() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(2); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + hoconParamJobName + "&jobId=" + task._3(), + true, + task._3().toString()); + }); + } + + @Test + public void testHoconSubmitJobWithCustomJobIdV2() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(3); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + hoconParamJobName + "&jobId=" + task._3(), + true, + task._3().toString()); + }); + } + + @Test + public void testHoconSubmitJobWithoutCustomJobId() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(2); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + hoconParamJobName, + false, + task._3().toString()); + }); + } + + @Test + public void testHoconSubmitJobWithoutCustomJobIdV2() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(3); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + hoconParamJobName, + false, + task._3().toString()); + }); + } + + @Test + public void testHoconStartWithSavePointWithoutJobId() { + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(2); + Response response = + submitHoconJob( + "BATCH", + container, + task._1(), + task._2(), + true, + hoconJobName, + hoconParamJobName); + response.then() + .statusCode(400) + .body( + "message", + equalTo( + "Please provide jobId when start with save point.")); + }); + } + + @Test + public void testHoconStartWithSavePointWithoutJobIdV2() { + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(3); + Response response = + submitHoconJob( + "BATCH", + container, + task._1(), + task._2(), + true, + hoconJobName, + hoconParamJobName); + response.then() + .statusCode(400) + .body( + "message", + equalTo( + "Please provide jobId when start with save point.")); + }); + } + + @Test + public void testHoconStopJob() { + AtomicInteger i = new AtomicInteger(); + + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(2); + String jobId = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + hoconJobName, + hoconParamJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + + String parameters = + "{" + + "\"jobId\":" + + jobId + + "," + + "\"isStopWithSavePoint\":true}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId)); + + Awaitility.await() + .atMost(6, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/SAVEPOINT_DONE") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId))); + + String jobId2 = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + hoconJobName, + hoconParamJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId2) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + parameters = + "{" + + "\"jobId\":" + + jobId2 + + "," + + "\"isStopWithSavePoint\":false}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId2)); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/CANCELED") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId2))); + i.getAndIncrement(); + }); + } + + @Test + public void testHoconStopJobV2() { + AtomicInteger i = new AtomicInteger(); + + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(3); + String jobId = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + hoconJobName, + hoconParamJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + + String parameters = + "{" + + "\"jobId\":" + + jobId + + "," + + "\"isStopWithSavePoint\":true}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId)); + + Awaitility.await() + .atMost(6, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/SAVEPOINT_DONE") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId))); + + String jobId2 = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + hoconJobName, + hoconParamJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId2) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + parameters = + "{" + + "\"jobId\":" + + jobId2 + + "," + + "\"isStopWithSavePoint\":false}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId2)); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/CANCELED") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId2))); + + i.getAndIncrement(); + }); + } + private void submitJobs( String jobMode, GenericContainer container, @@ -1015,4 +1442,111 @@ private String getBaseRestUrl( + contextPath + RestConstant.FINISHED_JOBS_INFO; } + + private Response submitHoconJob( + GenericContainer container, + int port, + String contextPath, + String jobMode, + String jobName, + String paramJobName) { + return submitHoconJob(jobMode, container, port, contextPath, false, jobName, paramJobName); + } + + private Response submitHoconJob( + String jobMode, + GenericContainer container, + int port, + String contextPath, + boolean isStartWithSavePoint, + String jobName, + String paramJobName) { + String requestBody = + String.format( + "env {\n" + + " job.name = \"%s\"\n" + + " job.mode = \"%s\"\n" + + "}\n\n" + + "source {\n" + + " FakeSource {\n" + + " result_table_name = \"fake\"\n" + + " schema = {\n" + + " fields {\n" + + " name = \"string\"\n" + + " age = \"int\"\n" + + " card = \"int\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n\n" + + "transform {\n" + + "}\n\n" + + "sink {\n" + + " Console {\n" + + " source_table_name = \"fake\"\n" + + " }\n" + + "}\n", + jobName, jobMode); + String parameters = null; + if (paramJobName != null) { + parameters = "jobName=" + paramJobName; + } + if (isStartWithSavePoint) { + parameters = parameters + "&isStartWithSavePoint=true"; + } + parameters = parameters + "&format=hocon"; + Response response = + given().body(requestBody) + .header("Content-Type", "text/plain; charset=utf-8") + .post( + parameters == null + ? http + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + : http + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + + "?" + + parameters); + return response; + } + + private void submitHoconJobAndAssertResponse( + GenericContainer> container, + int port, + String contextPath, + AtomicInteger i, + String customParam, + boolean isCustomJobId, + String customJobId) { + Response response = submitHoconJobAndResponse(container, port, contextPath, i, customParam); + String jobId = response.getBody().jsonPath().getString("jobId"); + assertResponse(container, port, contextPath, i, jobId, customJobId, isCustomJobId); + i.getAndIncrement(); + } + + private Response submitHoconJobAndResponse( + GenericContainer> container, + int port, + String contextPath, + AtomicInteger i, + String customParam) { + Response response = + i.get() == 0 + ? submitHoconJob( + container, port, contextPath, "BATCH", hoconJobName, customParam) + : submitHoconJob(container, port, contextPath, "BATCH", hoconJobName, null); + if (i.get() == 0) { + response.then().statusCode(200).body("jobName", equalTo(hoconParamJobName)); + } else { + response.then().statusCode(200).body("jobName", equalTo(hoconJobName)); + } + return response; + } } From 218d126b6c9484090f35b290ee063f50b3f02e6c Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Thu, 14 Nov 2024 14:53:00 +0800 Subject: [PATCH 08/10] [Feature][rest] support hocon style to submit job --- .../engine/e2e/ClusterSeaTunnelContainer.java | 644 ++---------------- 1 file changed, 55 insertions(+), 589 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java index 8dd6cd5321b..7d1e227a33b 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java @@ -61,8 +61,6 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer { private static final String jobName = "test测试"; private static final String paramJobName = "param_test测试"; - private static final String hoconJobName = "test_hocon测试"; - private static final String hoconParamJobName = "param_test_hocon测试"; private static final String http = "http://"; @@ -79,10 +77,6 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer { private static final long CUSTOM_JOB_ID_2 = 862969647010611202L; - private static final long HOCON_CUSTOM_JOB_ID_1 = 862969647010611203L; - - private static final long HOCON_CUSTOM_JOB_ID_2 = 862969647010611204L; - private static List> tasks; @Override @@ -99,11 +93,11 @@ public void startUp() throws Exception { () -> { Response response = given().get( - http - + server.getHost() - + colon - + server.getFirstMappedPort() - + "/hazelcast/rest/cluster"); + http + + server.getHost() + + colon + + server.getFirstMappedPort() + + "/hazelcast/rest/cluster"); response.then().statusCode(200); Assertions.assertEquals( 2, response.jsonPath().getList("members").size()); @@ -114,14 +108,6 @@ public void startUp() throws Exception { new Tuple3<>( server.getMappedPort(5801), RestConstant.CONTEXT_PATH, CUSTOM_JOB_ID_1)); tasks.add(new Tuple3<>(server.getMappedPort(8080), "", CUSTOM_JOB_ID_2)); - - tasks.add( - new Tuple3<>( - server.getMappedPort(5801), - RestConstant.CONTEXT_PATH, - HOCON_CUSTOM_JOB_ID_1)); - - tasks.add(new Tuple3<>(server.getMappedPort(8080), "", HOCON_CUSTOM_JOB_ID_2)); } @Override @@ -263,12 +249,12 @@ public void testStopJob() { Tuple3 task = tasks.get(0); String jobId = submitJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) .getBody() .jsonPath() .getString("jobId"); @@ -284,7 +270,7 @@ public void testStopJob() { + task._1() + task._2() + RestConstant - .RUNNING_JOB_URL + .RUNNING_JOB_URL + "/" + jobId) .then() @@ -321,7 +307,7 @@ public void testStopJob() { + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/SAVEPOINT_DONE") .then() .statusCode(200) @@ -331,12 +317,12 @@ public void testStopJob() { String jobId2 = submitJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) .getBody() .jsonPath() .getString("jobId"); @@ -352,7 +338,7 @@ public void testStopJob() { + task._1() + task._2() + RestConstant - .RUNNING_JOB_URL + .RUNNING_JOB_URL + "/" + jobId2) .then() @@ -388,7 +374,7 @@ public void testStopJob() { + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/CANCELED") .then() .statusCode(200) @@ -409,12 +395,12 @@ public void testStopJobV2() { Tuple3 task = tasks.get(1); String jobId = submitJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) .getBody() .jsonPath() .getString("jobId"); @@ -430,7 +416,7 @@ public void testStopJobV2() { + task._1() + task._2() + RestConstant - .RUNNING_JOB_URL + .RUNNING_JOB_URL + "/" + jobId) .then() @@ -467,7 +453,7 @@ public void testStopJobV2() { + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/SAVEPOINT_DONE") .then() .statusCode(200) @@ -477,12 +463,12 @@ public void testStopJobV2() { String jobId2 = submitJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) .getBody() .jsonPath() .getString("jobId"); @@ -498,7 +484,7 @@ public void testStopJobV2() { + task._1() + task._2() + RestConstant - .RUNNING_JOB_URL + .RUNNING_JOB_URL + "/" + jobId2) .then() @@ -534,7 +520,7 @@ public void testStopJobV2() { + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/CANCELED") .then() .statusCode(200) @@ -592,7 +578,7 @@ public void testStopJobs() { .body("[1].jobId", equalTo(task._3() - 1)); String[] jobIds = new String[] { - String.valueOf(task._3() - 1), String.valueOf(task._3()) + String.valueOf(task._3() - 1), String.valueOf(task._3()) }; Awaitility.await() @@ -602,12 +588,12 @@ public void testStopJobs() { given().get( http + container - .getHost() + .getHost() + colon + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/CANCELED") .then() .statusCode(200) @@ -657,7 +643,7 @@ public void testStopJobsV2() { String[] jobIds = new String[] { - String.valueOf(task._3() - 1), String.valueOf(task._3()) + String.valueOf(task._3() - 1), String.valueOf(task._3()) }; Awaitility.await() .atMost(2, TimeUnit.MINUTES) @@ -666,12 +652,12 @@ public void testStopJobsV2() { given().get( http + container - .getHost() + .getHost() + colon + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/CANCELED") .then() .statusCode(200) @@ -720,419 +706,6 @@ public void testSubmitJobsV2() { }); } - @Test - public void testHoconSubmitJobWithCustomJobId() { - AtomicInteger i = new AtomicInteger(); - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(2); - submitHoconJobAndAssertResponse( - container, - task._1(), - task._2(), - i, - hoconParamJobName + "&jobId=" + task._3(), - true, - task._3().toString()); - }); - } - - @Test - public void testHoconSubmitJobWithCustomJobIdV2() { - AtomicInteger i = new AtomicInteger(); - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(3); - submitHoconJobAndAssertResponse( - container, - task._1(), - task._2(), - i, - hoconParamJobName + "&jobId=" + task._3(), - true, - task._3().toString()); - }); - } - - @Test - public void testHoconSubmitJobWithoutCustomJobId() { - AtomicInteger i = new AtomicInteger(); - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(2); - submitHoconJobAndAssertResponse( - container, - task._1(), - task._2(), - i, - hoconParamJobName, - false, - task._3().toString()); - }); - } - - @Test - public void testHoconSubmitJobWithoutCustomJobIdV2() { - AtomicInteger i = new AtomicInteger(); - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(3); - submitHoconJobAndAssertResponse( - container, - task._1(), - task._2(), - i, - hoconParamJobName, - false, - task._3().toString()); - }); - } - - @Test - public void testHoconStartWithSavePointWithoutJobId() { - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(2); - Response response = - submitHoconJob( - "BATCH", - container, - task._1(), - task._2(), - true, - hoconJobName, - hoconParamJobName); - response.then() - .statusCode(400) - .body( - "message", - equalTo( - "Please provide jobId when start with save point.")); - }); - } - - @Test - public void testHoconStartWithSavePointWithoutJobIdV2() { - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(3); - Response response = - submitHoconJob( - "BATCH", - container, - task._1(), - task._2(), - true, - hoconJobName, - hoconParamJobName); - response.then() - .statusCode(400) - .body( - "message", - equalTo( - "Please provide jobId when start with save point.")); - }); - } - - @Test - public void testHoconStopJob() { - AtomicInteger i = new AtomicInteger(); - - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(2); - String jobId = - submitHoconJob( - container, - task._1(), - task._2(), - "STREAMING", - hoconJobName, - hoconParamJobName) - .getBody() - .jsonPath() - .getString("jobId"); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .RUNNING_JOB_URL - + "/" - + jobId) - .then() - .statusCode(200) - .body("jobStatus", equalTo("RUNNING"))); - - String parameters = - "{" - + "\"jobId\":" - + jobId - + "," - + "\"isStopWithSavePoint\":true}"; - - given().body(parameters) - .post( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant.STOP_JOB_URL) - .then() - .statusCode(200) - .body("jobId", equalTo(jobId)); - - Awaitility.await() - .atMost(6, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .FINISHED_JOBS_INFO - + "/SAVEPOINT_DONE") - .then() - .statusCode(200) - .body( - "[" + i.get() + "].jobId", - equalTo(jobId))); - - String jobId2 = - submitHoconJob( - container, - task._1(), - task._2(), - "STREAMING", - hoconJobName, - hoconParamJobName) - .getBody() - .jsonPath() - .getString("jobId"); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .RUNNING_JOB_URL - + "/" - + jobId2) - .then() - .statusCode(200) - .body("jobStatus", equalTo("RUNNING"))); - parameters = - "{" - + "\"jobId\":" - + jobId2 - + "," - + "\"isStopWithSavePoint\":false}"; - - given().body(parameters) - .post( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant.STOP_JOB_URL) - .then() - .statusCode(200) - .body("jobId", equalTo(jobId2)); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .FINISHED_JOBS_INFO - + "/CANCELED") - .then() - .statusCode(200) - .body( - "[" + i.get() + "].jobId", - equalTo(jobId2))); - i.getAndIncrement(); - }); - } - - @Test - public void testHoconStopJobV2() { - AtomicInteger i = new AtomicInteger(); - - Arrays.asList(server, secondServer) - .forEach( - container -> { - Tuple3 task = tasks.get(3); - String jobId = - submitHoconJob( - container, - task._1(), - task._2(), - "STREAMING", - hoconJobName, - hoconParamJobName) - .getBody() - .jsonPath() - .getString("jobId"); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .RUNNING_JOB_URL - + "/" - + jobId) - .then() - .statusCode(200) - .body("jobStatus", equalTo("RUNNING"))); - - String parameters = - "{" - + "\"jobId\":" - + jobId - + "," - + "\"isStopWithSavePoint\":true}"; - - given().body(parameters) - .post( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant.STOP_JOB_URL) - .then() - .statusCode(200) - .body("jobId", equalTo(jobId)); - - Awaitility.await() - .atMost(6, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .FINISHED_JOBS_INFO - + "/SAVEPOINT_DONE") - .then() - .statusCode(200) - .body( - "[" + i.get() + "].jobId", - equalTo(jobId))); - - String jobId2 = - submitHoconJob( - container, - task._1(), - task._2(), - "STREAMING", - hoconJobName, - hoconParamJobName) - .getBody() - .jsonPath() - .getString("jobId"); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .RUNNING_JOB_URL - + "/" - + jobId2) - .then() - .statusCode(200) - .body("jobStatus", equalTo("RUNNING"))); - parameters = - "{" - + "\"jobId\":" - + jobId2 - + "," - + "\"isStopWithSavePoint\":false}"; - - given().body(parameters) - .post( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant.STOP_JOB_URL) - .then() - .statusCode(200) - .body("jobId", equalTo(jobId2)); - - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - given().get( - http - + container.getHost() - + colon - + task._1() - + task._2() - + RestConstant - .FINISHED_JOBS_INFO - + "/CANCELED") - .then() - .statusCode(200) - .body( - "[" + i.get() + "].jobId", - equalTo(jobId2))); - - i.getAndIncrement(); - }); - } - private void submitJobs( String jobMode, GenericContainer container, @@ -1279,19 +852,19 @@ private Response submitJob( .post( parameters == null ? http - + container.getHost() - + colon - + port - + contextPath - + RestConstant.SUBMIT_JOB_URL + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL : http - + container.getHost() - + colon - + port - + contextPath - + RestConstant.SUBMIT_JOB_URL - + "?" - + parameters); + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + + "?" + + parameters); return response; } @@ -1442,111 +1015,4 @@ private String getBaseRestUrl( + contextPath + RestConstant.FINISHED_JOBS_INFO; } - - private Response submitHoconJob( - GenericContainer container, - int port, - String contextPath, - String jobMode, - String jobName, - String paramJobName) { - return submitHoconJob(jobMode, container, port, contextPath, false, jobName, paramJobName); - } - - private Response submitHoconJob( - String jobMode, - GenericContainer container, - int port, - String contextPath, - boolean isStartWithSavePoint, - String jobName, - String paramJobName) { - String requestBody = - String.format( - "env {\n" - + " job.name = \"%s\"\n" - + " job.mode = \"%s\"\n" - + "}\n\n" - + "source {\n" - + " FakeSource {\n" - + " result_table_name = \"fake\"\n" - + " schema = {\n" - + " fields {\n" - + " name = \"string\"\n" - + " age = \"int\"\n" - + " card = \"int\"\n" - + " }\n" - + " }\n" - + " }\n" - + "}\n\n" - + "transform {\n" - + "}\n\n" - + "sink {\n" - + " Console {\n" - + " source_table_name = \"fake\"\n" - + " }\n" - + "}\n", - jobName, jobMode); - String parameters = null; - if (paramJobName != null) { - parameters = "jobName=" + paramJobName; - } - if (isStartWithSavePoint) { - parameters = parameters + "&isStartWithSavePoint=true"; - } - parameters = parameters + "&format=hocon"; - Response response = - given().body(requestBody) - .header("Content-Type", "text/plain; charset=utf-8") - .post( - parameters == null - ? http - + container.getHost() - + colon - + port - + contextPath - + RestConstant.SUBMIT_JOB_URL - : http - + container.getHost() - + colon - + port - + contextPath - + RestConstant.SUBMIT_JOB_URL - + "?" - + parameters); - return response; - } - - private void submitHoconJobAndAssertResponse( - GenericContainer> container, - int port, - String contextPath, - AtomicInteger i, - String customParam, - boolean isCustomJobId, - String customJobId) { - Response response = submitHoconJobAndResponse(container, port, contextPath, i, customParam); - String jobId = response.getBody().jsonPath().getString("jobId"); - assertResponse(container, port, contextPath, i, jobId, customJobId, isCustomJobId); - i.getAndIncrement(); - } - - private Response submitHoconJobAndResponse( - GenericContainer> container, - int port, - String contextPath, - AtomicInteger i, - String customParam) { - Response response = - i.get() == 0 - ? submitHoconJob( - container, port, contextPath, "BATCH", hoconJobName, customParam) - : submitHoconJob(container, port, contextPath, "BATCH", hoconJobName, null); - if (i.get() == 0) { - response.then().statusCode(200).body("jobName", equalTo(hoconParamJobName)); - } else { - response.then().statusCode(200).body("jobName", equalTo(hoconJobName)); - } - return response; - } } From c39758243d35dee9eab96cecae3ac23e8d348dae Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Thu, 14 Nov 2024 15:12:46 +0800 Subject: [PATCH 09/10] [Feature][rest] support hocon style to submit job --- .../engine/e2e/ClusterSeaTunnelContainer.java | 110 +++++++++--------- 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java index 7d1e227a33b..804c77f340c 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java @@ -93,11 +93,11 @@ public void startUp() throws Exception { () -> { Response response = given().get( - http - + server.getHost() - + colon - + server.getFirstMappedPort() - + "/hazelcast/rest/cluster"); + http + + server.getHost() + + colon + + server.getFirstMappedPort() + + "/hazelcast/rest/cluster"); response.then().statusCode(200); Assertions.assertEquals( 2, response.jsonPath().getList("members").size()); @@ -249,12 +249,12 @@ public void testStopJob() { Tuple3 task = tasks.get(0); String jobId = submitJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) .getBody() .jsonPath() .getString("jobId"); @@ -270,7 +270,7 @@ public void testStopJob() { + task._1() + task._2() + RestConstant - .RUNNING_JOB_URL + .RUNNING_JOB_URL + "/" + jobId) .then() @@ -307,7 +307,7 @@ public void testStopJob() { + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/SAVEPOINT_DONE") .then() .statusCode(200) @@ -317,12 +317,12 @@ public void testStopJob() { String jobId2 = submitJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) .getBody() .jsonPath() .getString("jobId"); @@ -338,7 +338,7 @@ public void testStopJob() { + task._1() + task._2() + RestConstant - .RUNNING_JOB_URL + .RUNNING_JOB_URL + "/" + jobId2) .then() @@ -374,7 +374,7 @@ public void testStopJob() { + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/CANCELED") .then() .statusCode(200) @@ -395,12 +395,12 @@ public void testStopJobV2() { Tuple3 task = tasks.get(1); String jobId = submitJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) .getBody() .jsonPath() .getString("jobId"); @@ -416,7 +416,7 @@ public void testStopJobV2() { + task._1() + task._2() + RestConstant - .RUNNING_JOB_URL + .RUNNING_JOB_URL + "/" + jobId) .then() @@ -453,7 +453,7 @@ public void testStopJobV2() { + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/SAVEPOINT_DONE") .then() .statusCode(200) @@ -463,12 +463,12 @@ public void testStopJobV2() { String jobId2 = submitJob( - container, - task._1(), - task._2(), - "STREAMING", - jobName, - paramJobName) + container, + task._1(), + task._2(), + "STREAMING", + jobName, + paramJobName) .getBody() .jsonPath() .getString("jobId"); @@ -484,7 +484,7 @@ public void testStopJobV2() { + task._1() + task._2() + RestConstant - .RUNNING_JOB_URL + .RUNNING_JOB_URL + "/" + jobId2) .then() @@ -520,7 +520,7 @@ public void testStopJobV2() { + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/CANCELED") .then() .statusCode(200) @@ -578,7 +578,7 @@ public void testStopJobs() { .body("[1].jobId", equalTo(task._3() - 1)); String[] jobIds = new String[] { - String.valueOf(task._3() - 1), String.valueOf(task._3()) + String.valueOf(task._3() - 1), String.valueOf(task._3()) }; Awaitility.await() @@ -588,12 +588,12 @@ public void testStopJobs() { given().get( http + container - .getHost() + .getHost() + colon + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/CANCELED") .then() .statusCode(200) @@ -643,7 +643,7 @@ public void testStopJobsV2() { String[] jobIds = new String[] { - String.valueOf(task._3() - 1), String.valueOf(task._3()) + String.valueOf(task._3() - 1), String.valueOf(task._3()) }; Awaitility.await() .atMost(2, TimeUnit.MINUTES) @@ -652,12 +652,12 @@ public void testStopJobsV2() { given().get( http + container - .getHost() + .getHost() + colon + task._1() + task._2() + RestConstant - .FINISHED_JOBS_INFO + .FINISHED_JOBS_INFO + "/CANCELED") .then() .statusCode(200) @@ -852,19 +852,19 @@ private Response submitJob( .post( parameters == null ? http - + container.getHost() - + colon - + port - + contextPath - + RestConstant.SUBMIT_JOB_URL + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL : http - + container.getHost() - + colon - + port - + contextPath - + RestConstant.SUBMIT_JOB_URL - + "?" - + parameters); + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + + "?" + + parameters); return response; } From c4a6c6c3ad23a5e56b8e64c86785816e85887f11 Mon Sep 17 00:00:00 2001 From: njh_cmss Date: Fri, 15 Nov 2024 10:32:59 +0800 Subject: [PATCH 10/10] [Feature][rest] support hocon style to submit job --- .../engine/e2e/ClusterSeaTunnelContainer.java | 534 ++++++++++++++++++ 1 file changed, 534 insertions(+) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java index 804c77f340c..8dd6cd5321b 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java @@ -61,6 +61,8 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer { private static final String jobName = "test测试"; private static final String paramJobName = "param_test测试"; + private static final String hoconJobName = "test_hocon测试"; + private static final String hoconParamJobName = "param_test_hocon测试"; private static final String http = "http://"; @@ -77,6 +79,10 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer { private static final long CUSTOM_JOB_ID_2 = 862969647010611202L; + private static final long HOCON_CUSTOM_JOB_ID_1 = 862969647010611203L; + + private static final long HOCON_CUSTOM_JOB_ID_2 = 862969647010611204L; + private static List> tasks; @Override @@ -108,6 +114,14 @@ public void startUp() throws Exception { new Tuple3<>( server.getMappedPort(5801), RestConstant.CONTEXT_PATH, CUSTOM_JOB_ID_1)); tasks.add(new Tuple3<>(server.getMappedPort(8080), "", CUSTOM_JOB_ID_2)); + + tasks.add( + new Tuple3<>( + server.getMappedPort(5801), + RestConstant.CONTEXT_PATH, + HOCON_CUSTOM_JOB_ID_1)); + + tasks.add(new Tuple3<>(server.getMappedPort(8080), "", HOCON_CUSTOM_JOB_ID_2)); } @Override @@ -706,6 +720,419 @@ public void testSubmitJobsV2() { }); } + @Test + public void testHoconSubmitJobWithCustomJobId() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(2); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + hoconParamJobName + "&jobId=" + task._3(), + true, + task._3().toString()); + }); + } + + @Test + public void testHoconSubmitJobWithCustomJobIdV2() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(3); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + hoconParamJobName + "&jobId=" + task._3(), + true, + task._3().toString()); + }); + } + + @Test + public void testHoconSubmitJobWithoutCustomJobId() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(2); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + hoconParamJobName, + false, + task._3().toString()); + }); + } + + @Test + public void testHoconSubmitJobWithoutCustomJobIdV2() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(3); + submitHoconJobAndAssertResponse( + container, + task._1(), + task._2(), + i, + hoconParamJobName, + false, + task._3().toString()); + }); + } + + @Test + public void testHoconStartWithSavePointWithoutJobId() { + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(2); + Response response = + submitHoconJob( + "BATCH", + container, + task._1(), + task._2(), + true, + hoconJobName, + hoconParamJobName); + response.then() + .statusCode(400) + .body( + "message", + equalTo( + "Please provide jobId when start with save point.")); + }); + } + + @Test + public void testHoconStartWithSavePointWithoutJobIdV2() { + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(3); + Response response = + submitHoconJob( + "BATCH", + container, + task._1(), + task._2(), + true, + hoconJobName, + hoconParamJobName); + response.then() + .statusCode(400) + .body( + "message", + equalTo( + "Please provide jobId when start with save point.")); + }); + } + + @Test + public void testHoconStopJob() { + AtomicInteger i = new AtomicInteger(); + + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(2); + String jobId = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + hoconJobName, + hoconParamJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + + String parameters = + "{" + + "\"jobId\":" + + jobId + + "," + + "\"isStopWithSavePoint\":true}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId)); + + Awaitility.await() + .atMost(6, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/SAVEPOINT_DONE") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId))); + + String jobId2 = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + hoconJobName, + hoconParamJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId2) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + parameters = + "{" + + "\"jobId\":" + + jobId2 + + "," + + "\"isStopWithSavePoint\":false}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId2)); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/CANCELED") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId2))); + i.getAndIncrement(); + }); + } + + @Test + public void testHoconStopJobV2() { + AtomicInteger i = new AtomicInteger(); + + Arrays.asList(server, secondServer) + .forEach( + container -> { + Tuple3 task = tasks.get(3); + String jobId = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + hoconJobName, + hoconParamJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + + String parameters = + "{" + + "\"jobId\":" + + jobId + + "," + + "\"isStopWithSavePoint\":true}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId)); + + Awaitility.await() + .atMost(6, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/SAVEPOINT_DONE") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId))); + + String jobId2 = + submitHoconJob( + container, + task._1(), + task._2(), + "STREAMING", + hoconJobName, + hoconParamJobName) + .getBody() + .jsonPath() + .getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .RUNNING_JOB_URL + + "/" + + jobId2) + .then() + .statusCode(200) + .body("jobStatus", equalTo("RUNNING"))); + parameters = + "{" + + "\"jobId\":" + + jobId2 + + "," + + "\"isStopWithSavePoint\":false}"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId2)); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container.getHost() + + colon + + task._1() + + task._2() + + RestConstant + .FINISHED_JOBS_INFO + + "/CANCELED") + .then() + .statusCode(200) + .body( + "[" + i.get() + "].jobId", + equalTo(jobId2))); + + i.getAndIncrement(); + }); + } + private void submitJobs( String jobMode, GenericContainer container, @@ -1015,4 +1442,111 @@ private String getBaseRestUrl( + contextPath + RestConstant.FINISHED_JOBS_INFO; } + + private Response submitHoconJob( + GenericContainer container, + int port, + String contextPath, + String jobMode, + String jobName, + String paramJobName) { + return submitHoconJob(jobMode, container, port, contextPath, false, jobName, paramJobName); + } + + private Response submitHoconJob( + String jobMode, + GenericContainer container, + int port, + String contextPath, + boolean isStartWithSavePoint, + String jobName, + String paramJobName) { + String requestBody = + String.format( + "env {\n" + + " job.name = \"%s\"\n" + + " job.mode = \"%s\"\n" + + "}\n\n" + + "source {\n" + + " FakeSource {\n" + + " result_table_name = \"fake\"\n" + + " schema = {\n" + + " fields {\n" + + " name = \"string\"\n" + + " age = \"int\"\n" + + " card = \"int\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n\n" + + "transform {\n" + + "}\n\n" + + "sink {\n" + + " Console {\n" + + " source_table_name = \"fake\"\n" + + " }\n" + + "}\n", + jobName, jobMode); + String parameters = null; + if (paramJobName != null) { + parameters = "jobName=" + paramJobName; + } + if (isStartWithSavePoint) { + parameters = parameters + "&isStartWithSavePoint=true"; + } + parameters = parameters + "&format=hocon"; + Response response = + given().body(requestBody) + .header("Content-Type", "text/plain; charset=utf-8") + .post( + parameters == null + ? http + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + : http + + container.getHost() + + colon + + port + + contextPath + + RestConstant.SUBMIT_JOB_URL + + "?" + + parameters); + return response; + } + + private void submitHoconJobAndAssertResponse( + GenericContainer> container, + int port, + String contextPath, + AtomicInteger i, + String customParam, + boolean isCustomJobId, + String customJobId) { + Response response = submitHoconJobAndResponse(container, port, contextPath, i, customParam); + String jobId = response.getBody().jsonPath().getString("jobId"); + assertResponse(container, port, contextPath, i, jobId, customJobId, isCustomJobId); + i.getAndIncrement(); + } + + private Response submitHoconJobAndResponse( + GenericContainer> container, + int port, + String contextPath, + AtomicInteger i, + String customParam) { + Response response = + i.get() == 0 + ? submitHoconJob( + container, port, contextPath, "BATCH", hoconJobName, customParam) + : submitHoconJob(container, port, contextPath, "BATCH", hoconJobName, null); + if (i.get() == 0) { + response.then().statusCode(200).body("jobName", equalTo(hoconParamJobName)); + } else { + response.then().statusCode(200).body("jobName", equalTo(hoconJobName)); + } + return response; + } }