Skip to content

Commit

Permalink
test: Add non-aggregated CDS-over-gRPC integration test (envoyproxy#5228
Browse files Browse the repository at this point in the history
)

While trying to implement incremental CDS support in Envoy, I found it extremely difficult to work with the integration tests, which I hoped to adapt/copy for incremental. The only test using gRPC was ADS. I decided that a more manageable first step was to add a test for just (non-incremental) CDS over gRPC.

Risk Level: none, just a new test

Signed-off-by: Fred Douglas <[email protected]>
  • Loading branch information
fredlas authored and htuch committed Jan 11, 2019
1 parent 35827dd commit c84604c
Show file tree
Hide file tree
Showing 24 changed files with 555 additions and 202 deletions.
4 changes: 3 additions & 1 deletion docs/root/configuration/cluster_manager/cluster_stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ General
-------

The cluster manager has a statistics tree rooted at *cluster_manager.* with the following
statistics. Any ``:`` character in the stats name is replaced with ``_``.
statistics. Any ``:`` character in the stats name is replaced with ``_``. Stats include
all clusters managed by the cluster manager, including both clusters used for data plane
upstreams and control plane xDS clusters.

.. csv-table::
:header: Name, Type, Description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ TEST_P(BufferIntegrationTest, RouterRequestAndResponseWithGiantBodyBuffer) {

TEST_P(BufferIntegrationTest, RouterHeaderOnlyRequestAndResponseBuffer) {
config_helper_.addFilter(ConfigHelper::DEFAULT_BUFFER_FILTER);
testRouterHeaderOnlyRequestAndResponse(true);
testRouterHeaderOnlyRequestAndResponse();
}

TEST_P(BufferIntegrationTest, RouterRequestAndResponseWithBodyBuffer) {
Expand Down
45 changes: 43 additions & 2 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ envoy_cc_test(
"//test/config/integration/certs",
],
deps = [
":http_integration_lib",
":xds_integration_test_base",
"//source/common/config:protobuf_link_hacks",
"//source/common/config:resources_lib",
"//source/common/protobuf:utility_lib",
Expand Down Expand Up @@ -63,6 +63,27 @@ envoy_proto_library(
require_py = 1,
)

envoy_cc_test(
name = "cds_integration_test",
srcs = ["cds_integration_test.cc"],
data = [
"//test/config/integration/certs",
],
deps = [
":xds_integration_test_base",
"//source/common/config:protobuf_link_hacks",
"//source/common/config:resources_lib",
"//source/common/protobuf:utility_lib",
"//test/common/grpc:grpc_client_integration_lib",
"//test/mocks/runtime:runtime_mocks",
"//test/mocks/server:server_mocks",
"//test/test_common:network_utility_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/api/v2:cds_cc",
"@envoy_api//envoy/api/v2:discovery_cc",
],
)

envoy_cc_test(
name = "eds_integration_test",
srcs = ["eds_integration_test.cc"],
Expand Down Expand Up @@ -601,11 +622,31 @@ envoy_cc_test(
srcs = ["xds_integration_test.cc"],
data = ["//test/config/integration:server_xds_files"],
deps = [
":http_integration_lib",
":xds_integration_test_base",
"//test/test_common:utility_lib",
],
)

envoy_cc_test_library(
name = "xds_integration_test_base",
srcs = [
"xds_integration_test_base.cc",
],
hdrs = [
"xds_integration_test_base.h",
],
deps = [
":http_integration_lib",
":test_host_predicate_lib",
"//include/envoy/event:timer_interface",
"//source/common/common:thread_annotations",
"//source/extensions/filters/http/router:config",
"//source/extensions/filters/network/http_connection_manager:config",
"//test/common/upstream:utility_lib",
"//test/test_common:registry_lib",
],
)

envoy_cc_test(
name = "xfcc_integration_test",
srcs = [
Expand Down
155 changes: 32 additions & 123 deletions test/integration/ads_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
#include "common/ssl/ssl_socket.h"

#include "test/common/grpc/grpc_client_integration.h"
#include "test/integration/http_integration.h"
#include "test/integration/utility.h"
#include "test/integration/xds_integration_test_base.h"
#include "test/mocks/server/mocks.h"
#include "test/test_common/network_utility.h"
#include "test/test_common/simulated_time_system.h"
Expand Down Expand Up @@ -60,50 +60,20 @@ const std::string config = R"EOF(
port_value: 0
)EOF";

class AdsIntegrationBaseTest : public HttpIntegrationTest {
public:
AdsIntegrationBaseTest(Http::CodecClient::Type downstream_protocol,
Network::Address::IpVersion version,
const std::string& config = ConfigHelper::HTTP_PROXY_CONFIG)
: HttpIntegrationTest(downstream_protocol, version, realTime(), config) {}

void createAdsConnection(FakeUpstream& upstream) {
ads_upstream_ = &upstream;
AssertionResult result = ads_upstream_->waitForHttpConnection(*dispatcher_, ads_connection_);
RELEASE_ASSERT(result, result.message());
}

void cleanUpAdsConnection() {
ASSERT(ads_upstream_ != nullptr);

// Don't ASSERT fail if an ADS reconnect ends up unparented.
ads_upstream_->set_allow_unexpected_disconnects(true);
AssertionResult result = ads_connection_->close();
RELEASE_ASSERT(result, result.message());
result = ads_connection_->waitForDisconnect();
RELEASE_ASSERT(result, result.message());
ads_connection_.reset();
}

protected:
FakeHttpConnectionPtr ads_connection_;
FakeUpstream* ads_upstream_{};
};

class AdsIntegrationTest : public AdsIntegrationBaseTest,
class AdsIntegrationTest : public XdsIntegrationTestBase,
public Grpc::GrpcClientIntegrationParamTest {
public:
AdsIntegrationTest()
: AdsIntegrationBaseTest(Http::CodecClient::Type::HTTP2, ipVersion(), config) {}
: XdsIntegrationTestBase(Http::CodecClient::Type::HTTP2, ipVersion(), config) {}

void TearDown() override {
cleanUpAdsConnection();
cleanUpXdsConnection();
test_server_.reset();
fake_upstreams_.clear();
}

void createUpstreams() override {
AdsIntegrationBaseTest::createUpstreams();
XdsIntegrationTestBase::createUpstreams();
fake_upstreams_.emplace_back(new FakeUpstream(
createUpstreamSslContext(), 0, FakeHttpConnection::Type::HTTP2, version_, timeSystem()));
}
Expand All @@ -124,60 +94,6 @@ class AdsIntegrationTest : public AdsIntegrationBaseTest,
std::move(cfg), context_manager_, *upstream_stats_store, std::vector<std::string>{});
}

AssertionResult
compareDiscoveryRequest(const std::string& expected_type_url, const std::string& expected_version,
const std::vector<std::string>& expected_resource_names,
const Protobuf::int32 expected_error_code = Grpc::Status::GrpcStatus::Ok,
const std::string& expected_error_message = "") {
envoy::api::v2::DiscoveryRequest discovery_request;
VERIFY_ASSERTION(ads_stream_->waitForGrpcMessage(*dispatcher_, discovery_request));

EXPECT_TRUE(discovery_request.has_node());
EXPECT_FALSE(discovery_request.node().id().empty());
EXPECT_FALSE(discovery_request.node().cluster().empty());

// TODO(PiotrSikora): Remove this hack once fixed internally.
if (!(expected_type_url == discovery_request.type_url())) {
return AssertionFailure() << fmt::format("type_url {} does not match expected {}",
discovery_request.type_url(), expected_type_url);
}
if (!(expected_error_code == discovery_request.error_detail().code())) {
return AssertionFailure() << fmt::format("error_code {} does not match expected {}",
discovery_request.error_detail().code(),
expected_error_code);
}
EXPECT_TRUE(
IsSubstring("", "", expected_error_message, discovery_request.error_detail().message()));
const std::vector<std::string> resource_names(discovery_request.resource_names().cbegin(),
discovery_request.resource_names().cend());
if (expected_resource_names != resource_names) {
return AssertionFailure() << fmt::format(
"resources {} do not match expected {} in {}",
fmt::join(resource_names.begin(), resource_names.end(), ","),
fmt::join(expected_resource_names.begin(), expected_resource_names.end(), ","),
discovery_request.DebugString());
}
// TODO(PiotrSikora): Remove this hack once fixed internally.
if (!(expected_version == discovery_request.version_info())) {
return AssertionFailure() << fmt::format("version {} does not match expected {} in {}",
discovery_request.version_info(), expected_version,
discovery_request.DebugString());
}
return AssertionSuccess();
}

template <class T>
void sendDiscoveryResponse(const std::string& type_url, const std::vector<T>& messages,
const std::string& version) {
envoy::api::v2::DiscoveryResponse discovery_response;
discovery_response.set_version_info(version);
discovery_response.set_type_url(type_url);
for (const auto& message : messages) {
discovery_response.add_resources()->PackFrom(message);
}
ads_stream_->sendGrpcMessage(discovery_response);
}

envoy::api::v2::Cluster buildCluster(const std::string& name) {
return TestUtility::parseYaml<envoy::api::v2::Cluster>(fmt::format(R"EOF(
name: {}
Expand Down Expand Up @@ -245,9 +161,8 @@ class AdsIntegrationTest : public AdsIntegrationBaseTest,

void makeSingleRequest() {
registerTestServerPorts({"http"});
testRouterHeaderOnlyRequestAndResponse(true);
testRouterHeaderOnlyRequestAndResponse();
cleanupUpstreamAndDownstream();
fake_upstream_connection_ = nullptr;
}

void initialize() override { initializeAds(false); }
Expand Down Expand Up @@ -278,12 +193,12 @@ class AdsIntegrationTest : public AdsIntegrationBaseTest,
}
});
setUpstreamProtocol(FakeHttpConnection::Type::HTTP2);
AdsIntegrationBaseTest::initialize();
if (ads_stream_ == nullptr) {
createAdsConnection(*(fake_upstreams_[1]));
AssertionResult result = ads_connection_->waitForNewStream(*dispatcher_, ads_stream_);
XdsIntegrationTestBase::initialize();
if (xds_stream_ == nullptr) {
createXdsConnection(*(fake_upstreams_[1]));
AssertionResult result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_);
RELEASE_ASSERT(result, result.message());
ads_stream_->startGrpcStream();
xds_stream_->startGrpcStream();
}
}

Expand Down Expand Up @@ -410,9 +325,7 @@ class AdsIntegrationTest : public AdsIntegrationBaseTest,
return dynamic_cast<const envoy::admin::v2alpha::RoutesConfigDump&>(*message_ptr);
}

testing::NiceMock<Server::Configuration::MockTransportSocketFactoryContext> factory_context_;
Ssl::ContextManagerImpl context_manager_{timeSystem()};
FakeStreamPtr ads_stream_;
};

INSTANTIATE_TEST_CASE_P(IpVersionsClientType, AdsIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS);
Expand Down Expand Up @@ -653,20 +566,20 @@ TEST_P(AdsIntegrationTest, RdsAfterLdsWithRdsChange) {
makeSingleRequest();
}

class AdsFailIntegrationTest : public AdsIntegrationBaseTest,
class AdsFailIntegrationTest : public XdsIntegrationTestBase,
public Grpc::GrpcClientIntegrationParamTest {
public:
AdsFailIntegrationTest()
: AdsIntegrationBaseTest(Http::CodecClient::Type::HTTP2, ipVersion(), config) {}
: XdsIntegrationTestBase(Http::CodecClient::Type::HTTP2, ipVersion(), config) {}

void TearDown() override {
cleanUpAdsConnection();
cleanUpXdsConnection();
test_server_.reset();
fake_upstreams_.clear();
}

void createUpstreams() override {
AdsIntegrationBaseTest::createUpstreams();
XdsIntegrationTestBase::createUpstreams();
fake_upstreams_.emplace_back(
new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_, timeSystem()));
}
Expand All @@ -681,10 +594,8 @@ class AdsFailIntegrationTest : public AdsIntegrationBaseTest,
ads_cluster->set_name("ads_cluster");
});
setUpstreamProtocol(FakeHttpConnection::Type::HTTP2);
AdsIntegrationBaseTest::initialize();
XdsIntegrationTestBase::initialize();
}

FakeStreamPtr ads_stream_;
};

INSTANTIATE_TEST_CASE_P(IpVersionsClientType, AdsFailIntegrationTest,
Expand All @@ -693,26 +604,26 @@ INSTANTIATE_TEST_CASE_P(IpVersionsClientType, AdsFailIntegrationTest,
// Validate that we don't crash on failed ADS stream.
TEST_P(AdsFailIntegrationTest, ConnectDisconnect) {
initialize();
createAdsConnection(*fake_upstreams_[1]);
ASSERT_TRUE(ads_connection_->waitForNewStream(*dispatcher_, ads_stream_));
ads_stream_->startGrpcStream();
ads_stream_->finishGrpcStream(Grpc::Status::Internal);
createXdsConnection(*fake_upstreams_[1]);
ASSERT_TRUE(xds_connection_->waitForNewStream(*dispatcher_, xds_stream_));
xds_stream_->startGrpcStream();
xds_stream_->finishGrpcStream(Grpc::Status::Internal);
}

class AdsConfigIntegrationTest : public AdsIntegrationBaseTest,
class AdsConfigIntegrationTest : public XdsIntegrationTestBase,
public Grpc::GrpcClientIntegrationParamTest {
public:
AdsConfigIntegrationTest()
: AdsIntegrationBaseTest(Http::CodecClient::Type::HTTP2, ipVersion(), config) {}
: XdsIntegrationTestBase(Http::CodecClient::Type::HTTP2, ipVersion(), config) {}

void TearDown() override {
cleanUpAdsConnection();
cleanUpXdsConnection();
test_server_.reset();
fake_upstreams_.clear();
}

void createUpstreams() override {
AdsIntegrationBaseTest::createUpstreams();
XdsIntegrationTestBase::createUpstreams();
fake_upstreams_.emplace_back(
new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_, timeSystem()));
}
Expand All @@ -735,10 +646,8 @@ class AdsConfigIntegrationTest : public AdsIntegrationBaseTest,
eds_config->mutable_ads();
});
setUpstreamProtocol(FakeHttpConnection::Type::HTTP2);
AdsIntegrationBaseTest::initialize();
XdsIntegrationTestBase::initialize();
}

FakeStreamPtr ads_stream_;
};

INSTANTIATE_TEST_CASE_P(IpVersionsClientType, AdsConfigIntegrationTest,
Expand All @@ -747,10 +656,10 @@ INSTANTIATE_TEST_CASE_P(IpVersionsClientType, AdsConfigIntegrationTest,
// This is s regression validating that we don't crash on EDS static Cluster that uses ADS.
TEST_P(AdsConfigIntegrationTest, EdsClusterWithAdsConfigSource) {
initialize();
createAdsConnection(*fake_upstreams_[1]);
ASSERT_TRUE(ads_connection_->waitForNewStream(*dispatcher_, ads_stream_));
ads_stream_->startGrpcStream();
ads_stream_->finishGrpcStream(Grpc::Status::Ok);
createXdsConnection(*fake_upstreams_[1]);
ASSERT_TRUE(xds_connection_->waitForNewStream(*dispatcher_, xds_stream_));
xds_stream_->startGrpcStream();
xds_stream_->finishGrpcStream(Grpc::Status::Ok);
}

// Validates that the initial xDS request batches all resources referred to in static config
Expand All @@ -768,9 +677,9 @@ TEST_P(AdsIntegrationTest, XdsBatching) {
});

pre_worker_start_test_steps_ = [this]() {
createAdsConnection(*fake_upstreams_.back());
ASSERT_TRUE(ads_connection_->waitForNewStream(*dispatcher_, ads_stream_));
ads_stream_->startGrpcStream();
createXdsConnection(*fake_upstreams_.back());
ASSERT_TRUE(xds_connection_->waitForNewStream(*dispatcher_, xds_stream_));
xds_stream_->startGrpcStream();

EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "",
{"eds_cluster2", "eds_cluster"}));
Expand Down
Loading

0 comments on commit c84604c

Please sign in to comment.