HTTP-122 Retry for source lookup table #148

Open: wants to merge 19 commits into base: main
1 change: 1 addition & 0 deletions .gitignore
@@ -13,3 +13,4 @@ bin
/src/test/test.iml
/flink-http-connector.iml
/dependency-reduced-pom.xml
/.java-version
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,11 @@

## [Unreleased]

- Retries support for source table:
- Auto retry on IOException and user-defined http codes - parameter `gid.connector.http.source.lookup.retry-codes`.
- Parameters `gid.connector.http.source.lookup.error.code.exclude` and `gid.connector.http.source.lookup.error.code` were replaced by `gid.connector.http.source.lookup.ignored-response-codes`.
- Added connection timeout for source table - `gid.connector.http.source.lookup.connection.timeout`.

## [0.19.0] - 2025-03-20

- OIDC token request to not flow during explain
143 changes: 107 additions & 36 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dev/checkstyle.xml
@@ -65,7 +65,7 @@
</module>

<module name="LineLength">
-<property name="max" value="100"/>
+<property name="max" value="120"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>

34 changes: 16 additions & 18 deletions pom.xml
@@ -75,10 +75,8 @@ under the License.
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
<lombok.version>1.18.22</lombok.version>
<jackson.version>2.18.1</jackson.version>
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.10.1</junit5.version>
<junit.jupiter.version>${junit5.version}</junit.jupiter.version>
<assertj.core.version>3.21.0</assertj.core.version>
@@ -87,6 +85,8 @@ under the License.
<jacoco.plugin.version>0.8.12</jacoco.plugin.version>
<maven.shade.plugin.version>3.1.1</maven.shade.plugin.version>
<mockito-inline.version>4.6.1</mockito-inline.version>
<resilence4j.version>1.7.1</resilence4j.version>
<slf4j.version>2.0.17</slf4j.version>
</properties>

<repositories>
@@ -119,25 +119,17 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
Contributor

Can we have the logging change in a separate PR, please? It is easier to track the history that way.

Collaborator

Why is this change needed anyway?
Is it because of resilience4j?

One of the rules of thumb when we started this connector was to avoid adding any external libraries that may or may not clash with user code; that is why we use Java 11's HTTP client.

You need resilience4j for the retry functionality, right?
In essence, that means scheduling a task on Java's scheduled thread executor and doing a good job of error/exception handling.

Member

A dedicated library for retries might be overkill for now, but I think we can benefit in the long term. The library also provides a Rate Limiter and a Circuit Breaker. Both features might be worth adding, or at least the Rate Limiter.

Author

@maciejmaciejko-gid, Mar 31, 2025

"Why is this change needed anyway?
Is it because of resilience4j?"

Yes, I had to change it to compile the project with resilience4j. Note that Flink uses the same API:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/advanced/logging/

"You need resilience4j for the retry functionality, right?"
I thought it was better to use a mature library than to reimplement it. On the other hand, as you said, it is an additional dependency. Below are the resilience4j dependencies, taken from `mvn dependency:tree`:

[INFO] +- io.github.resilience4j:resilience4j-retry:jar:1.7.1:compile
[INFO] |  +- io.vavr:vavr:jar:0.10.2:compile
[INFO] |  |  \- io.vavr:vavr-match:jar:0.10.2:compile
[INFO] |  \- io.github.resilience4j:resilience4j-core:jar:1.7.1:compile

Do you think it's OK to add them? Another option is to shade them.
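
For comparison, the Collaborator's remark that retrying is essentially "schedule a task on Java's scheduled thread executor" with careful exception handling can be sketched with the JDK alone. This is a minimal sketch under stated assumptions: `ScheduledRetrySketch` and its methods are hypothetical names, it uses a fixed delay, and it retries on any `Exception`. The PR itself retries only on `IOException` and configured status codes, so this is not what the PR implements.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration only; not part of the connector or resilience4j.
final class ScheduledRetrySketch {

    private ScheduledRetrySketch() {
    }

    // Runs the task on the scheduler and retries it after a fixed delay when it throws.
    static <T> CompletableFuture<T> retry(ScheduledExecutorService scheduler, Callable<T> task,
                                          int maxAttempts, long delayMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Runnable first = () -> attempt(scheduler, task, 1, maxAttempts, delayMillis, result);
        scheduler.execute(first);
        return result;
    }

    private static <T> void attempt(ScheduledExecutorService scheduler, Callable<T> task,
                                    int attemptNo, int maxAttempts, long delayMillis,
                                    CompletableFuture<T> result) {
        try {
            result.complete(task.call());
        } catch (Exception e) {
            if (attemptNo >= maxAttempts) {
                // Attempts exhausted: surface the last failure to the caller.
                result.completeExceptionally(e);
                return;
            }
            // Otherwise schedule the next attempt instead of blocking the worker thread.
            Runnable next = () -> attempt(scheduler, task, attemptNo + 1, maxAttempts, delayMillis, result);
            scheduler.schedule(next, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        try {
            // The task fails twice, then succeeds on the third attempt.
            String value = retry(scheduler, () -> {
                if (calls.incrementAndGet() < 3) {
                    throw new IllegalStateException("temporary failure");
                }
                return "ok";
            }, 5, 50L).join();
            System.out.println(value + " after " + calls.get() + " attempts");
        } finally {
            scheduler.shutdown();
        }
    }
}
```

Even this small version has to decide what counts as retryable, how the final failure is reported, and how the executor is shut down, which is the trade-off the thread is weighing against an extra dependency.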

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>

<dependency>
@@ -167,6 +159,12 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilence4j.version}</version>
</dependency>

<!--TEST-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
@@ -0,0 +1,15 @@
package com.getindata.connectors.http;

import java.net.http.HttpResponse;

import lombok.Getter;

@Getter
public class HttpStatusCodeValidationFailedException extends Exception {
private final HttpResponse<?> response;

public HttpStatusCodeValidationFailedException(String message, HttpResponse<?> response) {
super(message);
this.response = response;
}
}
@@ -3,6 +3,7 @@
import java.util.Collection;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;

/**
* A client that is used to get enrichment data from external component.
@@ -15,4 +16,10 @@ public interface PollingClient<T> {
* @return an optional result of data lookup.
*/
Collection<T> pull(RowData lookupRow);

/**
* Initialize the client.
* @param ctx function context
*/
void open(FunctionContext ctx);
}
@@ -3,6 +3,7 @@
import java.io.Serializable;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.util.ConfigurationException;

import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;

@@ -11,5 +12,5 @@ public interface PollingClientFactory<OUT> extends Serializable {
PollingClient<OUT> createPollClient(
HttpLookupConfig options,
DeserializationSchema<OUT> schemaDecoder
-);
+) throws ConfigurationException;
}
@@ -18,14 +18,14 @@ public final class HttpConnectorConfigConstants {
* A property prefix for http connector.
*/
public static final String GID_CONNECTOR_HTTP = "gid.connector.http.";
private static final String SOURCE_LOOKUP_PREFIX = GID_CONNECTOR_HTTP + "source.lookup.";

/**
* A property prefix for http connector header properties
*/
public static final String SINK_HEADER_PREFIX = GID_CONNECTOR_HTTP + "sink.header.";

public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP
+ "source.lookup.header.";
public static final String LOOKUP_SOURCE_HEADER_PREFIX = SOURCE_LOOKUP_PREFIX + "header.";

public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP
+ "security.oidc.token.request";
@@ -40,33 +40,24 @@ public final class HttpConnectorConfigConstants {
* the special treatment of the header for Basic Authentication, thus preserving the passed
* raw value. Defaults to false.
*/
public static final String LOOKUP_SOURCE_HEADER_USE_RAW = GID_CONNECTOR_HTTP
+ "source.lookup.use-raw-authorization-header";
public static final String LOOKUP_SOURCE_HEADER_USE_RAW = SOURCE_LOOKUP_PREFIX + "use-raw-authorization-header";

public static final String RESULT_TYPE = GID_CONNECTOR_HTTP
+ "source.lookup.result-type";
public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX + "result-type";

// --------- Error code handling configuration ---------
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "sink.error.code.exclude";
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "sink.error.code.exclude";

public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";

public static final String HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.code.exclude";

public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.code";
// -----------------------------------------------------

public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
GID_CONNECTOR_HTTP + "source.lookup.request-callback";
SOURCE_LOOKUP_PREFIX + "request-callback";

public static final String SINK_REQUEST_CALLBACK_IDENTIFIER =
GID_CONNECTOR_HTTP + "sink.request-callback";

public static final String SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER =
GID_CONNECTOR_HTTP + "source.lookup.query-creator";
SOURCE_LOOKUP_PREFIX + "query-creator";

// -------------- HTTPS security settings --------------
public static final String ALLOW_SELF_SIGNED =
@@ -92,16 +83,19 @@ public final class HttpConnectorConfigConstants {
// ------ HTTPS timeouts and thread pool settings ------

public static final String LOOKUP_HTTP_TIMEOUT_SECONDS =
GID_CONNECTOR_HTTP + "source.lookup.request.timeout";
SOURCE_LOOKUP_PREFIX + "request.timeout";

public static final String SOURCE_CONNECTION_TIMEOUT =
SOURCE_LOOKUP_PREFIX + "connection.timeout";

public static final String SINK_HTTP_TIMEOUT_SECONDS =
GID_CONNECTOR_HTTP + "sink.request.timeout";

public static final String LOOKUP_HTTP_PULING_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "source.lookup.request.thread-pool.size";
SOURCE_LOOKUP_PREFIX + "request.thread-pool.size";

public static final String LOOKUP_HTTP_RESPONSE_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "source.lookup.response.thread-pool.size";
SOURCE_LOOKUP_PREFIX + "response.thread-pool.size";

public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";
@@ -117,4 +111,21 @@ public final class HttpConnectorConfigConstants {
GID_CONNECTOR_HTTP + "sink.request.batch.size";

// ---------------------------------------------
public static final String SOURCE_RETRY_SUCCESS_CODES = SOURCE_LOOKUP_PREFIX + "success-codes";
public static final String SOURCE_RETRY_RETRY_CODES = SOURCE_LOOKUP_PREFIX + "retry-codes";
public static final String SOURCE_IGNORE_RESPONSE_CODES = SOURCE_LOOKUP_PREFIX + "ignored-response-codes";

public static final String SOURCE_RETRY_STRATEGY_PREFIX = SOURCE_LOOKUP_PREFIX + "retry-strategy.";
public static final String SOURCE_RETRY_STRATEGY_TYPE = SOURCE_RETRY_STRATEGY_PREFIX + "type";

private static final String SOURCE_RETRY_FIXED_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "fixed-delay.";
public static final String SOURCE_RETRY_FIXED_DELAY_DELAY = SOURCE_RETRY_FIXED_DELAY_PREFIX + "delay";

private static final String SOURCE_RETRY_EXP_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "exponential-delay.";
public static final String SOURCE_RETRY_EXP_DELAY_INITIAL_BACKOFF =
SOURCE_RETRY_EXP_DELAY_PREFIX + "initial-backoff";
public static final String SOURCE_RETRY_EXP_DELAY_MAX_BACKOFF =
SOURCE_RETRY_EXP_DELAY_PREFIX + "max-backoff";
public static final String SOURCE_RETRY_EXP_DELAY_MULTIPLIER =
SOURCE_RETRY_EXP_DELAY_PREFIX + "backoff-multiplier";
}
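
Because the new constants are built from nested prefixes, the final option keys are not visible at a glance. The sketch below mirrors the string constants from this hunk in a hypothetical class (`RetryOptionKeysSketch` is not part of the PR) and prints the fully expanded keys. It assumes the option definitions elsewhere in the PR use these constants unchanged.

```java
import java.util.List;

// Hypothetical class for illustration; the string values are copied from the hunk above.
final class RetryOptionKeysSketch {

    static final String GID_CONNECTOR_HTTP = "gid.connector.http.";
    static final String SOURCE_LOOKUP_PREFIX = GID_CONNECTOR_HTTP + "source.lookup.";
    static final String SOURCE_RETRY_STRATEGY_PREFIX = SOURCE_LOOKUP_PREFIX + "retry-strategy.";
    static final String SOURCE_RETRY_FIXED_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "fixed-delay.";
    static final String SOURCE_RETRY_EXP_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "exponential-delay.";

    private RetryOptionKeysSketch() {
    }

    // Returns the expanded keys for the response-code, retry-strategy and timeout constants.
    static List<String> expandedKeys() {
        return List.of(
            SOURCE_LOOKUP_PREFIX + "success-codes",
            SOURCE_LOOKUP_PREFIX + "retry-codes",
            SOURCE_LOOKUP_PREFIX + "ignored-response-codes",
            SOURCE_LOOKUP_PREFIX + "connection.timeout",
            SOURCE_RETRY_STRATEGY_PREFIX + "type",
            SOURCE_RETRY_FIXED_DELAY_PREFIX + "delay",
            SOURCE_RETRY_EXP_DELAY_PREFIX + "initial-backoff",
            SOURCE_RETRY_EXP_DELAY_PREFIX + "max-backoff",
            SOURCE_RETRY_EXP_DELAY_PREFIX + "backoff-multiplier");
    }

    public static void main(String[] args) {
        expandedKeys().forEach(System.out::println);
    }
}
```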
@@ -0,0 +1,71 @@
package com.getindata.connectors.http.internal.retry;

import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.function.Supplier;

import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.metrics.MetricGroup;

import com.getindata.connectors.http.HttpStatusCodeValidationFailedException;
import com.getindata.connectors.http.internal.status.HttpResponseChecker;

@Slf4j
public class HttpClientWithRetry {

private final HttpClient httpClient;
@Getter
private final HttpResponseChecker responseChecker;
private final Retry retry;

@Builder
HttpClientWithRetry(HttpClient httpClient,
Contributor

I am wondering what happens with OIDC: the short-lived bearer token may need to be regenerated if the retries occur after the token has expired. Is this regeneration check done for the retries?

Author

The request is created only once, but the OIDC processor (responsible for setting the bearer token in the request) is called on every retry.

RetryConfig retryConfig,
HttpResponseChecker responseChecker) {
this.httpClient = httpClient;
this.responseChecker = responseChecker;
var adjustedRetryConfig = RetryConfig.from(retryConfig)
.retryExceptions(IOException.class)
.retryOnResult(this::isTemporalError)
.build();
this.retry = Retry.of("http-lookup-connector", adjustedRetryConfig);
}

public void registerMetrics(MetricGroup metrics){
var group = metrics.addGroup("http_lookup_connector");
group.gauge("successfulCallsWithRetryAttempt",
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithRetryAttempt());
group.gauge("successfulCallsWithoutRetryAttempt",
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithoutRetryAttempt());
}

public <T> HttpResponse<T> send(
Supplier<HttpRequest> requestSupplier,
HttpResponse.BodyHandler<T> responseBodyHandler
) throws IOException, InterruptedException, HttpStatusCodeValidationFailedException {
try {
var response = Retry.decorateCheckedSupplier(retry,
() -> httpClient.send(requestSupplier.get(), responseBodyHandler)).apply();
if (!responseChecker.isSuccessful(response)) {
throw new HttpStatusCodeValidationFailedException(
"Incorrect response code: " + response.statusCode(), response);
}
return response;
} catch (IOException | InterruptedException | HttpStatusCodeValidationFailedException e) {
Collaborator

Why are IOException and InterruptedException special here?
Why do you need them on the send signature?

Author

These are checked exceptions from the HttpClient.send method. I don't want to rewrap them.

throw e; //re-throw without wrapping
} catch (Throwable t) {
throw new RuntimeException("Unexpected exception", t);
}
}

private boolean isTemporalError(Object response) {
return responseChecker.isTemporalError((HttpResponse<?>) response);
}
}
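
The control flow of `send` can be easier to follow without the library. The sketch below is a hypothetical, JDK-only loop (`RetryLoopSketch` and `Transport` are illustrative names, not the PR's code) that retries on `IOException` and on responses flagged as temporal errors, and calls the request supplier on every attempt, as `requestSupplier.get()` is in the decorated supplier above. It assumes no delay between attempts. What it does once attempts run out (rethrow the last `IOException`, or return the last response) is this sketch's own choice, not a statement about resilience4j.

```java
import java.io.IOException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical, JDK-only illustration; the PR delegates this loop to resilience4j.
final class RetryLoopSketch {

    // Stand-in for HttpClient.send: takes a request, returns a response, may throw IOException.
    interface Transport<Q, R> {
        R send(Q request) throws IOException;
    }

    private RetryLoopSketch() {
    }

    static <Q, R> R sendWithRetry(Supplier<Q> requestSupplier, Transport<Q, R> transport,
                                  Predicate<R> isTemporalError, int maxAttempts) throws IOException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException lastFailure = null;
        R lastResponse = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // The supplier runs on every attempt, so per-request state such as a
            // bearer token header can be refreshed before each send.
            Q request = requestSupplier.get();
            try {
                R response = transport.send(request);
                if (!isTemporalError.test(response)) {
                    return response; // success, or an error that should not be retried
                }
                lastResponse = response;
                lastFailure = null;
            } catch (IOException e) {
                lastFailure = e;
                lastResponse = null;
            }
        }
        if (lastFailure != null) {
            throw lastFailure; // re-thrown as-is, without wrapping
        }
        return lastResponse;
    }

    public static void main(String[] args) throws IOException {
        int[] built = {0};
        Supplier<String> requests = () -> "request-with-token-" + (++built[0]);
        int[] sent = {0};
        // Responds with 503 twice, then 200.
        Transport<String, Integer> transport = request -> (++sent[0] < 3) ? 503 : 200;
        Predicate<Integer> temporal = status -> status == 503;
        int status = sendWithRetry(requests, transport, temporal, 4);
        System.out.println("status=" + status + ", requests built=" + built[0]);
    }
}
```

Passing a supplier rather than a prebuilt request is what lets a short-lived token be re-evaluated per attempt, which is the concern raised in the OIDC thread.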

@@ -0,0 +1,58 @@
package com.getindata.connectors.http.internal.retry;

import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.RetryConfig;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import static io.github.resilience4j.core.IntervalFunction.ofExponentialBackoff;

import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_INITIAL_BACKOFF;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MAX_BACKOFF;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MULTIPLIER;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_FIXED_DELAY_DELAY;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_STRATEGY;

@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class RetryConfigProvider {

private final ReadableConfig config;

public static RetryConfig create(ReadableConfig config) {
return new RetryConfigProvider(config).create();
}

private RetryConfig create() {
return createBuilder()
.maxAttempts(config.get(LookupOptions.MAX_RETRIES) + 1)
.build();
}

private RetryConfig.Builder<?> createBuilder() {
var retryStrategy = getRetryStrategy();
if (retryStrategy == RetryStrategyType.FIXED_DELAY) {
return configureFixedDelay();
} else if (retryStrategy == RetryStrategyType.EXPONENTIAL_DELAY) {
return configureExponentialDelay();
}
throw new IllegalArgumentException("Unsupported retry strategy: " + retryStrategy);
}

private RetryStrategyType getRetryStrategy() {
return RetryStrategyType.fromCode(config.get(SOURCE_LOOKUP_RETRY_STRATEGY));
}

private RetryConfig.Builder<?> configureFixedDelay() {
return RetryConfig.custom()
.intervalFunction(IntervalFunction.of(config.get(SOURCE_LOOKUP_RETRY_FIXED_DELAY_DELAY)));
}

private RetryConfig.Builder<?> configureExponentialDelay() {
var initialDelay = config.get(SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_INITIAL_BACKOFF);
var maxDelay = config.get(SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MAX_BACKOFF);
var multiplier = config.get(SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MULTIPLIER);
return RetryConfig.custom()
.intervalFunction(ofExponentialBackoff(initialDelay, multiplier, maxDelay));
}
}
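
Two details of this provider are worth a worked example: `maxAttempts` is set to the configured retry count plus one, so the initial call is counted, and the exponential strategy grows the delay by a multiplier up to a cap. The sketch below is a hypothetical helper (`BackoffScheduleSketch` is not part of the PR) that approximates such a schedule as `min(initial * multiplier^(attempt - 1), max)`. The exact rounding used by resilience4j's `IntervalFunction` may differ, so treat the numbers as illustrative.

```java
// Hypothetical helper for illustration; it approximates a capped exponential backoff schedule.
final class BackoffScheduleSketch {

    private BackoffScheduleSketch() {
    }

    // Delay before retry number `attempt` (1-based), capped at maxMillis.
    static long delayMillis(int attempt, long initialMillis, double multiplier, long maxMillis) {
        double raw = initialMillis * Math.pow(multiplier, attempt - 1);
        return (long) Math.min(raw, (double) maxMillis);
    }

    // Total calls made for a given number of retries: the first call plus each retry.
    static int maxAttempts(int maxRetries) {
        return maxRetries + 1;
    }

    public static void main(String[] args) {
        // Example values are assumptions: 1 s initial backoff, multiplier 2.0, 10 s cap.
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.println("retry " + attempt + ": " + delayMillis(attempt, 1000L, 2.0, 10_000L) + " ms");
        }
        System.out.println("attempts for 3 retries: " + maxAttempts(3));
    }
}
```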