Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP-122 Add lookup retries #129

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ public final class HttpConnectorConfigConstants {
+ "source.lookup.header.";

public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP
+ "security.oidc.token.request";
+ "security.oidc.token.request";

public static final String OIDC_AUTH_TOKEN_ENDPOINT_URL = GID_CONNECTOR_HTTP
+ "security.oidc.token.endpoint.url";
+ "security.oidc.token.endpoint.url";

public static final String OIDC_AUTH_TOKEN_EXPIRY_REDUCTION = GID_CONNECTOR_HTTP
+ "security.oidc.token.expiry.reduction";
+ "security.oidc.token.expiry.reduction";
/**
* Whether to use the raw value of the Authorization header. If set, it prevents
* the special treatment of the header for Basic Authentication, thus preserving the passed
Expand All @@ -49,11 +49,25 @@ public final class HttpConnectorConfigConstants {

public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";

@Deprecated
public static final String HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.code.exclude";

@Deprecated
public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.code";

public static final String HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.non-retryable.code.exclude";

public static final String HTTP_ERROR_NON_RETRYABLE_SOURCE_LOOKUP_CODES_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.non-retryable.code";

public static final String HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.retryable.code.exclude";

public static final String HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.retryable.code";
// -----------------------------------------------------

public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.getindata.connectors.http.internal.config;

import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;
import static org.apache.flink.configuration.description.TextElement.text;

public enum RetryStrategyType implements DescribedEnum {

NONE("none", text("None")),
FIXED_DELAY("fixed-delay", text("Fixed delay strategy")),
EXPONENTIAL_DELAY("exponential-delay", text("Exponential delay strategy"));

private final String value;
private final InlineElement inlineElement;

RetryStrategyType(String value, InlineElement inlineElement) {
this.value = value;
this.inlineElement = inlineElement;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return inlineElement;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
import com.getindata.connectors.http.internal.status.HttpResponseStatus;
import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST;

/**
* An implementation of {@link SinkHttpClient} that uses Java 11's {@link HttpClient}. This
Expand All @@ -41,10 +44,10 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {
private final RequestSubmitter requestSubmitter;

public JavaNetSinkHttpClient(
Properties properties,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
RequestSubmitterFactory requestSubmitterFactory) {
Properties properties,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
RequestSubmitterFactory requestSubmitterFactory) {

this.httpPostRequestCallback = httpPostRequestCallback;
this.headerMap = HttpHeaderUtils.prepareHeaderMap(
Expand All @@ -58,8 +61,12 @@ public JavaNetSinkHttpClient(
ComposeHttpStatusCodeCheckerConfig checkerConfig =
ComposeHttpStatusCodeCheckerConfig.builder()
.properties(properties)
.whiteListPrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST)
.errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST)
.whiteListPrefix(HTTP_ERROR_SINK_CODE_WHITE_LIST)
.errorCodePrefix(HTTP_ERROR_SINK_CODES_LIST)
.nonRetryableErrorWhiteListPrefix("") // TODO: sink not refactored yet
.nonRetryableErrorCodePrefix("")
.retryableErrorWhiteListPrefix("")
.retryableErrorCodePrefix("")
.build();

this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);
Expand All @@ -80,8 +87,8 @@ public CompletableFuture<SinkHttpClientResponse> putRequests(
}

private CompletableFuture<List<JavaNetHttpResponseWrapper>> submitRequests(
List<HttpSinkRequestEntry> requestEntries,
String endpointUrl) {
List<HttpSinkRequestEntry> requestEntries,
String endpointUrl) {

var responseFutures = requestSubmitter.submit(endpointUrl, requestEntries);
var allFutures = CompletableFuture.allOf(responseFutures.toArray(new CompletableFuture[0]));
Expand All @@ -103,11 +110,12 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse(
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);

// TODO Add response processor here and orchestrate it with statusCodeChecker.
if (optResponse.isEmpty() ||
statusCodeChecker.isErrorCode(optResponse.get().statusCode())) {
failedResponses.add(sinkRequestEntry);
} else {
if (optResponse.isPresent() &&
statusCodeChecker.checkStatus(optResponse.get().statusCode())
.equals(HttpResponseStatus.SUCCESS)) {
successfulResponses.add(sinkRequestEntry);
} else {
failedResponses.add(sinkRequestEntry);
}
}

Expand Down
Loading
Loading