Skip to content

Commit

Permalink
router: implement RetryHostPredicate (envoyproxy#4385)
Browse files Browse the repository at this point in the history
Wires up route configuration to allow specifying what hosts should be
reattempted during retry host selection.
Risk Level: Medium, some changes made to the router. Otherwise new optional feature
Testing: unit and integration test
Docs Changes: n/a
Release Notes: n/a

Part of envoyproxy#3958

Signed-off-by: Snow Pettersen <[email protected]>
  • Loading branch information
snowp authored and htuch committed Sep 14, 2018
1 parent c32aed9 commit 62eb123
Show file tree
Hide file tree
Showing 24 changed files with 314 additions and 14 deletions.
21 changes: 21 additions & 0 deletions include/envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ class RetryPolicy {
* @return uint32_t a local OR of RETRY_ON values above.
*/
virtual uint32_t retryOn() const PURE;

/**
* Initializes the RetryHostPredicates to be used with this retry attempt.
* @return list of RetryHostPredicates to use
*/
virtual std::vector<Upstream::RetryHostPredicateSharedPtr> retryHostPredicates() const PURE;
};

/**
Expand Down Expand Up @@ -204,6 +210,21 @@ class RetryState {
virtual RetryStatus shouldRetry(const Http::HeaderMap* response_headers,
const absl::optional<Http::StreamResetReason>& reset_reason,
DoRetryCallback callback) PURE;

/**
* Called when a host was attempted but the request failed and is eligible for another retry.
* Should be used to update whatever internal state depends on previously attempted hosts.
* @param host the previously attempted host.
*/
virtual void onHostAttempted(Upstream::HostDescriptionConstSharedPtr host) PURE;

/**
* Determine whether host selection should be reattempted. Applies to host selection during
* retries, and is used to provide configurable host selection for retries.
* @param host the host under consideration
* @return whether host selection should be reattempted
*/
virtual bool shouldSelectAnotherHost(const Upstream::Host& host) PURE;
};

typedef std::unique_ptr<RetryState> RetryStatePtr;
Expand Down
12 changes: 9 additions & 3 deletions include/envoy/upstream/retry.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class RetryPriority {
*
* @param attempted_host the host that was previously attempted.
*/
virtual void onHostAttempted(HostSharedPtr attempted_host) PURE;
virtual void onHostAttempted(HostDescriptionConstSharedPtr attempted_host) PURE;
};

typedef std::shared_ptr<RetryPriority> RetryPrioritySharedPtr;
Expand Down Expand Up @@ -67,7 +67,7 @@ class RetryHostPredicate {
*
* @param attempted_host the host that was previously attempted.
*/
virtual void onHostAttempted(HostSharedPtr attempted_host) PURE;
virtual void onHostAttempted(HostDescriptionConstSharedPtr attempted_host) PURE;
};

typedef std::shared_ptr<RetryHostPredicate> RetryHostPredicateSharedPtr;
Expand Down Expand Up @@ -115,7 +115,13 @@ class RetryHostPredicateFactory {
public:
virtual ~RetryHostPredicateFactory() {}

virtual void createHostPredicate(RetryHostPredicateFactoryCallbacks& callbacks) PURE;
virtual void createHostPredicate(RetryHostPredicateFactoryCallbacks& callbacks,
const ProtobufWkt::Struct& config) PURE;

/**
* @return name name of this factory.
*/
virtual std::string name() PURE;
};

} // namespace Upstream
Expand Down
3 changes: 3 additions & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ class AsyncStreamImpl : public AsyncClient::Stream,
std::chrono::milliseconds perTryTimeout() const override {
return std::chrono::milliseconds(0);
}
std::vector<Upstream::RetryHostPredicateSharedPtr> retryHostPredicates() const override {
return {};
}
uint32_t numRetries() const override { return 0; }
uint32_t retryOn() const override { return 0; }
};
Expand Down
6 changes: 6 additions & 0 deletions source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ RetryPolicyImpl::RetryPolicyImpl(const envoy::api::v2::route::RouteAction& confi
num_retries_ = PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.retry_policy(), num_retries, 1);
retry_on_ = RetryStateImpl::parseRetryOn(config.retry_policy().retry_on());
retry_on_ |= RetryStateImpl::parseRetryGrpcOn(config.retry_policy().retry_on());

for (const auto& host_predicate : config.retry_policy().retry_host_predicate()) {
Registry::FactoryRegistry<Upstream::RetryHostPredicateFactory>::getFactory(
host_predicate.name())
->createHostPredicate(*this, host_predicate.config());
}
}

CorsPolicyImpl::CorsPolicyImpl(const envoy::api::v2::route::CorsPolicy& config) {
Expand Down
11 changes: 10 additions & 1 deletion source/common/router/config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,28 @@ typedef std::shared_ptr<VirtualHostImpl> VirtualHostSharedPtr;
/**
* Implementation of RetryPolicy that reads from the proto route config.
*/
class RetryPolicyImpl : public RetryPolicy {
class RetryPolicyImpl : public RetryPolicy, Upstream::RetryHostPredicateFactoryCallbacks {
public:
RetryPolicyImpl(const envoy::api::v2::route::RouteAction& config);

// Router::RetryPolicy
std::chrono::milliseconds perTryTimeout() const override { return per_try_timeout_; }
uint32_t numRetries() const override { return num_retries_; }
uint32_t retryOn() const override { return retry_on_; }
std::vector<Upstream::RetryHostPredicateSharedPtr> retryHostPredicates() const override {
return retry_host_predicates_;
}

// Upstream::RetryHostPredicateFactoryCallbacks
void addHostPredicate(Upstream::RetryHostPredicateSharedPtr predicate) override {
retry_host_predicates_.emplace_back(predicate);
}

private:
std::chrono::milliseconds per_try_timeout_{0};
uint32_t num_retries_{};
uint32_t retry_on_{};
std::vector<Upstream::RetryHostPredicateSharedPtr> retry_host_predicates_;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/retry_state_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ RetryStateImpl::RetryStateImpl(const RetryPolicy& route_policy, Http::HeaderMap&
Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher,
Upstream::ResourcePriority priority)
: cluster_(cluster), runtime_(runtime), random_(random), dispatcher_(dispatcher),
priority_(priority) {
priority_(priority), retry_host_predicates_(route_policy.retryHostPredicates()) {

if (request_headers.EnvoyRetryOn()) {
retry_on_ = parseRetryOn(request_headers.EnvoyRetryOn()->value().c_str());
Expand Down
11 changes: 11 additions & 0 deletions source/common/router/retry_state_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ class RetryStateImpl : public RetryState {
RetryStatus shouldRetry(const Http::HeaderMap* response_headers,
const absl::optional<Http::StreamResetReason>& reset_reason,
DoRetryCallback callback) override;
bool shouldSelectAnotherHost(const Upstream::Host& host) override {
return std::any_of(
retry_host_predicates_.begin(), retry_host_predicates_.end(),
[&host](auto predicate) { return predicate->shouldSelectAnotherHost(host); });
}

void onHostAttempted(Upstream::HostDescriptionConstSharedPtr host) override {
std::for_each(retry_host_predicates_.begin(), retry_host_predicates_.end(),
[&host](auto predicate) { predicate->onHostAttempted(host); });
}

private:
RetryStateImpl(const RetryPolicy& route_policy, Http::HeaderMap& request_headers,
Expand All @@ -61,6 +71,7 @@ class RetryStateImpl : public RetryState {
Event::TimerPtr retry_timer_;
Upstream::ResourcePriority priority_;
BackOffStrategyPtr backoff_strategy_;
std::vector<Upstream::RetryHostPredicateSharedPtr> retry_host_predicates_;
};

} // namespace Router
Expand Down
7 changes: 7 additions & 0 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,9 @@ void Filter::onUpstreamReset(UpstreamResetType type,

// We don't retry on a global timeout or if we already started the response.
if (type != UpstreamResetType::GlobalTimeout && !downstream_response_started_ && retry_state_) {
// Notify retry modifiers about the attempted host.
retry_state_->onHostAttempted(upstream_host);

RetryStatus retry_status =
retry_state_->shouldRetry(nullptr, reset_reason, [this]() -> void { doRetry(); });
if (retry_status == RetryStatus::Yes && setupRetry(true)) {
Expand Down Expand Up @@ -589,6 +592,9 @@ void Filter::onUpstreamHeaders(const uint64_t response_code, Http::HeaderMapPtr&
}

if (retry_state_) {
// Notify retry modifiers about the attempted host.
retry_state_->onHostAttempted(upstream_request_->upstream_host_);

RetryStatus retry_status = retry_state_->shouldRetry(
headers.get(), absl::optional<Http::StreamResetReason>(), [this]() -> void { doRetry(); });
// Capture upstream_host since setupRetry() in the following line will clear
Expand Down Expand Up @@ -744,6 +750,7 @@ bool Filter::setupRetry(bool end_stream) {
}

void Filter::doRetry() {
is_retry_ = true;
Http::ConnectionPool::Instance* conn_pool = getConnPool();
if (!conn_pool) {
sendNoHealthyUpstreamResponse();
Expand Down
14 changes: 13 additions & 1 deletion source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
public:
Filter(FilterConfig& config)
: config_(config), downstream_response_started_(false), downstream_end_stream_(false),
do_shadowing_(false) {}
do_shadowing_(false), is_retry_(false) {}

~Filter();

Expand Down Expand Up @@ -204,6 +204,17 @@ class Filter : Logger::Loggable<Logger::Id::router>,
}
const Http::HeaderMap* downstreamHeaders() const override { return downstream_headers_; }

bool shouldSelectAnotherHost(const Upstream::Host& host) override {
// We only care about host selection when performing a retry, at which point we consult the
// RetryState to see if we're configured to avoid certain hosts during retries.
if (!is_retry_) {
return false;
}

ASSERT(retry_state_);
return retry_state_->shouldSelectAnotherHost(host);
}

/**
* Set a computed cookie to be sent with the downstream headers.
* @param key supplies the size of the cookie
Expand Down Expand Up @@ -377,6 +388,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
bool downstream_response_started_ : 1;
bool downstream_end_stream_ : 1;
bool do_shadowing_ : 1;
bool is_retry_ : 1;
};

class ProdFilter : public Filter {
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/load_balancer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class LoadBalancerContextBase : public LoadBalancerContext {

bool shouldSelectAnotherHost(const Host&) override { return false; }

uint32_t hostSelectionRetryCount() const override { return 0; }
uint32_t hostSelectionRetryCount() const override { return 1; }
};

/**
Expand Down
2 changes: 1 addition & 1 deletion test/common/router/retry_state_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class RouterRetryStateImplTest : public testing::Test {
EXPECT_CALL(*retry_timer_, enableTimer(_));
}

TestRetryPolicy policy_;
NiceMock<TestRetryPolicy> policy_;
NiceMock<Upstream::MockClusterInfo> cluster_;
NiceMock<Runtime::MockLoader> runtime_;
NiceMock<Runtime::MockRandomGenerator> random_;
Expand Down
71 changes: 71 additions & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class TestFilter : public Filter {
Upstream::ResourcePriority) override {
EXPECT_EQ(nullptr, retry_state_);
retry_state_ = new NiceMock<MockRetryState>();
if (reject_all_hosts_) {
// Set up RetryState to always reject the host
ON_CALL(*retry_state_, shouldSelectAnotherHost(_)).WillByDefault(Return(true));
}
return RetryStatePtr{retry_state_};
}

Expand All @@ -63,6 +67,7 @@ class TestFilter : public Filter {

NiceMock<Network::MockConnection> downstream_connection_;
MockRetryState* retry_state_{};
bool reject_all_hosts_;
};

class RouterTestBase : public testing::Test {
Expand Down Expand Up @@ -1692,6 +1697,72 @@ TEST_F(RouterTest, RetryUpstreamGrpcCancelled) {
EXPECT_TRUE(verifyHostUpstreamStats(1, 1));
}

// Verifies that the initial request accepts any host, but during retries
// RetryPolicy will be consulted.
TEST_F(RouterTest, RetryRespectsRetryHostPredicate) {
router_.reject_all_hosts_ = true;

NiceMock<Http::MockStreamEncoder> encoder1;
Http::StreamDecoder* response_decoder = nullptr;
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_);
return nullptr;
}));
expectResponseTimerCreate();

Http::TestHeaderMapImpl headers{{"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}};
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, false);

NiceMock<Upstream::MockHost> host;
// The router should accept any host at this point, since we're not in a retry.
EXPECT_FALSE(router_.shouldSelectAnotherHost(host));

Buffer::InstancePtr body_data(new Buffer::OwnedImpl("hello"));
EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true));
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(*body_data, false));

Http::TestHeaderMapImpl trailers{{"some", "trailer"}};
router_.decodeTrailers(trailers);

// 5xx response.
router_.retry_state_->expectRetry();
Http::HeaderMapPtr response_headers1(new Http::TestHeaderMapImpl{{":status", "503"}});
EXPECT_CALL(encoder1.stream_, resetStream(Http::StreamResetReason::LocalReset));
EXPECT_CALL(cm_.conn_pool_.host_->outlier_detector_, putHttpResponseCode(503));
response_decoder->decodeHeaders(std::move(response_headers1), false);
EXPECT_TRUE(verifyHostUpstreamStats(0, 1));

// We expect the 5xx response to kick off a new request.
NiceMock<Http::MockStreamEncoder> encoder2;
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
callbacks.onPoolReady(encoder2, cm_.conn_pool_.host_);
return nullptr;
}));
ON_CALL(callbacks_, decodingBuffer()).WillByDefault(Return(body_data.get()));
EXPECT_CALL(encoder2, encodeHeaders(_, false));
EXPECT_CALL(encoder2, encodeData(_, false));
EXPECT_CALL(encoder2, encodeTrailers(_));
router_.retry_state_->callback_();

// Now that we're triggered a retry, we should see the router reject hosts.
EXPECT_TRUE(router_.shouldSelectAnotherHost(host));

// Normal response.
EXPECT_CALL(*router_.retry_state_, shouldRetry(_, _, _)).WillOnce(Return(RetryStatus::No));
EXPECT_CALL(cm_.conn_pool_.host_->health_checker_, setUnhealthy()).Times(0);
Http::HeaderMapPtr response_headers2(new Http::TestHeaderMapImpl{{":status", "200"}});
EXPECT_CALL(cm_.conn_pool_.host_->outlier_detector_, putHttpResponseCode(200));
response_decoder->decodeHeaders(std::move(response_headers2), true);
EXPECT_TRUE(verifyHostUpstreamStats(1, 1));
}

TEST_F(RouterTest, Shadow) {
callbacks_.route_->route_entry_.shadow_policy_.cluster_ = "foo";
callbacks_.route_->route_entry_.shadow_policy_.runtime_key_ = "bar";
Expand Down
4 changes: 3 additions & 1 deletion test/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ void ConfigHelper::setConnectTimeout(std::chrono::milliseconds timeout) {
void ConfigHelper::addRoute(const std::string& domains, const std::string& prefix,
const std::string& cluster, bool validate_clusters,
envoy::api::v2::route::RouteAction::ClusterNotFoundResponseCode code,
envoy::api::v2::route::VirtualHost::TlsRequirementType type) {
envoy::api::v2::route::VirtualHost::TlsRequirementType type,
envoy::api::v2::route::RouteAction::RetryPolicy retry_policy) {
RELEASE_ASSERT(!finalized_, "");
envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager hcm_config;
loadHttpConnectionManager(hcm_config);
Expand All @@ -324,6 +325,7 @@ void ConfigHelper::addRoute(const std::string& domains, const std::string& prefi
virtual_host->add_routes()->mutable_match()->set_prefix(prefix);
virtual_host->mutable_routes(0)->mutable_route()->set_cluster(cluster);
virtual_host->mutable_routes(0)->mutable_route()->set_cluster_not_found_response_code(code);
virtual_host->mutable_routes(0)->mutable_route()->mutable_retry_policy()->Swap(&retry_policy);
virtual_host->set_require_tls(type);

storeHttpConnectionManager(hcm_config);
Expand Down
3 changes: 2 additions & 1 deletion test/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class ConfigHelper {
bool validate_clusters,
envoy::api::v2::route::RouteAction::ClusterNotFoundResponseCode code,
envoy::api::v2::route::VirtualHost::TlsRequirementType type =
envoy::api::v2::route::VirtualHost::NONE);
envoy::api::v2::route::VirtualHost::NONE,
envoy::api::v2::route::RouteAction::RetryPolicy retry_policy = {});

// Add an HTTP filter prior to existing filters.
void addFilter(const std::string& filter_yaml);
Expand Down
13 changes: 13 additions & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,17 @@ envoy_cc_test(
],
)

envoy_cc_test_library(
name = "test_host_predicate_lib",
srcs = [
"test_host_predicate.h",
"test_host_predicate_config.h",
],
deps = [
"//include/envoy/upstream:retry_interface",
],
)

envoy_cc_test_library(
name = "add_trailers_filter_config_lib",
srcs = [
Expand Down Expand Up @@ -205,9 +216,11 @@ envoy_cc_test_library(
deps = [
":add_trailers_filter_config_lib",
":integration_lib",
":test_host_predicate_lib",
"//source/extensions/filters/http/router:config",
"//source/extensions/filters/network/http_connection_manager:config",
"//test/common/upstream:utility_lib",
"//test/test_common:registry_lib",
],
)

Expand Down
2 changes: 2 additions & 0 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ TEST_P(Http2IntegrationTest, HittingEncoderFilterLimit) { testHittingEncoderFilt

TEST_P(Http2IntegrationTest, GrpcRouterNotFound) { testGrpcRouterNotFound(); }

TEST_P(Http2IntegrationTest, RetryHostPredicateFilter) { testRetryHostPredicateFilter(); }

TEST_P(Http2IntegrationTest, GrpcRetry) { testGrpcRetry(); }

// Send a request with overly large headers, and ensure it results in stream reset.
Expand Down
Loading

0 comments on commit 62eb123

Please sign in to comment.