Skip to content

Commit 80704e6

Browse files
authored
Issue54 Fix class loader and Prefixed Config option issues (#68)
* Issue54 early prototype not working Signed-off-by: David Radley <[email protected]> * fixed the class loader part Signed-off-by: David Radley <[email protected]> * Change Prefixed Config Option to not extend ConfigOption Signed-off-by: David Radley <[email protected]> * Update Change log Signed-off-by: David Radley <[email protected]> * update doc link to 1.16 Signed-off-by: David Radley <[email protected]> * update more links to 1.16 Signed-off-by: David Radley <[email protected]> * update more links to 1.16 Signed-off-by: David Radley <[email protected]> * Use reflection to update the key in the ConfigOption Signed-off-by: David Radley <[email protected]> * Re-implement reflection after review feedback Signed-off-by: David Radley <[email protected]> * Remove unused classloader field Signed-off-by: David Radley <[email protected]> * fix compile error and address review feedback Signed-off-by: David Radley <[email protected]> --------- Signed-off-by: David Radley <[email protected]>
1 parent e7b1b0a commit 80704e6

12 files changed

+177
-117
lines changed

CHANGELOG.md

+13
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,19 @@
44

55
## [0.10.0] - 2023-07-05
66

7+
8+
### Fixed
9+
10+
Fixed an issue where SQL Client did not work with the connector at Flink 1.16.
11+
12+
This required a change to use a different classloader in the lookup join processing.
13+
As well as the classloader change, a change to the PrefixedConfigOption implementation was
14+
required, because it was implemented as an extension to ConfigOption; which produced
15+
access errors when trying to access the parent class protected methods (the parent class was loaded
16+
using a different classloader). The new implementation is not an extension; instead it holds an
17+
instance of the ConfigOption as a private variable and uses reflection to instantiate a cloned
18+
ConfigOption object with the prefixed key.
19+
720
### Added
821

922
- Add support for batch request submission in HTTP sink. The mode can be changed by setting

README.md

+3-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ In case of updating http-connector please see [Breaking changes](#breaking-chang
1919
## Prerequisites
2020
* Java 11
2121
* Maven 3
22-
* Flink 1.15+
22+
* Flink 1.16+
2323

2424
## Runtime dependencies
2525
This connector has few Flink's runtime dependencies, that are expected to be provided.
@@ -439,16 +439,15 @@ Project build command: `mvn package`. </br>
439439
Detailed test report can be found under `target/site/jacoco/index.xml`.
440440

441441
## Demo application
442-
**Note**: This demo works only for Flink-1.15.x even though connector can be used with Flink 1.16.
443-
This problem is addressed by https://github.com/getindata/flink-http-connector/issues/54 and will be fix in next relesees.
442+
**Note**: This demo works only for Flink-1.15x.
444443

445444
You can test this connector using simple mock http server provided with this repository and Flink SQL-client.
446445
The mock server can be started from IDE (currently only this way) by running `HttpStubApp::main` method.
447446
It will start HTTP server listening on `http://localhost:8080/client`
448447

449448
Steps to follow:
450449
- Run Mock HTTP server from `HttpStubApp::main` method.
451-
- Start your Flink cluster, for example as described under https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/try-flink/local_installation/
450+
- Start your Flink cluster, for example as described under https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/try-flink/local_installation/
452451
- Start Flink SQL Client [6] by calling: `./bin/sql-client.sh -j flink-http-connector-1.0-SNAPSHOT.jar`
453452
- Execute SQL statements:
454453
Create Data Stream source Table:

src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.Serializable;
44

55
import org.apache.flink.configuration.ReadableConfig;
6+
import org.apache.flink.table.factories.DynamicTableFactory;
67
import org.apache.flink.table.factories.Factory;
78

89
import com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSource;
@@ -40,5 +41,6 @@ public interface LookupQueryCreatorFactory extends Factory, Serializable {
4041
*/
4142
LookupQueryCreator createLookupQueryCreator(
4243
ReadableConfig readableConfig,
43-
LookupRow lookupRow);
44+
LookupRow lookupRow,
45+
DynamicTableFactory.Context dynamicTableFactoryContext);
4446
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java

+17-9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import lombok.extern.slf4j.Slf4j;
77
import org.apache.flink.api.common.serialization.DeserializationSchema;
8+
import org.apache.flink.configuration.ReadableConfig;
89
import org.apache.flink.table.api.DataTypes;
910
import org.apache.flink.table.api.DataTypes.Field;
1011
import org.apache.flink.table.connector.format.DecodingFormat;
@@ -15,6 +16,7 @@
1516
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
1617
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
1718
import org.apache.flink.table.data.RowData;
19+
import org.apache.flink.table.factories.DynamicTableFactory;
1820
import org.apache.flink.table.factories.FactoryUtil;
1921
import org.apache.flink.table.types.DataType;
2022
import org.apache.flink.table.types.logical.LogicalType;
@@ -40,29 +42,33 @@ public class HttpLookupTableSource
4042

4143
private final HttpLookupConfig lookupConfig;
4244

45+
private final DynamicTableFactory.Context dynamicTableFactoryContext;
46+
4347
private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
4448

4549
public HttpLookupTableSource(
4650
DataType physicalRowDataType,
4751
HttpLookupConfig lookupConfig,
48-
DecodingFormat<DeserializationSchema<RowData>> decodingFormat) {
52+
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
53+
DynamicTableFactory.Context dynamicTablecontext) {
4954

5055
this.physicalRowDataType = physicalRowDataType;
5156
this.lookupConfig = lookupConfig;
5257
this.decodingFormat = decodingFormat;
58+
this.dynamicTableFactoryContext = dynamicTablecontext;
5359
}
5460

5561
@Override
56-
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
62+
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
5763

58-
LookupRow lookupRow = extractLookupRow(context.getKeys());
64+
LookupRow lookupRow = extractLookupRow(lookupContext.getKeys());
5965

6066
DeserializationSchema<RowData> responseSchemaDecoder =
61-
decodingFormat.createRuntimeDecoder(context, physicalRowDataType);
67+
decodingFormat.createRuntimeDecoder(lookupContext, physicalRowDataType);
6268

6369
LookupQueryCreatorFactory lookupQueryCreatorFactory =
6470
FactoryUtil.discoverFactory(
65-
Thread.currentThread().getContextClassLoader(),
71+
this.dynamicTableFactoryContext.getClassLoader(),
6672
LookupQueryCreatorFactory.class,
6773
lookupConfig.getReadableConfig().getOptional(LOOKUP_QUERY_CREATOR_IDENTIFIER)
6874
.orElse(
@@ -71,11 +77,12 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
7177
GenericJsonQueryCreatorFactory.IDENTIFIER)
7278
)
7379
);
74-
80+
ReadableConfig readableConfig = lookupConfig.getReadableConfig();
7581
LookupQueryCreator lookupQueryCreator =
7682
lookupQueryCreatorFactory.createLookupQueryCreator(
77-
lookupConfig.getReadableConfig(),
78-
lookupRow
83+
readableConfig,
84+
lookupRow,
85+
dynamicTableFactoryContext
7986
);
8087

8188
PollingClientFactory<RowData> pollingClientFactory =
@@ -104,7 +111,8 @@ public DynamicTableSource copy() {
104111
return new HttpLookupTableSource(
105112
physicalRowDataType,
106113
lookupConfig,
107-
decodingFormat
114+
decodingFormat,
115+
dynamicTableFactoryContext
108116
);
109117
}
110118

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ public static DataType row(List<Field> fields) {
4343
}
4444

4545
@Override
46-
public DynamicTableSource createDynamicTableSource(Context context) {
46+
public DynamicTableSource createDynamicTableSource(Context dynamicTableContext) {
4747
FactoryUtil.TableFactoryHelper helper =
48-
FactoryUtil.createTableFactoryHelper(this, context);
48+
FactoryUtil.createTableFactoryHelper(this, dynamicTableContext);
4949

5050
ReadableConfig readableConfig = helper.getOptions();
5151
helper.validateExcept(
@@ -61,17 +61,18 @@ public DynamicTableSource createDynamicTableSource(Context context) {
6161
FactoryUtil.FORMAT
6262
);
6363

64-
HttpLookupConfig lookupConfig = getHttpLookupOptions(context, readableConfig);
64+
HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readableConfig);
6565

66-
ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
66+
ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema();
6767

6868
DataType physicalRowDataType =
6969
toRowDataType(resolvedSchema.getColumns(), Column::isPhysical);
7070

7171
return new HttpLookupTableSource(
7272
physicalRowDataType,
7373
lookupConfig,
74-
decodingFormat
74+
decodingFormat,
75+
dynamicTableContext
7576
);
7677
}
7778

src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/ElasticSearchLiteQueryCreatorFactory.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44

55
import org.apache.flink.configuration.ConfigOption;
66
import org.apache.flink.configuration.ReadableConfig;
7+
import org.apache.flink.table.factories.DynamicTableFactory;
78

89
import com.getindata.connectors.http.LookupQueryCreator;
910
import com.getindata.connectors.http.LookupQueryCreatorFactory;
1011
import com.getindata.connectors.http.internal.table.lookup.LookupRow;
1112

13+
1214
/**
1315
* Factory for creating {@link ElasticSearchLiteQueryCreator}.
1416
*/
@@ -19,7 +21,9 @@ public class ElasticSearchLiteQueryCreatorFactory implements LookupQueryCreatorF
1921
@Override
2022
public LookupQueryCreator createLookupQueryCreator(
2123
ReadableConfig readableConfig,
22-
LookupRow lookupRow) {
24+
LookupRow lookupRow,
25+
DynamicTableFactory.Context dynamicTableFactoryContext
26+
) {
2327
return new ElasticSearchLiteQueryCreator(lookupRow);
2428
}
2529

src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericGetQueryCreatorFactory.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44

55
import org.apache.flink.configuration.ConfigOption;
66
import org.apache.flink.configuration.ReadableConfig;
7+
import org.apache.flink.table.factories.DynamicTableFactory;
78

89
import com.getindata.connectors.http.LookupQueryCreator;
910
import com.getindata.connectors.http.LookupQueryCreatorFactory;
1011
import com.getindata.connectors.http.internal.table.lookup.LookupRow;
1112

13+
1214
/**
1315
* Factory for creating {@link GenericGetQueryCreator}.
1416
*/
@@ -19,7 +21,8 @@ public class GenericGetQueryCreatorFactory implements LookupQueryCreatorFactory
1921
@Override
2022
public LookupQueryCreator createLookupQueryCreator(
2123
ReadableConfig readableConfig,
22-
LookupRow lookupRow) {
24+
LookupRow lookupRow,
25+
DynamicTableFactory.Context dynamicTableFactoryContext) {
2326
return new GenericGetQueryCreator(lookupRow);
2427
}
2528

src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonQueryCreatorFactory.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.apache.flink.configuration.ReadableConfig;
99
import org.apache.flink.table.connector.format.EncodingFormat;
1010
import org.apache.flink.table.data.RowData;
11+
import org.apache.flink.table.factories.DynamicTableFactory;
1112
import org.apache.flink.table.factories.FactoryUtil;
1213
import org.apache.flink.table.factories.SerializationFormatFactory;
1314

@@ -26,22 +27,24 @@ public class GenericJsonQueryCreatorFactory implements LookupQueryCreatorFactory
2627
@Override
2728
public LookupQueryCreator createLookupQueryCreator(
2829
ReadableConfig readableConfig,
29-
LookupRow lookupRow) {
30+
LookupRow lookupRow,
31+
DynamicTableFactory.Context dynamicTableFactoryContext) {
3032

3133
String formatIdentifier = readableConfig.get(LOOKUP_REQUEST_FORMAT);
3234
SerializationFormatFactory jsonFormatFactory =
3335
FactoryUtil.discoverFactory(
34-
Thread.currentThread().getContextClassLoader(),
36+
dynamicTableFactoryContext.getClassLoader(),
3537
SerializationFormatFactory.class,
3638
formatIdentifier
3739
);
38-
39-
EncodingFormat<SerializationSchema<RowData>>
40-
encoder = jsonFormatFactory.createEncodingFormat(
41-
null,
40+
QueryFormatAwareConfiguration queryFormatAwareConfiguration =
4241
new QueryFormatAwareConfiguration(
4342
LOOKUP_REQUEST_FORMAT.key() + "." + formatIdentifier,
44-
(Configuration) readableConfig)
43+
(Configuration) readableConfig);
44+
EncodingFormat<SerializationSchema<RowData>>
45+
encoder = jsonFormatFactory.createEncodingFormat(
46+
dynamicTableFactoryContext,
47+
queryFormatAwareConfiguration
4548
);
4649

4750
SerializationSchema<RowData> serializationSchema =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package com.getindata.connectors.http.internal.table.lookup.querycreators;
2+
3+
import java.lang.reflect.Constructor;
4+
import java.lang.reflect.Field;
5+
import java.lang.reflect.InvocationTargetException;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
9+
import org.apache.flink.configuration.ConfigOption;
10+
import org.apache.flink.configuration.FallbackKey;
11+
12+
/**
13+
* This is a ConfigOption that has an associated config option and prefix.
14+
*
15+
* Note that this Class used to extend ConfigOption,
16+
* but at Flink 1.16, there was a new way of doing class loaders
17+
* for custom content, so we could no longer extend ConfigOption.
18+
*/
19+
public class PrefixedConfigOption<T> {
20+
/**
21+
* configOption to decorate
22+
*/
23+
private ConfigOption configOption;
24+
25+
public ConfigOption getConfigOption() {
26+
return configOption;
27+
}
28+
29+
/**
30+
* This constructor creates a new clone of the supplied option 'other' with
31+
* the prefix prefixing the key. We create a new object, because we do
32+
* not want to mutate a Flink object that we did not create.
33+
*
34+
* @param keyPrefix prefix that will be added to decorate the {@link ConfigOption} key.
35+
* @param other original {@link ConfigOption} to clone and decorate.
36+
*/
37+
public PrefixedConfigOption(String keyPrefix, ConfigOption<T> other) {
38+
String prefixedKey = keyPrefix + other.key();
39+
Class clazz;
40+
boolean isList;
41+
42+
try {
43+
// get clazz
44+
Field field = other.getClass().getDeclaredField("clazz");
45+
field.setAccessible(true);
46+
clazz = (Class) field.get(other);
47+
48+
// get isList
49+
field = other.getClass().getDeclaredField("isList");
50+
field.setAccessible(true);
51+
isList = (Boolean) field.get(other);
52+
53+
/*
54+
* Create a new ConfigOption based on other, but with a prefixed key.
55+
* At 1.16 we cannot access the protected fields / constructor in the supplied
56+
* configOption as this object is loaded using a different classloader.
57+
* Without changing Flink to make the constructor, methods and fields public, we need
58+
* to use reflection to access and create the new prefixed ConfigOption. It is not
59+
* great practise to use reflection, but getting round this classloader issue
60+
* necessitates it's use.
61+
*/
62+
Constructor constructor = other.getClass().getDeclaredConstructors()[0];
63+
constructor.setAccessible(true);
64+
configOption = (ConfigOption) constructor.newInstance(prefixedKey,
65+
clazz,
66+
other.description(),
67+
other.defaultValue(),
68+
isList,
69+
getFallbackKeys(other));
70+
} catch (InstantiationException |
71+
IllegalAccessException |
72+
InvocationTargetException |
73+
NoSuchFieldException e) {
74+
throw new RuntimeException(e);
75+
}
76+
}
77+
78+
private static FallbackKey[] getFallbackKeys(ConfigOption<?> other) {
79+
List<FallbackKey> fallbackKeys = new ArrayList<>();
80+
for (FallbackKey fallbackKey : other.fallbackKeys()) {
81+
fallbackKeys.add(fallbackKey);
82+
}
83+
return fallbackKeys.toArray(new FallbackKey[0]);
84+
}
85+
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/QueryFormatAwareConfiguration.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import org.apache.flink.configuration.ConfigOption;
66
import org.apache.flink.configuration.Configuration;
7-
import org.apache.flink.configuration.PrefixedConfigOption;
87
import org.apache.flink.table.factories.SerializationFormatFactory;
98

109
/**
@@ -36,8 +35,8 @@ class QueryFormatAwareConfiguration extends Configuration {
3635
*/
3736
@Override
3837
public <T> Optional<T> getOptional(ConfigOption<T> option) {
39-
PrefixedConfigOption<T> configOption = new PrefixedConfigOption<>(queryFormatName, option);
40-
return super.getOptional(configOption);
38+
PrefixedConfigOption<T> prefixedConfigOption =
39+
new PrefixedConfigOption<>(queryFormatName, option);
40+
return super.getOptional(prefixedConfigOption.getConfigOption());
4141
}
42-
4342
}

0 commit comments

Comments
 (0)