Skip to content

Commit b1c3209

Browse files
[FLINK-37505][python] Add pyflink YAML based config support
1 parent 1d44789 commit b1c3209

File tree

6 files changed

+67
-24
lines changed

6 files changed

+67
-24
lines changed

flink-python/pyflink/common/configuration.py

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -69,18 +69,20 @@ def set_string(self, key: str, value: str) -> 'Configuration':
6969
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
7070
classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
7171
if key in [jars_key, classpaths_key]:
72-
jar_urls = Configuration.parse_jars_value(value, jvm)
72+
jar_urls = Configuration.parse_list_value(value)
7373
add_jars_to_context_class_loader(jar_urls)
7474
self._j_configuration.setString(key, value)
7575
return self
7676

7777
@staticmethod
78-
def parse_jars_value(value: str, jvm):
78+
def parse_list_value(value: str):
79+
if not value:
80+
return []
7981
from ruamel.yaml import YAML
8082
yaml = YAML(typ='safe')
81-
jar_urls_list = yaml.load(value)
82-
if isinstance(jar_urls_list, list):
83-
return jar_urls_list
83+
value_list = yaml.load(value)
84+
if isinstance(value_list, list):
85+
return value_list
8486
return value.split(";")
8587

8688
def get_integer(self, key: str, default_value: int) -> int:

flink-python/pyflink/common/tests/test_configuration.py

Lines changed: 15 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@
1818
from copy import deepcopy
1919

2020
from pyflink.common import Configuration
21-
from pyflink.java_gateway import get_gateway
2221
from pyflink.testing.test_case_utils import PyFlinkTestCase
2322

2423

@@ -165,16 +164,27 @@ def test_hash_equal_str(self):
165164

166165
self.assertEqual(str(conf), "{k1=v1, k2=1}")
167166

168-
def test_parse_jars_value(self):
169-
jvm = get_gateway().jvm
167+
def test_parse_list_value(self):
168+
# test None
169+
value = None
170+
expected_result = []
171+
result = Configuration.parse_list_value(value)
172+
self.assertEqual(result, expected_result)
173+
170174
# test parse YAML list
175+
value = "[jar1, jar2, jar3]"
176+
expected_result = ['jar1', 'jar2', 'jar3']
177+
result = Configuration.parse_list_value(value)
178+
self.assertEqual(result, expected_result)
179+
180+
# test parse multiline YAML list
171181
value = "- jar1\n- jar2\n- jar3"
172182
expected_result = ['jar1', 'jar2', 'jar3']
173-
result = Configuration.parse_jars_value(value, jvm)
183+
result = Configuration.parse_list_value(value)
174184
self.assertEqual(result, expected_result)
175185

176186
# test parse legacy pattern
177187
value = "jar1;jar2;jar3"
178188
expected_result = ['jar1', 'jar2', 'jar3']
179-
result = Configuration.parse_jars_value(value, jvm)
189+
result = Configuration.parse_list_value(value)
180190
self.assertEqual(result, expected_result)

flink-python/pyflink/datastream/stream_execution_environment.py

Lines changed: 7 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -566,11 +566,10 @@ def add_jars(self, *jars_path: str):
566566
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
567567
env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
568568
.getEnvironmentConfig(self._j_stream_execution_environment)
569-
old_jar_paths = env_config.getString(jars_key, None)
570-
joined_jars_path = ';'.join(jars_path)
571-
if old_jar_paths and old_jar_paths.strip():
572-
joined_jars_path = ';'.join([old_jar_paths, joined_jars_path])
573-
env_config.setString(jars_key, joined_jars_path)
569+
old_jars_path = env_config.getString(jars_key, None)
570+
old_jars_list = Configuration.parse_list_value(old_jars_path)
571+
joined_jars_list = [*old_jars_list, *jars_path]
572+
env_config.setString(jars_key, str(joined_jars_list))
574573

575574
def add_classpaths(self, *classpaths: str):
576575
"""
@@ -585,10 +584,9 @@ def add_classpaths(self, *classpaths: str):
585584
env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
586585
.getEnvironmentConfig(self._j_stream_execution_environment)
587586
old_classpaths = env_config.getString(classpaths_key, None)
588-
joined_classpaths = ';'.join(list(classpaths))
589-
if old_classpaths and old_classpaths.strip():
590-
joined_classpaths = ';'.join([old_classpaths, joined_classpaths])
591-
env_config.setString(classpaths_key, joined_classpaths)
587+
old_classpaths_list = Configuration.parse_list_value(old_classpaths)
588+
joined_classpaths_list = [*old_classpaths_list, *classpaths]
589+
env_config.setString(classpaths_key, str(joined_classpaths_list))
592590

593591
def get_default_local_parallelism(self) -> int:
594592
"""

flink-python/pyflink/datastream/tests/test_stream_execution_environment.py

Lines changed: 34 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -469,6 +469,40 @@ def check_python_exec(i):
469469
expected.sort()
470470
self.assertEqual(expected, result)
471471

472+
def test_add_jars_basic(self):
473+
jvm = get_gateway().jvm
474+
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
475+
env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
476+
.getEnvironmentConfig(self.env._j_stream_execution_environment)
477+
478+
old_jars = env_config.getString(jars_key, None)
479+
self.assertIsNone(old_jars)
480+
481+
self.env.add_jars('file://1.jar')
482+
new_jars = env_config.getString(jars_key, None)
483+
self.assertEqual(new_jars, '[\'file://1.jar\']')
484+
485+
self.env.add_jars('file://2.jar', 'file://3.jar')
486+
new_jars = env_config.getString(jars_key, None)
487+
self.assertEqual(new_jars, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
488+
489+
def test_add_classpaths_basic(self):
490+
jvm = get_gateway().jvm
491+
classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
492+
env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
493+
.getEnvironmentConfig(self.env._j_stream_execution_environment)
494+
495+
old_classpaths = env_config.getString(classpaths_key, None)
496+
self.assertIsNone(old_classpaths)
497+
498+
self.env.add_classpaths('file://1.jar')
499+
new_classpaths = env_config.getString(classpaths_key, None)
500+
self.assertEqual(new_classpaths, '[\'file://1.jar\']')
501+
502+
self.env.add_classpaths('file://2.jar', 'file://3.jar')
503+
new_classpaths = env_config.getString(classpaths_key, None)
504+
self.assertEqual(new_classpaths, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
505+
472506
@unittest.skip("Disable due to Kafka connector need to release a new version 2.0")
473507
def test_add_jars(self):
474508
# find kafka connector jars

flink-python/pyflink/table/table_config.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ def set(self, key: str, value: str) -> 'TableConfig':
106106
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
107107
classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
108108
if key in [jars_key, classpaths_key]:
109-
jar_urls = Configuration.parse_jars_value(value, jvm)
109+
jar_urls = Configuration.parse_list_value(value)
110110
add_jars_to_context_class_loader(jar_urls)
111111
return self
112112

flink-python/pyflink/table/table_environment.py

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1486,17 +1486,16 @@ def _add_jars_to_j_env_config(self, config_key):
14861486
if jar_urls:
14871487
jvm = get_gateway().jvm
14881488
jar_urls_list = []
1489-
parsed_jar_urls = Configuration.parse_jars_value(jar_urls, jvm)
1489+
parsed_jar_urls = Configuration.parse_list_value(jar_urls)
14901490
url_strings = [
14911491
jvm.java.net.URL(url).toString() if url else ""
14921492
for url in parsed_jar_urls
14931493
]
14941494
self._parse_urls(url_strings, jar_urls_list)
14951495

14961496
j_configuration = get_j_env_configuration(self._get_j_env())
1497-
parsed_jar_urls = Configuration.parse_jars_value(
1498-
j_configuration.getString(config_key, ""),
1499-
jvm
1497+
parsed_jar_urls = Configuration.parse_list_value(
1498+
j_configuration.getString(config_key, "")
15001499
)
15011500
self._parse_urls(parsed_jar_urls, jar_urls_list)
15021501

0 commit comments

Comments (0)