Skip to content

Commit 4a8b8c0

Browse files
authored
[FLINK-36760][sql-client] Support deploying scripts via SQL client (#25754)
1 parent 7344916 commit 4a8b8c0

File tree

27 files changed

+648
-181
lines changed

27 files changed

+648
-181
lines changed

flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java

+47-31
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353
public enum PackagedProgramUtils {
5454
;
5555

56+
private static final String SQL_DRIVER_CLASS_NAME =
57+
"org.apache.flink.table.runtime.application.SqlDriver";
58+
5659
private static final String PYTHON_GATEWAY_CLASS_NAME =
5760
"org.apache.flink.client.python.PythonGatewayServer";
5861

@@ -193,43 +196,21 @@ public static boolean isPython(String[] programArguments) {
193196
}
194197

195198
public static URL getPythonJar() {
196-
String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR);
197-
final List<Path> pythonJarPath = new ArrayList<>();
198-
try {
199-
Files.walkFileTree(
200-
FileSystems.getDefault().getPath(flinkOptPath),
201-
new SimpleFileVisitor<Path>() {
202-
@Override
203-
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
204-
throws IOException {
205-
FileVisitResult result = super.visitFile(file, attrs);
206-
if (file.getFileName().toString().startsWith("flink-python")) {
207-
pythonJarPath.add(file);
208-
}
209-
return result;
210-
}
211-
});
212-
} catch (IOException e) {
213-
throw new RuntimeException(
214-
"Exception encountered during finding the flink-python jar. This should not happen.",
215-
e);
216-
}
217-
218-
if (pythonJarPath.size() != 1) {
219-
throw new RuntimeException("Found " + pythonJarPath.size() + " flink-python jar.");
220-
}
221-
222-
try {
223-
return pythonJarPath.get(0).toUri().toURL();
224-
} catch (MalformedURLException e) {
225-
throw new RuntimeException("URL is invalid. This should not happen.", e);
226-
}
199+
return getOptJar("flink-python");
227200
}
228201

229202
public static String getPythonDriverClassName() {
230203
return PYTHON_DRIVER_CLASS_NAME;
231204
}
232205

206+
public static boolean isSqlApplication(String entryPointClassName) {
207+
return (entryPointClassName != null) && (entryPointClassName.equals(SQL_DRIVER_CLASS_NAME));
208+
}
209+
210+
public static URL getSqlGatewayJar() {
211+
return getOptJar("flink-sql-gateway");
212+
}
213+
233214
public static URI resolveURI(String path) throws URISyntaxException {
234215
final URI uri = new URI(path);
235216
if (uri.getScheme() != null) {
@@ -260,4 +241,39 @@ private static ProgramInvocationException generateException(
260241
stderr.length() == 0 ? "(none)" : stderr),
261242
cause);
262243
}
244+
245+
private static URL getOptJar(String jarName) {
246+
String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR);
247+
final List<Path> optJarPath = new ArrayList<>();
248+
try {
249+
Files.walkFileTree(
250+
FileSystems.getDefault().getPath(flinkOptPath),
251+
new SimpleFileVisitor<Path>() {
252+
@Override
253+
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
254+
throws IOException {
255+
FileVisitResult result = super.visitFile(file, attrs);
256+
if (file.getFileName().toString().startsWith(jarName)) {
257+
optJarPath.add(file);
258+
}
259+
return result;
260+
}
261+
});
262+
} catch (IOException e) {
263+
throw new RuntimeException(
264+
"Exception encountered during finding the " + jarName + " jar. This should not happen.",
265+
e);
266+
}
267+
268+
if (optJarPath.size() != 1) {
269+
throw new RuntimeException(
270+
String.format("Found %d %s jar.", optJarPath.size(), jarName));
271+
}
272+
273+
try {
274+
return optJarPath.get(0).toUri().toURL();
275+
} catch (MalformedURLException e) {
276+
throw new RuntimeException("URL is invalid. This should not happen.", e);
277+
}
278+
}
263279
}

flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import org.junit.Test;
3030

31+
import java.net.URI;
3132
import java.time.Duration;
3233
import java.util.Arrays;
3334
import java.util.Collections;
@@ -69,11 +70,13 @@ protected List<String> formatRawResult(List<String> rawResult) {
6970
}
7071

7172
@Override
72-
protected void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
73+
protected void executeSqlStatements(
74+
ClusterController clusterController, List<String> sqlLines, List<URI> dependencies)
7375
throws Exception {
7476
clusterController.submitSQLJob(
7577
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
7678
.addJar(SQL_TOOL_BOX_JAR)
79+
.addJars(dependencies.toArray(new URI[0]))
7780
.build(),
7881
Duration.ofMinutes(2L));
7982
}

flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.io.File;
3737
import java.io.FileNotFoundException;
3838
import java.io.IOException;
39+
import java.net.URI;
3940
import java.nio.file.Path;
4041
import java.time.Duration;
4142
import java.util.List;
@@ -91,10 +92,12 @@ protected void destroyHDFS() {
9192
}
9293

9394
@Override
94-
protected void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
95+
protected void executeSqlStatements(
96+
ClusterController clusterController, List<String> sqlLines, List<URI> dependencies)
9597
throws Exception {
9698
clusterController.submitSQLJob(
9799
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
100+
.addJars(dependencies.toArray(new URI[0]))
98101
.setEnvProcessor(
99102
map -> map.put("HADOOP_CLASSPATH", getHadoopClassPathContent()))
100103
.build(),

flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import org.junit.Test;
3030

31+
import java.net.URI;
3132
import java.time.Duration;
3233
import java.util.Arrays;
3334
import java.util.Collections;
@@ -76,11 +77,13 @@ protected List<String> formatRawResult(List<String> rawResult) {
7677
}
7778

7879
@Override
79-
protected void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
80+
protected void executeSqlStatements(
81+
ClusterController clusterController, List<String> sqlLines, List<URI> dependencies)
8082
throws Exception {
8183
clusterController.submitSQLJob(
8284
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
8385
.addJar(SQL_TOOL_BOX_JAR)
86+
.addJars(dependencies.toArray(new URI[0]))
8487
.build(),
8588
Duration.ofMinutes(2L));
8689
}

flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.io.File;
5050
import java.io.FileNotFoundException;
5151
import java.io.IOException;
52+
import java.net.URI;
5253
import java.net.URL;
5354
import java.nio.file.Files;
5455
import java.nio.file.Path;
@@ -125,23 +126,25 @@ public void runAndCheckSQL(
125126
runAndCheckSQL(
126127
sqlPath,
127128
Collections.singletonMap(result, resultItems),
128-
Collections.singletonMap(result, formatter));
129+
Collections.singletonMap(result, formatter),
130+
Collections.emptyList());
129131
}
130132

131133
public void runAndCheckSQL(String sqlPath, Map<Path, List<String>> resultItems)
132134
throws Exception {
133-
runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap());
135+
runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap(), Collections.emptyList());
134136
}
135137

136138
public void runAndCheckSQL(
137139
String sqlPath,
138140
Map<Path, List<String>> resultItems,
139-
Map<Path, Function<List<String>, List<String>>> formatters)
141+
Map<Path, Function<List<String>, List<String>>> formatters,
142+
List<URI> dependencies)
140143
throws Exception {
141144
try (ClusterController clusterController = flink.startCluster(1)) {
142145
List<String> sqlLines = initializeSqlLines(sqlPath);
143146

144-
executeSqlStatements(clusterController, sqlLines);
147+
executeSqlStatements(clusterController, sqlLines, dependencies);
145148

146149
// Wait until all the results flushed to the json file.
147150
LOG.info("Verify the result.");
@@ -163,7 +166,8 @@ protected Map<String, String> generateReplaceVars() {
163166
}
164167

165168
protected abstract void executeSqlStatements(
166-
ClusterController clusterController, List<String> sqlLines) throws Exception;
169+
ClusterController clusterController, List<String> sqlLines, List<URI> dependencies)
170+
throws Exception;
167171

168172
private List<String> initializeSqlLines(String sqlPath) throws IOException {
169173
URL url = SqlITCaseBase.class.getClassLoader().getResource(sqlPath);

flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java

+20
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.junit.rules.TestName;
3131

3232
import java.io.IOException;
33+
import java.net.URI;
3334
import java.util.Arrays;
3435
import java.util.Collections;
3536
import java.util.Map;
@@ -90,6 +91,25 @@ public void testUdfInRemoteJar() throws Exception {
9091
raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA));
9192
}
9293

94+
@Test
95+
public void testCreateFunctionFromRemoteJarViaSqlClient() throws Exception {
96+
runAndCheckSQL(
97+
"sql_client_remote_jar_e2e.sql",
98+
Collections.singletonMap(result, Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]")),
99+
Collections.singletonMap(
100+
result,
101+
raw ->
102+
convertToMaterializedResult(
103+
raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA)),
104+
Collections.singletonList(
105+
URI.create(
106+
String.format(
107+
"hdfs://%s:%s/%s",
108+
hdfsCluster.getURI().getHost(),
109+
hdfsCluster.getNameNodePort(),
110+
hdPath))));
111+
}
112+
93113
@Test
94114
public void testScalarUdfWhenCheckpointEnable() throws Exception {
95115
runAndCheckSQL(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
CREATE TABLE JsonTable (
20+
user_name STRING,
21+
order_cnt BIGINT
22+
) WITH (
23+
'connector' = 'filesystem',
24+
'path' = '$RESULT',
25+
'sink.rolling-policy.rollover-interval' = '2s',
26+
'sink.rolling-policy.check-interval' = '2s',
27+
'format' = 'debezium-json'
28+
);
29+
30+
create function count_agg as 'org.apache.flink.table.toolbox.CountAggFunction' LANGUAGE JAVA;
31+
32+
SET execution.runtime-mode = $MODE;
33+
SET table.exec.mini-batch.enabled = true;
34+
SET table.exec.mini-batch.size = 5;
35+
SET table.exec.mini-batch.allow-latency = 2s;
36+
37+
INSERT INTO JsonTable
38+
SELECT user_name, count_agg(order_id)
39+
FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name)
40+
GROUP BY user_name;

flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java

+7-20
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,12 @@
2626
import org.apache.flink.table.client.gateway.DefaultContextUtils;
2727
import org.apache.flink.table.client.gateway.Executor;
2828
import org.apache.flink.table.client.gateway.SingleSessionManager;
29-
import org.apache.flink.table.client.gateway.SqlExecutionException;
3029
import org.apache.flink.table.gateway.SqlGateway;
3130
import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory;
3231
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;
3332
import org.apache.flink.table.gateway.service.context.DefaultContext;
3433
import org.apache.flink.util.NetUtils;
3534

36-
import org.apache.commons.io.IOUtils;
3735
import org.apache.commons.lang3.SystemUtils;
3836
import org.jline.terminal.Terminal;
3937
import org.slf4j.Logger;
@@ -42,16 +40,14 @@
4240
import javax.annotation.Nullable;
4341

4442
import java.io.Closeable;
45-
import java.io.IOException;
4643
import java.net.InetSocketAddress;
47-
import java.net.URL;
48-
import java.nio.charset.StandardCharsets;
4944
import java.nio.file.Path;
5045
import java.nio.file.Paths;
5146
import java.util.Arrays;
5247
import java.util.function.Supplier;
5348

5449
import static org.apache.flink.table.client.cli.CliClient.DEFAULT_TERMINAL_FACTORY;
50+
import static org.apache.flink.table.client.cli.CliUtils.isApplicationMode;
5551
import static org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getSqlGatewayOptionPrefix;
5652

5753
/**
@@ -140,7 +136,11 @@ private void openCli(Executor executor) {
140136

141137
try (CliClient cli = new CliClient(terminalFactory, executor, historyFilePath)) {
142138
if (options.getInitFile() != null) {
143-
boolean success = cli.executeInitialization(readFromURL(options.getInitFile()));
139+
if (isApplicationMode(executor.getSessionConfig())) {
140+
throw new SqlClientException(
141+
"Sql Client doesn't support to run init files when deploying script into cluster.");
142+
}
143+
boolean success = cli.executeInitialization(options.getInitFile());
144144
if (!success) {
145145
System.out.println(
146146
String.format(
@@ -158,7 +158,7 @@ private void openCli(Executor executor) {
158158
if (!hasSqlFile) {
159159
cli.executeInInteractiveMode();
160160
} else {
161-
cli.executeInNonInteractiveMode(readExecutionContent());
161+
cli.executeInNonInteractiveMode(options.getSqlFile());
162162
}
163163
}
164164
}
@@ -320,17 +320,4 @@ public void run() {
320320
System.out.println("done.");
321321
}
322322
}
323-
324-
private String readExecutionContent() {
325-
return readFromURL(options.getSqlFile());
326-
}
327-
328-
private String readFromURL(URL file) {
329-
try {
330-
return IOUtils.toString(file, StandardCharsets.UTF_8);
331-
} catch (IOException e) {
332-
throw new SqlExecutionException(
333-
String.format("Fail to read content from the %s.", file.getPath()), e);
334-
}
335-
}
336323
}

0 commit comments

Comments
 (0)