
Commit b700f1c

[FLINK-36758][sql-gateway] Introduce driver to load the application in application mode (#25724)
1 parent 4952b58 commit b700f1c

File tree

5 files changed: +664, -1 (three of the five files are shown in this capture)

flink-core/src/main/java/org/apache/flink/util/FileUtils.java (+1, -1)
@@ -193,7 +193,7 @@ public static byte[] readAllBytes(java.nio.file.Path path) throws IOException {
      * @throws IOException if an I/O error occurs reading from the stream
      * @throws OutOfMemoryError if an array of the required size cannot be allocated
      */
-    private static byte[] read(InputStream source, int initialSize) throws IOException {
+    public static byte[] read(InputStream source, int initialSize) throws IOException {
         int capacity = initialSize;
         byte[] buf = new byte[capacity];
         int nread = 0;
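
This hunk only widens visibility: FileUtils.read(InputStream, int) goes from a private helper to a public utility, presumably so the new SQL driver can drain a script stream into a byte array. A minimal caller sketch, assuming only the signature shown in the hunk; the URL and class name are illustrative, not part of the commit:

import org.apache.flink.util.FileUtils;

import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ReadScriptExample {
    public static void main(String[] args) throws Exception {
        // Illustrative source; any InputStream works.
        URL scriptUrl = new URL("http://example.com/script.sql");
        try (InputStream in = scriptUrl.openStream()) {
            // initialSize is only the starting buffer capacity; read(...)
            // keeps growing the buffer until the stream is exhausted.
            byte[] bytes = FileUtils.read(in, 4096);
            System.out.println(new String(bytes, StandardCharsets.UTF_8));
        }
    }
}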
ScriptRunner.java (new file, +68 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.gateway.service.application;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.service.context.SessionContext;
import org.apache.flink.util.concurrent.Executors;

import java.io.OutputStream;
import java.util.Collections;
import java.util.UUID;

/** Runner to run the script. It prepares the required dependencies and environment. */
public class ScriptRunner {

    private static final SessionHandle SESSION_HANDLE =
            new SessionHandle(UUID.fromString("013059f8-760f-4390-b74d-d0818bd99365"));

    public static void run(String script) throws Exception {
        run(script, System.out);
    }

    @VisibleForTesting
    public static void run(String script, OutputStream outputStream) throws Exception {
        DefaultContext defaultContext =
                DefaultContext.load(
                        (Configuration)
                                StreamExecutionEnvironment.getExecutionEnvironment(
                                                new Configuration())
                                        .getConfiguration(),
                        Collections.emptyList(),
                        false);
        SessionContext sessionContext =
                SessionContext.create(
                        defaultContext,
                        SESSION_HANDLE,
                        SessionEnvironment.newBuilder()
                                .setSessionEndpointVersion(
                                        SqlGatewayRestAPIVersion.getDefaultVersion())
                                .build(),
                        Executors.newDirectExecutorService());
        try (AutoCloseable ignore = sessionContext::close) {
            new ScriptExecutor(sessionContext, new Printer(outputStream)).execute(script);
        }
    }
}
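
ScriptRunner bootstraps a DefaultContext from the environment configuration, opens a session under a fixed handle, and delegates the script to a ScriptExecutor (ScriptExecutor and Printer are companion classes from this commit that are not part of this capture). A minimal usage sketch; the script text is illustrative:

import org.apache.flink.table.gateway.service.application.ScriptRunner;

public class ScriptRunnerExample {
    public static void main(String[] args) throws Exception {
        String script =
                "CREATE TEMPORARY VIEW v(c) AS VALUES ('a'), ('b');\n"
                        + "SELECT UPPER(c) FROM v;";
        // The single-argument overload prints results to System.out.
        ScriptRunner.run(script);
    }
}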
ScriptRunnerITCase.java (new file, +274 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.gateway.service.application;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.deployment.application.ApplicationDispatcherLeaderProcessFactoryFactory;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.table.gateway.service.utils.MockHttpServer;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.runtime.application.SqlDriver;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.UserClassLoaderJarTestUtils;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CLASS;
import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CODE;
import static org.assertj.core.api.Assertions.assertThat;

/** ITCase to verify {@link ScriptRunner}. */
class ScriptRunnerITCase {

    private static Map<String, String> originalEnv;
    private static File udfJar;

    private OutputStream outputStream;

    @BeforeAll
    static void beforeAll(@TempDir File flinkHome, @TempDir Path functionHome) throws Exception {
        originalEnv = System.getenv();
        // prepare yaml
        File confYaml = new File(flinkHome, "config.yaml");
        if (!confYaml.createNewFile()) {
            throw new IOException("Can't create testing config.yaml file.");
        }
        Map<String, String> map = new HashMap<>(System.getenv());
        map.put(ENV_FLINK_CONF_DIR, flinkHome.getAbsolutePath());
        org.apache.flink.core.testutils.CommonTestUtils.setEnv(map);

        Map<String, String> classNameCodes = new HashMap<>();
        classNameCodes.put(
                GENERATED_LOWER_UDF_CLASS,
                String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
        classNameCodes.put(
                GENERATED_UPPER_UDF_CLASS,
                String.format(GENERATED_UPPER_UDF_CODE, GENERATED_UPPER_UDF_CLASS));

        udfJar =
                UserClassLoaderJarTestUtils.createJarFile(
                        Files.createTempDirectory(functionHome, "test-jar").toFile(),
                        "test-classloader-udf.jar",
                        classNameCodes);
    }

    @BeforeEach
    void beforeEach() {
        outputStream = new ByteArrayOutputStream(1024);
        SqlDriver.enableTestMode(outputStream);
    }

    @AfterEach
    void afterEach() throws Exception {
        outputStream.close();
        SqlDriver.disableTestMode();
    }

    @AfterAll
    static void afterAll() {
        org.apache.flink.core.testutils.CommonTestUtils.setEnv(originalEnv);
    }

    @Test
    void testRunScriptFromFile(@TempDir Path workDir) throws Exception {
        String script =
                String.format(
                        "CREATE TEMPORARY TABLE sink(\n"
                                + " a STRING\n"
                                + ") WITH (\n"
                                + " 'connector' = 'values'\n"
                                + ");\n"
                                + "ADD JAR '%s';\n"
                                + "CREATE TEMPORARY FUNCTION lower_func AS '%s';\n"
                                + "CREATE TEMPORARY VIEW v(c) AS VALUES ('A'), ('B'), ('C');\n"
                                + "INSERT INTO sink SELECT lower_func(c) FROM v;",
                        udfJar.getAbsolutePath(), GENERATED_LOWER_UDF_CLASS);

        List<String> arguments =
                Arrays.asList("--scriptPath", createStatementFile(workDir, script).toString());
        runScriptInCluster(arguments);

        assertThat(TestValuesTableFactory.getResultsAsStrings("sink"))
                .containsExactly("+I[a]", "+I[b]", "+I[c]");
    }

    @Test
    void testRunScriptFromRemoteFile(@TempDir Path workDir) throws Exception {
        String script =
                String.format(
                        "CREATE TEMPORARY TABLE sink(\n"
                                + " a STRING\n"
                                + ") WITH (\n"
                                + " 'connector' = 'values'\n"
                                + ");\n"
                                + "ADD JAR '%s';\n"
                                + "CREATE TEMPORARY FUNCTION lower_func AS '%s';\n"
                                + "CREATE TEMPORARY VIEW v(c) AS VALUES ('A'), ('B'), ('C');\n"
                                + "INSERT INTO sink SELECT lower_func(c) FROM v;",
                        udfJar.getAbsolutePath(), GENERATED_LOWER_UDF_CLASS);
        File file = createStatementFile(workDir, script).toFile();

        try (MockHttpServer server = MockHttpServer.startHttpServer()) {
            URL url = server.prepareResource("/download/script.sql", file);
            List<String> arguments = Arrays.asList("--scriptPath", url.toString());
            runScriptInCluster(arguments);
        }
    }

    @Test
    void testRunScript() throws Exception {
        List<String> arguments =
                Arrays.asList(
                        "--script",
                        String.format(
                                "CREATE TEMPORARY TABLE sink(\n"
                                        + " a STRING\n"
                                        + ") WITH (\n"
                                        + " 'connector' = 'values'\n"
                                        + ");\n"
                                        + "CREATE TEMPORARY FUNCTION upper_func AS '%s' USING JAR '%s';\n"
                                        + "CREATE TEMPORARY VIEW v(c) AS VALUES ('a'), ('b'), ('c');\n"
                                        + "INSERT INTO sink SELECT upper_func(c) FROM v;",
                                GENERATED_UPPER_UDF_CLASS, udfJar.getAbsolutePath()));
        runScriptInCluster(arguments);

        assertThat(TestValuesTableFactory.getResultsAsStrings("sink"))
                .containsExactly("+I[A]", "+I[B]", "+I[C]");
    }

    void runScriptInCluster(List<String> arguments) throws Exception {
        JobID jobID = JobID.generate();
        final Configuration configuration = new Configuration();
        configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
        configuration.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, false);
        configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobID.toHexString());

        final TestingMiniClusterConfiguration clusterConfiguration =
                TestingMiniClusterConfiguration.newBuilder()
                        .setConfiguration(configuration)
                        .build();

        PackagedProgram.Builder builder =
                PackagedProgram.newBuilder()
                        .setEntryPointClassName(SqlDriver.class.getName())
                        .setArguments(arguments.toArray(new String[0]))
                        .setUserClassPaths(
                                ExecutionConfigAccessor.fromConfiguration(configuration)
                                        .getClasspaths());
        final TestingMiniCluster.Builder clusterBuilder =
                TestingMiniCluster.newBuilder(clusterConfiguration)
                        .setDispatcherResourceManagerComponentFactorySupplier(
                                createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
                                        clusterConfiguration.getConfiguration(), builder.build()));

        try (final MiniCluster cluster = clusterBuilder.build()) {

            // start mini cluster and submit the job
            cluster.start();

            // wait job finishes.
            awaitJobStatus(cluster, jobID, JobStatus.FINISHED);
        }
    }

    private static Path createStatementFile(Path workDir, String script) throws Exception {
        File file = new File(workDir.toString(), "statement.sql");
        assertThat(file.createNewFile()).isTrue();
        FileUtils.writeFileUtf8(file, script);
        return file.toPath();
    }

    private static Supplier<DispatcherResourceManagerComponentFactory>
            createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
                    Configuration configuration, PackagedProgram program) {
        return () -> {
            final ApplicationDispatcherLeaderProcessFactoryFactory
                    applicationDispatcherLeaderProcessFactoryFactory =
                            ApplicationDispatcherLeaderProcessFactoryFactory.create(
                                    new Configuration(configuration),
                                    SessionDispatcherFactory.INSTANCE,
                                    program);
            return new DefaultDispatcherResourceManagerComponentFactory(
                    new DefaultDispatcherRunnerFactory(
                            applicationDispatcherLeaderProcessFactoryFactory),
                    StandaloneResourceManagerFactory.getInstance(),
                    JobRestEndpointFactory.INSTANCE);
        };
    }

    private static void awaitJobStatus(MiniCluster cluster, JobID jobId, JobStatus status)
            throws Exception {
        CommonTestUtils.waitUntilCondition(
                () -> {
                    try {
                        return cluster.getJobStatus(jobId).get() == status;
                    } catch (ExecutionException e) {
                        if (ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class)
                                .isPresent()) {
                            // job may not be yet submitted
                            return false;
                        }
                        throw e;
                    }
                },
                500,
                60);
    }
}
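
The three tests exercise both ways the driver accepts a script: inline via --script, or from a local path or URL via --scriptPath. Because runScriptInCluster registers SqlDriver as the PackagedProgram entry point, the class must expose a standard main(String[]); a direct-invocation sketch under that assumption (the flag is copied from the tests, the statement is illustrative, and a real run still needs the application-mode cluster context the tests set up):

import org.apache.flink.table.runtime.application.SqlDriver;

public class SqlDriverSmoke {
    public static void main(String[] args) throws Exception {
        // Same flag the testRunScript case passes through the PackagedProgram.
        SqlDriver.main(new String[] {"--script", "SELECT 1;"});
    }
}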
