@@ -36,11 +36,11 @@ public class HttpLookupTableSourceFactory implements DynamicTableSourceFactory {
36
36
37
37
private static DataTypes .Field columnToField (Column column ) {
38
38
return FIELD (
39
- column .getName (),
40
- // only a column in a schema should have a time attribute,
41
- // a field should not propagate the attribute because it might be used in a
42
- // completely different context
43
- removeTimeAttribute (column .getDataType ()));
39
+ column .getName (),
40
+ // only a column in a schema should have a time attribute,
41
+ // a field should not propagate the attribute because it might be used in a
42
+ // completely different context
43
+ removeTimeAttribute (column .getDataType ()));
44
44
}
45
45
46
46
public static DataType row (List <Field > fields ) {
@@ -50,36 +50,36 @@ public static DataType row(List<Field> fields) {
50
50
@ Override
51
51
public DynamicTableSource createDynamicTableSource (Context dynamicTableContext ) {
52
52
FactoryUtil .TableFactoryHelper helper =
53
- FactoryUtil .createTableFactoryHelper (this , dynamicTableContext );
53
+ FactoryUtil .createTableFactoryHelper (this , dynamicTableContext );
54
54
55
55
ReadableConfig readable = helper .getOptions ();
56
56
helper .validateExcept (
57
- // properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
58
- "table." ,
59
- HttpConnectorConfigConstants .GID_CONNECTOR_HTTP ,
60
- LOOKUP_REQUEST_FORMAT .key ()
57
+ // properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
58
+ "table." ,
59
+ HttpConnectorConfigConstants .GID_CONNECTOR_HTTP ,
60
+ LOOKUP_REQUEST_FORMAT .key ()
61
61
);
62
62
validateHttpLookupSourceOptions (readable );
63
63
64
64
DecodingFormat <DeserializationSchema <RowData >> decodingFormat =
65
- helper .discoverDecodingFormat (
66
- DeserializationFormatFactory .class ,
67
- FactoryUtil .FORMAT
68
- );
65
+ helper .discoverDecodingFormat (
66
+ DeserializationFormatFactory .class ,
67
+ FactoryUtil .FORMAT
68
+ );
69
69
70
70
HttpLookupConfig lookupConfig = getHttpLookupOptions (dynamicTableContext , readable );
71
71
72
72
ResolvedSchema resolvedSchema = dynamicTableContext .getCatalogTable ().getResolvedSchema ();
73
73
74
74
DataType physicalRowDataType =
75
- toRowDataType (resolvedSchema .getColumns (), Column ::isPhysical );
75
+ toRowDataType (resolvedSchema .getColumns (), Column ::isPhysical );
76
76
77
77
return new HttpLookupTableSource (
78
- physicalRowDataType ,
79
- lookupConfig ,
80
- decodingFormat ,
81
- dynamicTableContext ,
82
- getLookupCache (readable )
78
+ physicalRowDataType ,
79
+ lookupConfig ,
80
+ decodingFormat ,
81
+ dynamicTableContext ,
82
+ getLookupCache (readable )
83
83
);
84
84
}
85
85
@@ -89,7 +89,7 @@ protected void validateHttpLookupSourceOptions(ReadableConfig tableOptions)
89
89
tableOptions .getOptional (SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL ).ifPresent (url -> {
90
90
if (tableOptions .getOptional (SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST ).isEmpty ()) {
91
91
throw new IllegalArgumentException ("Config option " +
92
- SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST .key () + " is required, if " +
92
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST .key () + " is required, if " +
93
93
SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL .key () + " is configured." );
94
94
}
95
95
});
@@ -108,57 +108,57 @@ public Set<ConfigOption<?>> requiredOptions() {
108
108
@ Override
109
109
public Set <ConfigOption <?>> optionalOptions () {
110
110
return Set .of (
111
- URL_ARGS ,
112
- ASYNC_POLLING ,
113
- LOOKUP_METHOD ,
114
- REQUEST_CALLBACK_IDENTIFIER ,
115
-
116
- LookupOptions .CACHE_TYPE ,
117
- LookupOptions .PARTIAL_CACHE_EXPIRE_AFTER_ACCESS ,
118
- LookupOptions .PARTIAL_CACHE_EXPIRE_AFTER_WRITE ,
119
- LookupOptions .PARTIAL_CACHE_MAX_ROWS ,
120
- LookupOptions .PARTIAL_CACHE_CACHE_MISSING_KEY ,
121
-
122
- SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION ,
123
- SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST ,
124
- SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL ,
125
-
126
- LookupOptions .MAX_RETRIES ,
127
- SOURCE_LOOKUP_RETRY_STRATEGY ,
128
- SOURCE_LOOKUP_RETRY_FIXED_DELAY_DELAY ,
129
- SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_INITIAL_BACKOFF ,
130
- SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MULTIPLIER ,
131
- SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MAX_BACKOFF ,
132
-
133
- SOURCE_LOOKUP_HTTP_SUCCESS_CODES ,
134
- SOURCE_LOOKUP_HTTP_RETRY_CODES ,
135
- SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES ,
136
-
137
- SOURCE_LOOKUP_CONNECTION_TIMEOUT // TODO: add request timeout from properties
111
+ URL_ARGS ,
112
+ ASYNC_POLLING ,
113
+ LOOKUP_METHOD ,
114
+ REQUEST_CALLBACK_IDENTIFIER ,
115
+
116
+ LookupOptions .CACHE_TYPE ,
117
+ LookupOptions .PARTIAL_CACHE_EXPIRE_AFTER_ACCESS ,
118
+ LookupOptions .PARTIAL_CACHE_EXPIRE_AFTER_WRITE ,
119
+ LookupOptions .PARTIAL_CACHE_MAX_ROWS ,
120
+ LookupOptions .PARTIAL_CACHE_CACHE_MISSING_KEY ,
121
+
122
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION ,
123
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST ,
124
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL ,
125
+
126
+ LookupOptions .MAX_RETRIES ,
127
+ SOURCE_LOOKUP_RETRY_STRATEGY ,
128
+ SOURCE_LOOKUP_RETRY_FIXED_DELAY_DELAY ,
129
+ SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_INITIAL_BACKOFF ,
130
+ SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MULTIPLIER ,
131
+ SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MAX_BACKOFF ,
132
+
133
+ SOURCE_LOOKUP_HTTP_SUCCESS_CODES ,
134
+ SOURCE_LOOKUP_HTTP_RETRY_CODES ,
135
+ SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES ,
136
+
137
+ SOURCE_LOOKUP_CONNECTION_TIMEOUT // TODO: add request timeout from properties
138
138
);
139
139
}
140
140
141
141
private HttpLookupConfig getHttpLookupOptions (Context context , ReadableConfig readableConfig ) {
142
142
143
143
Properties httpConnectorProperties =
144
- ConfigUtils .getHttpConnectorProperties (context .getCatalogTable ().getOptions ());
144
+ ConfigUtils .getHttpConnectorProperties (context .getCatalogTable ().getOptions ());
145
145
146
146
final HttpPostRequestCallbackFactory <HttpLookupSourceRequestEntry >
147
147
postRequestCallbackFactory =
148
- FactoryUtil .discoverFactory (
148
+ FactoryUtil .discoverFactory (
149
149
context .getClassLoader (),
150
150
HttpPostRequestCallbackFactory .class ,
151
151
readableConfig .get (REQUEST_CALLBACK_IDENTIFIER )
152
- );
152
+ );
153
153
154
154
return HttpLookupConfig .builder ()
155
- .lookupMethod (readableConfig .get (LOOKUP_METHOD ))
156
- .url (readableConfig .get (URL ))
157
- .useAsync (readableConfig .get (ASYNC_POLLING ))
158
- .properties (httpConnectorProperties )
159
- .readableConfig (readableConfig )
160
- .httpPostRequestCallback (postRequestCallbackFactory .createHttpPostRequestCallback ())
161
- .build ();
155
+ .lookupMethod (readableConfig .get (LOOKUP_METHOD ))
156
+ .url (readableConfig .get (URL ))
157
+ .useAsync (readableConfig .get (ASYNC_POLLING ))
158
+ .properties (httpConnectorProperties )
159
+ .readableConfig (readableConfig )
160
+ .httpPostRequestCallback (postRequestCallbackFactory .createHttpPostRequestCallback ())
161
+ .build ();
162
162
}
163
163
164
164
@ Nullable
@@ -177,13 +177,13 @@ private LookupCache getLookupCache(ReadableConfig tableOptions) {
177
177
// Backport from Flink 1.15-Master
178
178
private DataType toRowDataType (List <Column > columns , Predicate <Column > columnPredicate ) {
179
179
return columns .stream ()
180
- .filter (columnPredicate )
181
- .map (HttpLookupTableSourceFactory ::columnToField )
182
- .collect (
183
- Collectors .collectingAndThen (Collectors .toList (),
184
- HttpLookupTableSourceFactory ::row ))
185
- // the row should never be null
186
- .notNull ();
180
+ .filter (columnPredicate )
181
+ .map (HttpLookupTableSourceFactory ::columnToField )
182
+ .collect (
183
+ Collectors .collectingAndThen (Collectors .toList (),
184
+ HttpLookupTableSourceFactory ::row ))
185
+ // the row should never be null
186
+ .notNull ();
187
187
}
188
188
189
189
// Backport End
0 commit comments