HTTP-122 Retry for source lookup table #148

Open: wants to merge 19 commits into base: main. Changes from 2 commits.

1 change: 1 addition & 0 deletions .gitignore
@@ -13,3 +13,4 @@ bin
/src/test/test.iml
/flink-http-connector.iml
/dependency-reduced-pom.xml
/.java-version
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,9 @@
## [Unreleased]

- OIDC token request to not flow during explain
- Added support for auto-retry for source table. Auto retry on IOException and user-defined http codes - parameter `gid.connector.http.source.lookup.retry-codes`.
Contributor:

normally one pr would be one line here

Author:

ok, set as one item with subitems

- Parameters `gid.connector.http.source.lookup.error.code.exclude` and `gid.connector.http.source.lookup.error.code` are replaced by `gid.connector.http.source.lookup.ignored-response-codes`.
- Added connection timeout for source table - `gid.connector.http.source.lookup.connection.timeout`.

## [0.18.0] - 2025-01-15

48 changes: 42 additions & 6 deletions README.md
@@ -170,8 +170,22 @@ The second one is set per individual HTTP requests by HTTP client. Its default v
Flink's current implementation of `AsyncTableFunction` does not allow specifying custom logic for handling Flink AsyncIO timeouts as it is for Java API.
Because of that, if AsyncIO timer passes, Flink will throw TimeoutException which will cause job restart.

The HTTP request timeouts on the other hand will not cause Job restart. In that case, exception will be logged into application logs.
To avoid job restart on timeouts caused by Lookup queries, the value of `gid.connector.http.source.lookup.request.timeout` should be smaller than `table.exec.async-lookup.timeout`.
#### Retries
The lookup source retries automatically in two scenarios:
1. An IOException occurs (e.g. a timeout).
2. The HTTP server returns a response code that is marked as a temporary error. These codes are defined in the table configuration.
Retries are executed silently, without a job restart. Once the maximum number of retry attempts (per request) is reached, the function fails and the job restarts.

Note that response codes are split into three groups:
- successful responses - the response is returned immediately for further processing
- temporary errors - the request will be re-sent
- error responses - an unexpected response, which will fail the job. Any code that is not marked as successful or as a temporary error is treated as an error.
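
The three groups above can be sketched as a small classifier. This is only an illustration, not the connector's implementation: the class `ResponseClassifier`, the enum `Outcome`, and the decision to check success codes before retry codes are all assumptions made for the example.

```java
import java.util.Set;

/** Hypothetical sketch: sorts an HTTP status code into one of three groups. */
final class ResponseClassifier {

    enum Outcome { SUCCESS, RETRY, ERROR }

    private final Set<Integer> successCodes;
    private final Set<Integer> retryCodes;

    ResponseClassifier(Set<Integer> successCodes, Set<Integer> retryCodes) {
        this.successCodes = Set.copyOf(successCodes);
        this.retryCodes = Set.copyOf(retryCodes);
    }

    Outcome classify(int statusCode) {
        // Assumption: a code listed as successful wins over a retry listing.
        if (successCodes.contains(statusCode)) {
            return Outcome.SUCCESS;
        }
        if (retryCodes.contains(statusCode)) {
            return Outcome.RETRY;
        }
        // Anything not explicitly configured is an error.
        return Outcome.ERROR;
    }

    public static void main(String[] args) {
        ResponseClassifier classifier =
            new ResponseClassifier(Set.of(200, 404), Set.of(500, 503));
        for (int code : new int[] {200, 503, 418}) {
            System.out.println(code + " -> " + classifier.classify(code));
        }
    }
}
```

With this shape, only `RETRY` outcomes would be handed back to the retry mechanism; `ERROR` outcomes would surface as a failure straight away.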

#### Retry strategy
The user can choose the retry strategy type for the source table:
- fixed-delay - the HTTP request will be re-sent after the specified delay
- exponential-delay - the request will be re-sent using an exponential backoff strategy, limited to max-retries attempts.
Contributor (@davidradl, Mar 28, 2025):

I see that the config option is lookup.max-retries (I suggest using the exact config parameter name) - do we need a separate config for max-retries for sinks?

It would be worth defining exactly what we mean by exponential backoff strategy.

Author:

ok, good idea. I added explanation
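
One common reading of "exponential backoff", given here as an assumption and not as the connector's confirmed behaviour, is that the delay before retry number n equals min(initial-backoff * multiplier^(n-1), max-backoff). The sketch below computes that delay; the class and method names are hypothetical, and the sample values (1 second, 1.5, 1 minute) are the defaults listed in the options table of this PR.

```java
import java.time.Duration;

/** Hypothetical sketch of a capped exponential backoff delay calculation. */
final class ExponentialDelay {

    /**
     * Returns the delay to wait before the given retry attempt (1-based).
     * The delay grows by `multiplier` on each attempt and never exceeds `maxBackoff`.
     */
    static Duration delayForAttempt(
            int attempt, Duration initialBackoff, double multiplier, Duration maxBackoff) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        double uncappedMillis = initialBackoff.toMillis() * Math.pow(multiplier, attempt - 1);
        long cappedMillis = (long) Math.min(uncappedMillis, (double) maxBackoff.toMillis());
        return Duration.ofMillis(cappedMillis);
    }

    public static void main(String[] args) {
        Duration initial = Duration.ofSeconds(1);
        Duration max = Duration.ofMinutes(1);
        for (int attempt = 1; attempt <= 12; attempt++) {
            System.out.println(
                "attempt " + attempt + ": " + delayForAttempt(attempt, initial, 1.5, max).toMillis() + " ms");
        }
    }
}
```

A library-backed implementation may differ in details such as jitter or rounding, so the numbers printed here are indicative only.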



#### Lookup multiple results

@@ -391,19 +405,32 @@ is provided.


## HTTP status code handler
Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors.
### Sink table
The HTTP Sink allows defining a list of HTTP status codes that should be treated as errors.
By default, all 4XX and 5XX response codes are interpreted as error codes.

This behavior can be changed by using below properties in table definition (DDL) for Sink and Lookup Source or passing it via
`setProperty' method from Sink's builder. The property names are:
- `gid.connector.http.sink.error.code` and `gid.connector.http.source.lookup.error.code` used to defined HTTP status code value that should be treated as error for example 404.
`setProperty` method from Sink's builder. The property names are:
- `gid.connector.http.sink.error.code` defines the HTTP status code values that should be treated as errors, for example 404.
Many status codes can be defined in one value, separated by commas, for example:
`401, 402, 403`. This property can also define a type code mask, in which case all codes of the given HTTP response type are treated as errors.
An example of such a mask is `3XX, 4XX, 5XX`. In this case, all 300s, 400s and 500s status codes are treated as errors.
Contributor:

is X the only mask character?

Author (@maciejmaciejko-gid, Mar 28, 2025):

It's part of sink configuration, which wasn't changed. It allows only [1-5]XX or exact http code value. I reimplemented handling of http response code only for source table, which is connected with retry feature.

- `gid.connector.http.sink.error.code.exclude` and `gid.connector.http.source.lookup.error.code.exclude` used to exclude a HTTP code from error list.
- `gid.connector.http.sink.error.code.exclude` excludes an HTTP code from the error list.
Many status codes can be defined in one value, where each code should be separated with comma, for example:
`401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes.

### Source table
The HTTP source requires success codes to be defined in the parameter `gid.connector.http.source.lookup.success-codes`. That list should contain all HTTP status codes
that are considered successful responses. It may include 200 (OK) as well as 404 (Not Found). The first is the standard response, and its content should be deserialized/parsed.
Contributor:

nit: response -> responses

I am not sure what we mean by " It may be 200 (ok) as well as 404 (not found). The first one is standard response and its content should be deserialized/parsed." Is 200 and 404 the defaults or recommended settings?

Processing of the content of a 404 response may be skipped by adding the code to the parameter `gid.connector.http.source.lookup.ignored-response-codes`.
Contributor:

What does skipped mean here - fail the job?

Author:

The section was edited. Could you check if it's clear now?


Both parameters support whitelisting and blacklisting. A sample configuration may look like:
`2XX,404,!203` - meaning all codes from the group 2XX (200-299), plus 404, and without 203 (the '!' character). Group blacklisting, e.g. `!2XX`, is not supported.
Note that ignored-response-codes has to be a subset of success-codes.

The same format is used in parameter `gid.connector.http.source.lookup.retry-codes`.
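
To make the format concrete, here is a minimal parsing sketch. It is not the code from this PR: the class `StatusCodeExpression`, its error messages, and the acceptance of a lowercase `x` in group masks are assumptions. It expands `[1-5]XX` groups, adds exact codes, applies `!` exclusions for exact codes only, and rejects group exclusions such as `!2XX`.

```java
import java.util.Locale;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical parser for expressions such as "2XX,404,!203". */
final class StatusCodeExpression {

    static Set<Integer> parse(String expression) {
        Set<Integer> included = new TreeSet<>();
        Set<Integer> excluded = new TreeSet<>();
        for (String raw : expression.split(",")) {
            String token = raw.trim().toUpperCase(Locale.ROOT);
            if (token.isEmpty()) {
                continue;
            }
            if (token.startsWith("!")) {
                String code = token.substring(1);
                // Only exact codes may be excluded; "!2XX" is rejected.
                if (!code.matches("[1-5]\\d{2}")) {
                    throw new IllegalArgumentException("Unsupported exclusion: " + raw.trim());
                }
                excluded.add(Integer.parseInt(code));
            } else if (token.matches("[1-5]XX")) {
                int base = (token.charAt(0) - '0') * 100;
                for (int code = base; code < base + 100; code++) {
                    included.add(code);
                }
            } else if (token.matches("[1-5]\\d{2}")) {
                included.add(Integer.parseInt(token));
            } else {
                throw new IllegalArgumentException("Unsupported token: " + raw.trim());
            }
        }
        // Exclusions are applied last, so token order does not matter.
        included.removeAll(excluded);
        return included;
    }

    public static void main(String[] args) {
        Set<Integer> success = parse("2XX,404,!203");
        Set<Integer> ignored = parse("404");
        System.out.println("203 allowed: " + success.contains(203));
        // The subset rule from the text can be checked with containsAll.
        System.out.println("ignored is a subset of success: " + success.containsAll(ignored));
    }
}
```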


## TLS (more secure replacement for SSL) and mTLS support

Both Http Sink and Lookup Source connectors support HTTPS communication using TLS 1.2 and mTLS.
@@ -479,6 +506,15 @@ be requested if the current time is later than the cached token expiry time minu
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. |
| gid.connector.http.source.lookup.connection.timeout | optional | Source table connection timeout. |
| gid.connector.http.source.lookup.success-codes | optional | Comma-separated HTTP codes considered successful responses. Use [1-5]XX for groups and the '!' character for exclusion. |
| gid.connector.http.source.lookup.retry-codes | optional | Comma-separated HTTP codes considered temporary errors. Use [1-5]XX for groups and the '!' character for exclusion. |
| gid.connector.http.source.lookup.ignored-response-codes | optional | Comma-separated HTTP codes. Content for these responses will be ignored. Use [1-5]XX for groups and the '!' character for exclusion. Ignored response codes have to be a subset of `gid.connector.http.source.lookup.success-codes`. |
| gid.connector.http.source.lookup.retry-strategy.type | optional | Auto retry strategy type: fixed_delay (default) or exponential_delay. |
| gid.connector.http.source.lookup.fixed-delay.delay | optional | Fixed-delay interval between retries. Default 1 second. |
| gid.connector.http.source.lookup.exponential-delay.initial-backoff | optional | Exponential-delay initial delay. Default 1 second. |
| gid.connector.http.source.lookup.exponential-delay.max-backoff | optional | Exponential-delay maximum delay. Default 1 minute. |
| gid.connector.http.source.lookup.exponential-delay.backoff-multiplier | optional | Exponential-delay multiplier. Default 1.5. |

### HTTP Sink

12 changes: 6 additions & 6 deletions dev/checkstyle.xml
@@ -65,7 +65,7 @@
</module>

<module name="LineLength">
<property name="max" value="100"/>
<property name="max" value="120"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>

@@ -163,11 +163,11 @@
import com.getindata.connectors.*
import com.getindata.connectors.internal.*
-->
<module name="ImportOrder">
<property name="separated" value="true"/>
<property name="ordered" value="true"/>
<property name="groups" value="java.,javax.,scala,*,com.getindata.connectors,com.getindata.connectors.internal"/>
</module>
<!-- <module name="ImportOrder">-->
<!-- <property name="separated" value="true"/>-->
<!-- <property name="ordered" value="true"/>-->
<!-- <property name="groups" value="java.,javax.,scala,*,com.getindata.connectors,com.getindata.connectors.internal"/>-->
<!-- </module>-->

<!--
As per https://checkstyle.sourceforge.io/config_imports.html, "There is no flexibility to
33 changes: 16 additions & 17 deletions pom.xml
@@ -78,7 +78,6 @@ under the License.
<log4j.version>2.17.2</log4j.version>
<lombok.version>1.18.22</lombok.version>
<jackson.version>2.18.1</jackson.version>
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.10.1</junit5.version>
<junit.jupiter.version>${junit5.version}</junit.jupiter.version>
<assertj.core.version>3.21.0</assertj.core.version>
@@ -87,6 +86,8 @@ under the License.
<jacoco.plugin.version>0.8.12</jacoco.plugin.version>
<maven.shade.plugin.version>3.1.1</maven.shade.plugin.version>
<mockito-inline.version>4.6.1</mockito-inline.version>
<resilence4j.version>1.7.1</resilence4j.version>
<slf4j.version>2.0.17</slf4j.version>
</properties>

<repositories>
@@ -119,25 +120,17 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
Contributor:

can we have the logging change in a separate PR - it is easier to track the history then please

Collaborator:

Why is this change needed anyway?
Is it because of resilience4j?

One of the rules of thumb when we were starting this connector was to try not to add any external libraries that may or may not clash with user code, i.e. that is why we use Java 11's HTTP client.

You need resilience4j for the retry functionality, right?
In essence, that means: schedule a task on Java's scheduled thread executor and make sure to do a good job around error/exception handling.

Member:

Dedicated lib for retries might be an overkill now, but I think we can benefit in long term. The library provides Rate Limiter or Circuit Breaker. Both features might be worth adding. Or at least Rate Limiter.

Author (@maciejmaciejko-gid, Mar 31, 2025):

"Why this change is needed anyways?
Its because of resilence4j?"

Yes, I had to change it to compile the project with resilience4j. Note that Flink uses the same API:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/advanced/logging/

"You need the resilence4j for retry functionality right?"
I thought it was better to use a mature library instead of reimplementing it. On the other hand, as you said, it is an additional dependency. Below is the resilience4j part of the output of mvn dependency:tree:

[INFO] +- io.github.resilience4j:resilience4j-retry:jar:1.7.1:compile
[INFO] |  +- io.vavr:vavr:jar:0.10.2:compile
[INFO] |  |  \- io.vavr:vavr-match:jar:0.10.2:compile
[INFO] |  \- io.github.resilience4j:resilience4j-core:jar:1.7.1:compile

Do you think it's ok to add them? Another option is to shadow them.
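
For comparison, the dependency-free alternative raised in this thread (scheduling retries on Java's scheduled executor) could look roughly like the sketch below. It is an illustration under assumptions, not code from this PR: `ScheduledRetry` and `callWithRetry` are hypothetical names, only `IOException` is treated as retryable, and the delay is fixed.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical fixed-delay retry built only on java.util.concurrent. */
final class ScheduledRetry {

    static <T> CompletableFuture<T> callWithRetry(
            Callable<T> task, int maxAttempts, long delayMillis, ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(task, 1, maxAttempts, delayMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Callable<T> task, int attemptNo, int maxAttempts, long delayMillis,
            ScheduledExecutorService scheduler, CompletableFuture<T> result) {
        try {
            result.complete(task.call());
        } catch (IOException e) {
            if (attemptNo >= maxAttempts) {
                // Retries exhausted: surface the last I/O failure to the caller.
                result.completeExceptionally(e);
                return;
            }
            try {
                scheduler.schedule(
                    () -> attempt(task, attemptNo + 1, maxAttempts, delayMillis, scheduler, result),
                    delayMillis, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException rejected) {
                // The scheduler was shut down; do not leave the future hanging.
                result.completeExceptionally(rejected);
            }
        } catch (Exception e) {
            // Any other failure is treated as non-retryable.
            result.completeExceptionally(e);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        try {
            String body = callWithRetry(() -> {
                if (calls.incrementAndGet() < 3) {
                    throw new IOException("simulated timeout");
                }
                return "ok";
            }, 5, 50, scheduler).join();
            System.out.println(body + " after " + calls.get() + " attempts");
        } finally {
            scheduler.shutdown();
        }
    }
}
```

Even this small version has to deal with exhausted attempts, non-retryable failures and a rejected schedule, which is the error-handling effort that the library-versus-handwritten trade-off in this thread is about.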

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>

<dependency>
@@ -167,6 +160,12 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilence4j.version}</version>
</dependency>

<!--TEST-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
@@ -0,0 +1,15 @@
package com.getindata.connectors.http;

import lombok.Getter;

import java.net.http.HttpResponse;

@Getter
public class HttpStatusCodeValidationFailedException extends Exception {
private final HttpResponse<?> response;

public HttpStatusCodeValidationFailedException(String message, HttpResponse<?> response) {
super(message);
this.response = response;
}
}
@@ -5,11 +5,12 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;

import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
import org.apache.flink.util.ConfigurationException;

public interface PollingClientFactory<OUT> extends Serializable {

PollingClient<OUT> createPollClient(
HttpLookupConfig options,
DeserializationSchema<OUT> schemaDecoder
);
) throws ConfigurationException;
}
@@ -18,14 +18,14 @@ public final class HttpConnectorConfigConstants {
* A property prefix for http connector.
*/
public static final String GID_CONNECTOR_HTTP = "gid.connector.http.";
private static final String SOURCE_LOOKUP_PREFIX = GID_CONNECTOR_HTTP + "source.lookup.";

/**
* A property prefix for http connector header properties
*/
public static final String SINK_HEADER_PREFIX = GID_CONNECTOR_HTTP + "sink.header.";

public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP
+ "source.lookup.header.";
public static final String LOOKUP_SOURCE_HEADER_PREFIX = SOURCE_LOOKUP_PREFIX + "header.";

public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP
+ "security.oidc.token.request";
@@ -40,33 +40,24 @@ public final class HttpConnectorConfigConstants {
* the special treatment of the header for Basic Authentication, thus preserving the passed
* raw value. Defaults to false.
*/
public static final String LOOKUP_SOURCE_HEADER_USE_RAW = GID_CONNECTOR_HTTP
+ "source.lookup.use-raw-authorization-header";
public static final String LOOKUP_SOURCE_HEADER_USE_RAW = SOURCE_LOOKUP_PREFIX + "use-raw-authorization-header";

public static final String RESULT_TYPE = GID_CONNECTOR_HTTP
+ "source.lookup.result-type";
public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX + "result-type";

// --------- Error code handling configuration ---------
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "sink.error.code.exclude";
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "sink.error.code.exclude";

public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";

public static final String HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.code.exclude";

public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.code";
// -----------------------------------------------------

public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
GID_CONNECTOR_HTTP + "source.lookup.request-callback";
SOURCE_LOOKUP_PREFIX + "request-callback";

public static final String SINK_REQUEST_CALLBACK_IDENTIFIER =
GID_CONNECTOR_HTTP + "sink.request-callback";

public static final String SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER =
GID_CONNECTOR_HTTP + "source.lookup.query-creator";
SOURCE_LOOKUP_PREFIX + "query-creator";

// -------------- HTTPS security settings --------------
public static final String ALLOW_SELF_SIGNED =
@@ -92,16 +83,19 @@ public final class HttpConnectorConfigConstants {
// ------ HTTPS timeouts and thread pool settings ------

public static final String LOOKUP_HTTP_TIMEOUT_SECONDS =
GID_CONNECTOR_HTTP + "source.lookup.request.timeout";
SOURCE_LOOKUP_PREFIX + "request.timeout";

public static final String SOURCE_CONNECTION_TIMEOUT =
SOURCE_LOOKUP_PREFIX + "connection.timeout";

public static final String SINK_HTTP_TIMEOUT_SECONDS =
GID_CONNECTOR_HTTP + "sink.request.timeout";

public static final String LOOKUP_HTTP_PULING_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "source.lookup.request.thread-pool.size";
SOURCE_LOOKUP_PREFIX + "request.thread-pool.size";

public static final String LOOKUP_HTTP_RESPONSE_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "source.lookup.response.thread-pool.size";
SOURCE_LOOKUP_PREFIX + "response.thread-pool.size";

public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";
@@ -117,4 +111,21 @@ public final class HttpConnectorConfigConstants {
GID_CONNECTOR_HTTP + "sink.request.batch.size";

// ---------------------------------------------
public static final String SOURCE_RETRY_SUCCESS_CODES = SOURCE_LOOKUP_PREFIX + "success-codes";
public static final String SOURCE_RETRY_RETRY_CODES = SOURCE_LOOKUP_PREFIX + "retry-codes";
public static final String SOURCE_IGNORE_RESPONSE_CODES = SOURCE_LOOKUP_PREFIX + "ignored-response-codes";

public static final String SOURCE_RETRY_STRATEGY_PREFIX = SOURCE_LOOKUP_PREFIX + "retry-strategy.";
public static final String SOURCE_RETRY_STRATEGY_TYPE = SOURCE_RETRY_STRATEGY_PREFIX + "type";

private static final String SOURCE_RETRY_FIXED_DELAY_PREFIX = SOURCE_LOOKUP_PREFIX + "fixed-delay.";
public static final String SOURCE_RETRY_FIXED_DELAY_DELAY = SOURCE_RETRY_FIXED_DELAY_PREFIX + "delay";

private static final String SOURCE_RETRY_EXP_DELAY_PREFIX = SOURCE_LOOKUP_PREFIX + "exponential-delay.";
public static final String SOURCE_RETRY_EXP_DELAY_INITIAL_BACKOFF =
SOURCE_RETRY_EXP_DELAY_PREFIX + "initial-backoff";
public static final String SOURCE_RETRY_EXP_DELAY_MAX_BACKOFF =
SOURCE_RETRY_EXP_DELAY_PREFIX + "max-backoff";
public static final String SOURCE_RETRY_EXP_DELAY_MULTIPLIER =
SOURCE_RETRY_EXP_DELAY_PREFIX + "backoff-multiplier";
}