Skip to content

Commit 3084561

Browse files
authored
[FLINK-36759][sql-gateway] Add REST API to deploy script in application mode (#25730)
1 parent f08e5ec commit 3084561

File tree

23 files changed

+1009
-106
lines changed

23 files changed

+1009
-106
lines changed

flink-clients/src/main/java/org/apache/flink/client/cli/ApplicationDeployer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public interface ApplicationDeployer {
3434
* @param applicationConfiguration an {@link ApplicationConfiguration} specific to the
3535
* application to be executed.
3636
*/
37-
<ClusterID> void run(
37+
<ClusterID> ClusterID run(
3838
final Configuration configuration,
3939
final ApplicationConfiguration applicationConfiguration)
4040
throws Exception;

flink-clients/src/main/java/org/apache/flink/client/deployment/application/cli/ApplicationClusterDeployer.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public ApplicationClusterDeployer(final ClusterClientServiceLoader clientService
4848
this.clientServiceLoader = checkNotNull(clientServiceLoader);
4949
}
5050

51-
public <ClusterID> void run(
51+
public <ClusterID> ClusterID run(
5252
final Configuration configuration,
5353
final ApplicationConfiguration applicationConfiguration)
5454
throws Exception {
@@ -64,8 +64,10 @@ public <ClusterID> void run(
6464
final ClusterSpecification clusterSpecification =
6565
clientFactory.getClusterSpecification(configuration);
6666

67-
clusterDescriptor.deployApplicationCluster(
68-
clusterSpecification, applicationConfiguration);
67+
return clusterDescriptor
68+
.deployApplicationCluster(clusterSpecification, applicationConfiguration)
69+
.getClusterClient()
70+
.getClusterId();
6971
}
7072
}
7173
}

flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void setTriggerCheckpointFunction(
113113

114114
@Override
115115
public T getClusterId() {
116-
throw new UnsupportedOperationException();
116+
return (T) "test-cluster";
117117
}
118118

119119
@Override

flink-end-to-end-tests/run-nightly-tests.sh

+1
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ function run_group_2 {
181181
run_test "Streaming SQL end-to-end test using planner loader" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
182182
run_test "Streaming SQL end-to-end test using planner with Scala version" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh scala-planner" "skip_check_exceptions"
183183
run_test "Sql Jdbc Driver end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_jdbc_driver.sh" "skip_check_exceptions"
184+
run_test "Run kubernetes SQL application test" "$END_TO_END_DIR/test-scripts/test_kubernetes_sql_application.sh"
184185

185186
run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local StreamingFileSink" "skip_check_exceptions"
186187
run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions"

flink-end-to-end-tests/test-scripts/common.sh

+6
Original file line numberDiff line numberDiff line change
@@ -533,10 +533,12 @@ function check_logs_for_non_empty_out_files {
533533

534534
function shutdown_all {
535535
stop_cluster
536+
stop_sql_gateway
536537
# stop TMs which were started by the command: bin/taskmanager.sh start
537538
"$FLINK_DIR"/bin/taskmanager.sh stop-all
538539
tm_kill_all
539540
jm_kill_all
541+
gw_kill_all
540542
}
541543

542544
function stop_cluster {
@@ -683,6 +685,10 @@ function tm_kill_all {
683685
kill_all 'TaskManagerRunner|TaskManager'
684686
}
685687

688+
function gw_kill_all {
689+
kill_all 'SqlGateway'
690+
}
691+
686692
# Kills all processes that match the given name.
687693
function kill_all {
688694
local pid=`jps | grep -E "${1}" | cut -d " " -f 1 || true`
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#!/usr/bin/env bash
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
#
19+
20+
source "$(dirname "$0")"/common_kubernetes.sh
21+
22+
CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
23+
CLUSTER_ROLE_BINDING="flink-role-binding-default"
24+
CLUSTER_ID="flink-native-k8s-sql-application-1"
25+
FLINK_IMAGE_NAME="test_kubernetes_application-1"
26+
LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
27+
IMAGE_BUILD_RETRIES=3
28+
IMAGE_BUILD_BACKOFF=2
29+
30+
function internal_cleanup {
31+
kubectl delete deployment ${CLUSTER_ID}
32+
kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
33+
}
34+
35+
start_kubernetes
36+
37+
if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image ${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
38+
echo "ERROR: Could not build image. Aborting..."
39+
exit 1
40+
fi
41+
42+
kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit --serviceaccount=default:default --namespace=default
43+
44+
mkdir -p "$LOCAL_LOGS_PATH"
45+
46+
echo "[INFO] Start SQL Gateway"
47+
set_config_key "sql-gateway.endpoint.rest.address" "localhost"
48+
start_sql_gateway
49+
50+
echo "[INFO] Submit SQL job in Application Mode"
51+
SESSION_HANDLE=`curl --silent --request POST http://localhost:8083/sessions | sed -n 's/.*"sessionHandle":\s*"\([^"]*\)".*/\1/p'`
52+
curl --location --request POST http://localhost:8083/sessions/${SESSION_HANDLE}/scripts \
53+
--header 'Content-Type: application/json' \
54+
--data-raw '{
55+
"script": "CREATE TEMPORARY TABLE sink(a INT) WITH ( '\''connector'\'' = '\''blackhole'\''); INSERT INTO sink VALUES (1), (2), (3);",
56+
"executionConfig": {
57+
"execution.target": "kubernetes-application",
58+
"kubernetes.cluster-id": "'${CLUSTER_ID}'",
59+
"kubernetes.container.image.ref": "'${FLINK_IMAGE_NAME}'",
60+
"jobmanager.memory.process.size": "1088m",
61+
"taskmanager.memory.process.size": "1000m",
62+
"kubernetes.jobmanager.cpu": 0.5,
63+
"kubernetes.taskmanager.cpu": 0.5,
64+
"kubernetes.rest-service.exposed.type": "NodePort"
65+
}
66+
}'
67+
68+
echo ""
69+
echo "[INFO] Wait job finishes"
70+
kubectl wait --for=condition=Available --timeout=60s deploy/${CLUSTER_ID} || exit 1
71+
jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
72+
wait_rest_endpoint_up_k8s $jm_pod_name
73+
74+
# The Flink cluster is destroyed immediately once the job finishes or fails, so we check the JobManager logs
75+
# instead of checking the job result.
76+
echo "[INFO] Check logs to verify job finishes"
77+
kubectl logs -f $jm_pod_name >$LOCAL_LOGS_PATH/jobmanager.log
78+
grep -E "Job [A-Za-z0-9]+ reached terminal state FINISHED" $LOCAL_LOGS_PATH/jobmanager.log

flink-table/flink-sql-client/src/test/resources/sql/set.q

+71-66
Original file line numberDiff line numberDiff line change
@@ -74,21 +74,22 @@ reset 'table.resources.download-dir';
7474

7575
# list the configured configuration
7676
set;
77-
+-------------------------------------------------+-----------+
78-
| key | value |
79-
+-------------------------------------------------+-----------+
80-
| execution.attached | true |
81-
| execution.shutdown-on-attached-exit | false |
82-
| execution.state-recovery.claim-mode | NO_CLAIM |
83-
| execution.state-recovery.ignore-unclaimed-state | false |
84-
| execution.target | remote |
85-
| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS |
86-
| rest.port | $VAR_REST_PORT |
87-
| sql-client.display.print-time-cost | false |
88-
| sql-client.execution.result-mode | tableau |
89-
| table.exec.legacy-cast-behaviour | DISABLED |
90-
+-------------------------------------------------+-----------+
91-
10 rows in set
77+
+-------------------------------------------------+--------------+
78+
| key | value |
79+
+-------------------------------------------------+--------------+
80+
| $internal.deployment.config-dir | /dummy/conf/ |
81+
| execution.attached | true |
82+
| execution.shutdown-on-attached-exit | false |
83+
| execution.state-recovery.claim-mode | NO_CLAIM |
84+
| execution.state-recovery.ignore-unclaimed-state | false |
85+
| execution.target | remote |
86+
| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS |
87+
| rest.port | $VAR_REST_PORT |
88+
| sql-client.display.print-time-cost | false |
89+
| sql-client.execution.result-mode | tableau |
90+
| table.exec.legacy-cast-behaviour | DISABLED |
91+
+-------------------------------------------------+--------------+
92+
11 rows in set
9293
!ok
9394

9495
# reset the configuration
@@ -97,18 +98,19 @@ reset;
9798
!info
9899

99100
set;
100-
+-------------------------------------------------+-----------+
101-
| key | value |
102-
+-------------------------------------------------+-----------+
103-
| execution.attached | true |
104-
| execution.shutdown-on-attached-exit | false |
105-
| execution.state-recovery.claim-mode | NO_CLAIM |
106-
| execution.state-recovery.ignore-unclaimed-state | false |
107-
| execution.target | remote |
108-
| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS |
109-
| rest.port | $VAR_REST_PORT |
110-
+-------------------------------------------------+-----------+
111-
7 rows in set
101+
+-------------------------------------------------+--------------+
102+
| key | value |
103+
+-------------------------------------------------+--------------+
104+
| $internal.deployment.config-dir | /dummy/conf/ |
105+
| execution.attached | true |
106+
| execution.shutdown-on-attached-exit | false |
107+
| execution.state-recovery.claim-mode | NO_CLAIM |
108+
| execution.state-recovery.ignore-unclaimed-state | false |
109+
| execution.target | remote |
110+
| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS |
111+
| rest.port | $VAR_REST_PORT |
112+
+-------------------------------------------------+--------------+
113+
8 rows in set
112114
!ok
113115

114116
# should fail because default dialect doesn't support hive dialect
@@ -136,19 +138,20 @@ set 'sql-client.verbose' = 'true';
136138
!info
137139
138140
set;
139-
+-------------------------------------------------+-----------+
140-
| key | value |
141-
+-------------------------------------------------+-----------+
142-
| execution.attached | true |
143-
| execution.shutdown-on-attached-exit | false |
144-
| execution.state-recovery.claim-mode | NO_CLAIM |
145-
| execution.state-recovery.ignore-unclaimed-state | false |
146-
| execution.target | remote |
147-
| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS |
148-
| rest.port | $VAR_REST_PORT |
149-
| sql-client.verbose | true |
150-
+-------------------------------------------------+-----------+
151-
8 rows in set
141+
+-------------------------------------------------+--------------+
142+
| key | value |
143+
+-------------------------------------------------+--------------+
144+
| $internal.deployment.config-dir | /dummy/conf/ |
145+
| execution.attached | true |
146+
| execution.shutdown-on-attached-exit | false |
147+
| execution.state-recovery.claim-mode | NO_CLAIM |
148+
| execution.state-recovery.ignore-unclaimed-state | false |
149+
| execution.target | remote |
150+
| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS |
151+
| rest.port | $VAR_REST_PORT |
152+
| sql-client.verbose | true |
153+
+-------------------------------------------------+--------------+
154+
9 rows in set
152155
!ok
153156
154157
set 'execution.attached' = 'false';
@@ -160,19 +163,20 @@ reset 'execution.attached';
160163
!info
161164
162165
set;
163-
+-------------------------------------------------+-----------+
164-
| key | value |
165-
+-------------------------------------------------+-----------+
166-
| execution.attached | true |
167-
| execution.shutdown-on-attached-exit | false |
168-
| execution.state-recovery.claim-mode | NO_CLAIM |
169-
| execution.state-recovery.ignore-unclaimed-state | false |
170-
| execution.target | remote |
171-
| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS |
172-
| rest.port | $VAR_REST_PORT |
173-
| sql-client.verbose | true |
174-
+-------------------------------------------------+-----------+
175-
8 rows in set
166+
+-------------------------------------------------+--------------+
167+
| key | value |
168+
+-------------------------------------------------+--------------+
169+
| $internal.deployment.config-dir | /dummy/conf/ |
170+
| execution.attached | true |
171+
| execution.shutdown-on-attached-exit | false |
172+
| execution.state-recovery.claim-mode | NO_CLAIM |
173+
| execution.state-recovery.ignore-unclaimed-state | false |
174+
| execution.target | remote |
175+
| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS |
176+
| rest.port | $VAR_REST_PORT |
177+
| sql-client.verbose | true |
178+
+-------------------------------------------------+--------------+
179+
9 rows in set
176180
!ok
177181
178182
# test reset can work with add jar
@@ -190,19 +194,20 @@ SHOW JARS;
190194
!ok
191195
192196
set;
193-
+-------------------------------------------------+-----------+
194-
| key | value |
195-
+-------------------------------------------------+-----------+
196-
| execution.attached | true |
197-
| execution.shutdown-on-attached-exit | false |
198-
| execution.state-recovery.claim-mode | NO_CLAIM |
199-
| execution.state-recovery.ignore-unclaimed-state | false |
200-
| execution.target | remote |
201-
| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS |
202-
| rest.port | $VAR_REST_PORT |
203-
| sql-client.verbose | true |
204-
+-------------------------------------------------+-----------+
205-
8 rows in set
197+
+-------------------------------------------------+--------------+
198+
| key | value |
199+
+-------------------------------------------------+--------------+
200+
| $internal.deployment.config-dir | /dummy/conf/ |
201+
| execution.attached | true |
202+
| execution.shutdown-on-attached-exit | false |
203+
| execution.state-recovery.claim-mode | NO_CLAIM |
204+
| execution.state-recovery.ignore-unclaimed-state | false |
205+
| execution.target | remote |
206+
| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS |
207+
| rest.port | $VAR_REST_PORT |
208+
| sql-client.verbose | true |
209+
+-------------------------------------------------+--------------+
210+
9 rows in set
206211
!ok
207212
208213
reset;

flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java

+21
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import javax.annotation.Nullable;
4343

44+
import java.net.URI;
4445
import java.util.List;
4546
import java.util.Map;
4647
import java.util.Set;
@@ -347,4 +348,24 @@ OperationHandle refreshMaterializedTable(
347348
Map<String, String> dynamicOptions,
348349
Map<String, String> staticPartitions,
349350
Map<String, String> executionConfig);
351+
352+
// -------------------------------------------------------------------------------------------
353+
// Deploy Script
354+
// -------------------------------------------------------------------------------------------
355+
356+
/**
357+
* Deploy the script in application mode.
358+
*
359+
* @param sessionHandle handle to identify the session.
360+
* @param scriptUri URI of the script.
361+
* @param script the content of the script.
362+
* @param executionConfig the configuration used to run the script.
363+
* @return the ID of the cluster that the script is deployed to.
364+
*/
365+
<ClusterID> ClusterID deployScript(
366+
SessionHandle sessionHandle,
367+
@Nullable URI scriptUri,
368+
@Nullable String script,
369+
Configuration executionConfig)
370+
throws SqlGatewayException;
350371
}

flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java

+11
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import javax.annotation.Nullable;
4141

42+
import java.net.URI;
4243
import java.util.List;
4344
import java.util.Map;
4445
import java.util.Set;
@@ -201,6 +202,16 @@ public OperationHandle refreshMaterializedTable(
201202
throw new UnsupportedOperationException();
202203
}
203204

205+
@Override
206+
public <ClusterID> ClusterID deployScript(
207+
SessionHandle sessionHandle,
208+
@Nullable URI scriptUri,
209+
@Nullable String script,
210+
Configuration executionConfig)
211+
throws SqlGatewayException {
212+
throw new UnsupportedOperationException();
213+
}
214+
204215
@Override
205216
public ResolvedCatalogBaseTable<?> getTable(
206217
SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)

0 commit comments

Comments
 (0)