Skip to content

HTTP-122 Retry for source lookup table #148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 53 additions & 38 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ public final class HttpConnectorConfigConstants {
public static final String SOURCE_RETRY_STRATEGY_PREFIX = SOURCE_LOOKUP_PREFIX + "retry-strategy.";
public static final String SOURCE_RETRY_STRATEGY_TYPE = SOURCE_RETRY_STRATEGY_PREFIX + "type";

private static final String SOURCE_RETRY_FIXED_DELAY_PREFIX = SOURCE_LOOKUP_PREFIX + "fixed-delay.";
private static final String SOURCE_RETRY_FIXED_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "fixed-delay.";
public static final String SOURCE_RETRY_FIXED_DELAY_DELAY = SOURCE_RETRY_FIXED_DELAY_PREFIX + "delay";

private static final String SOURCE_RETRY_EXP_DELAY_PREFIX = SOURCE_LOOKUP_PREFIX + "exponential-delay.";
private static final String SOURCE_RETRY_EXP_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "exponential-delay.";
public static final String SOURCE_RETRY_EXP_DELAY_INITIAL_BACKOFF =
SOURCE_RETRY_EXP_DELAY_PREFIX + "initial-backoff";
public static final String SOURCE_RETRY_EXP_DELAY_MAX_BACKOFF =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private RetryConfig.Builder<?> createBuilder() throws ConfigurationException {
}

private RetryStrategyType getRetryStrategy() {
return config.get(SOURCE_LOOKUP_RETRY_STRATEGY);
return RetryStrategyType.fromCode(config.get(SOURCE_LOOKUP_RETRY_STRATEGY));
}

private RetryConfig.Builder<?> configureFixedDelay() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
package com.getindata.connectors.http.internal.retry;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public enum RetryStrategyType {
FIXED_DELAY,
EXPONENTIAL_DELAY,
FIXED_DELAY("fixed-delay"),
EXPONENTIAL_DELAY("exponential-delay"),
;

private final String code;

public static RetryStrategyType fromCode(String code) {
if (code == null) {
throw new NullPointerException("Code is null");
}
for (var strategy : RetryStrategyType.values()) {
if (strategy.getCode().equalsIgnoreCase(code)) {
return strategy;
}
}
throw new IllegalArgumentException("No enum constant for " + code);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
@UtilityClass
public class HttpCodesParser {

private final Pattern CODE_GROUP_EXPRESSION = Pattern.compile("[1-5]XX");
private final Pattern CODE_GROUP_EXPRESSION = Pattern.compile("[1-5][xX]{2}");
private final String DELIMITER = Pattern.quote(",");
private final int HTTP_CODE_MIN = 100;
private final int HTTP_CODE_MAX = 599;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ public class HttpLookupConnectorOptions {
.noDefaultValue()
.withDescription("Http client connection timeout.");

public static final ConfigOption<RetryStrategyType> SOURCE_LOOKUP_RETRY_STRATEGY =
public static final ConfigOption<String> SOURCE_LOOKUP_RETRY_STRATEGY =
ConfigOptions.key(SOURCE_RETRY_STRATEGY_TYPE)
.enumType(RetryStrategyType.class)
.defaultValue(RetryStrategyType.FIXED_DELAY)
.withDescription("Auto retry strategy type: fixed_delay (default) or exponential_delay.");
.stringType()
.defaultValue(RetryStrategyType.FIXED_DELAY.getCode())
.withDescription("Auto retry strategy type: fixed-delay (default) or exponential-delay.");

public static final ConfigOption<String> SOURCE_LOOKUP_HTTP_SUCCESS_CODES =
ConfigOptions.key(SOURCE_RETRY_SUCCESS_CODES)
Expand All @@ -99,7 +99,7 @@ public class HttpLookupConnectorOptions {
ConfigOptions.key(SOURCE_RETRY_RETRY_CODES)
.stringType()
.defaultValue("500,503,504")
.withDescription("Comma separated http codes considered as temporal errors. " +
.withDescription("Comma separated http codes considered as transient errors. " +
"Use [1-5]XX for groups and '!' character for excluding.");

public static final ConfigOption<Duration> SOURCE_LOOKUP_RETRY_FIXED_DELAY_DELAY =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.getindata.connectors.http.internal.retry;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.ConfigurationException;
import org.junit.jupiter.api.Test;

import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class RetryConfigProviderTest {

@Test
void verifyFixedDelayRetryConfig() throws ConfigurationException {
var config = new Configuration();
config.setString("gid.connector.http.source.lookup.retry-strategy.type", "fixed-delay");
config.setString("gid.connector.http.source.lookup.retry-strategy.fixed-delay.delay", "10s");
config.setInteger("lookup.max-retries", 12);

var retryConfig = RetryConfigProvider.create(config);

assertEquals(12, retryConfig.getMaxAttempts());
IntStream.range(1, 12).forEach(attempt ->
assertEquals(10000, retryConfig.getIntervalFunction().apply(attempt))
);
}

@Test
void verifyExponentialDelayConfig() throws ConfigurationException {
var config = new Configuration();
config.setString("gid.connector.http.source.lookup.retry-strategy.type", "exponential-delay");
config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff", "15ms");
config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff", "120ms");
config.setInteger("gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier", 2);
config.setInteger("lookup.max-retries", 6);

var retryConfig = RetryConfigProvider.create(config);
var intervalFunction = retryConfig.getIntervalFunction();

assertEquals(6, retryConfig.getMaxAttempts());
assertEquals(15, intervalFunction.apply(1));
assertEquals(30, intervalFunction.apply(2));
assertEquals(60, intervalFunction.apply(3));
assertEquals(120, intervalFunction.apply(4));
assertEquals(120, intervalFunction.apply(5));
assertEquals(120, intervalFunction.apply(6));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.getindata.connectors.http.internal.retry;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

class RetryStrategyTypeTest {

static Stream<Arguments> inputArguments() {
return Stream.of(
Arguments.of("FIXED-DELAY", RetryStrategyType.FIXED_DELAY),
Arguments.of("fixed-delay", RetryStrategyType.FIXED_DELAY),
Arguments.of("exponential-delay", RetryStrategyType.EXPONENTIAL_DELAY),
Arguments.of("EXPONENTIAL-DELAY", RetryStrategyType.EXPONENTIAL_DELAY)
);
}

@ParameterizedTest
@MethodSource("inputArguments")
void parseFromCodes(String code, RetryStrategyType expectedType) {
var result = RetryStrategyType.fromCode(code);

assertEquals(expectedType, result);
}

@ParameterizedTest
@ValueSource(strings = {
"fixed_delay",
"FIXED_DELAY",
"ABC",
"FIXED-DELA",
"exponential_delay"
})
void failWhenCodeIsIllegal(String code) {
assertThrows(IllegalArgumentException.class, () -> RetryStrategyType.fromCode(code));
}

@Test
void failWhenCodeIsNull() {
assertThrows(NullPointerException.class, () -> RetryStrategyType.fromCode(null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class HttpCodesParserTest {
@ParameterizedTest
@ValueSource(strings = {
"6XX",
"4xx",
"1XXX",
"600",
"99",
Expand Down Expand Up @@ -54,6 +53,9 @@ private static Stream<InputArgs> inputArgsStream() {
.build(),
InputArgs.builder().codeExpression("!404, 4XX")
.expectedCodes(range(400, 500, 404))
.build(),
InputArgs.builder().codeExpression("2xX,!401,3Xx,4xx")
.expectedCodes(range(200, 500, 401))
.build()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ void testLookupWithRetry() throws Exception {
+ "'url' = 'http://localhost:9090/client',"
+ "'lookup.max-retries' = '3',"
+ "'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',"
+ "'gid.connector.http.source.lookup.retry-strategy.type' = 'fixed_delay',"
+ "'gid.connector.http.source.lookup.fixed-delay.delay' = '1ms',"
+ "'gid.connector.http.source.lookup.retry-strategy.type' = 'fixed-delay',"
+ "'gid.connector.http.source.lookup.retry-strategy.fixed-delay.delay' = '1ms',"
+ "'gid.connector.http.source.lookup.success-codes' = '2XX',"
+ "'gid.connector.http.source.lookup.retry-codes' = '501'"
+ ")";
Expand Down