Skip to content

Commit

Permalink
fixup: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Aug 11, 2023
1 parent d41c023 commit a367cd7
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 73 deletions.
115 changes: 55 additions & 60 deletions test/cpp/interop/pre_stop_hook_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,103 +76,98 @@ class HookServiceImpl final : public HookService::CallbackService {
std::vector<ServerUnaryReactor*> pending_requests_ ABSL_GUARDED_BY(&mu_);
};

class ServerHolder {
enum class State { kNew, kWaiting, kDone, kShuttingDown };

std::unique_ptr<Server> BuildHookServer(HookServiceImpl* service, int port) {
ServerBuilder builder;
builder.AddListeningPort(absl::StrFormat("0.0.0.0:%d", port),
grpc::InsecureServerCredentials());
builder.RegisterService(service);
return builder.BuildAndStart();
}

class PreStopHookServer {
public:
enum class State { kNew, kWaiting, kDone, kShuttingDown };

explicit ServerHolder(int port) {
ServerBuilder builder;
builder.AddListeningPort(absl::StrFormat("0.0.0.0:%d", port),
grpc::InsecureServerCredentials());
builder.RegisterService(&hook_service_);
server_ = builder.BuildAndStart();
}
explicit PreStopHookServer(std::unique_ptr<Server> server)
: server_(std::move(server)) {}

bool WaitForState(State state, absl::Duration timeout) {
auto deadline = absl::Now() + timeout;
grpc_core::MutexLock lock(&mu_);
while (state_ != state && !condition_.WaitWithDeadline(&mu_, deadline)) {
}
return state_ == state;
void Shutdown() {
SetState(State::kShuttingDown);
server_->Shutdown();
}

void Shutdown() { server_->Shutdown(); }

void SetReturnStatus(Status status) { hook_service_.SetReturnStatus(status); }

State state() {
State GetState() {
grpc_core::MutexLock lock(&mu_);
return state_;
}

bool ExpectRequests(size_t expected_requests_count, size_t timeout_s) {
return hook_service_.ExpectRequests(expected_requests_count, timeout_s);
void SetState(State state) {
grpc_core::MutexLock lock(&mu_);
state_ = state;
condition_.SignalAll();
}

static void RunServer(std::shared_ptr<ServerHolder> server) {
{
grpc_core::MutexLock lock(&server->mu_);
server->state_ = State::kWaiting;
server->condition_.SignalAll();
bool WaitForState(State state, const absl::Duration& timeout) {
grpc_core::MutexLock lock(&mu_);
auto deadline = absl::Now() + timeout;
while (state_ != state && !condition_.WaitWithDeadline(&mu_, deadline)) {
}
return state_ == state;
}

static void ServerThread(std::shared_ptr<PreStopHookServer> server) {
server->SetState(State::kWaiting);
server->server_->Wait();
{
grpc_core::MutexLock lock(&server->mu_);
server->state_ = State::kShuttingDown;
server->condition_.SignalAll();
}
server->SetState(State::kDone);
}

private:
grpc_core::Mutex mu_;
grpc_core::CondVar condition_ ABSL_GUARDED_BY(mu_);
State state_ ABSL_GUARDED_BY(mu_) = State::kNew;
std::unique_ptr<Server> server_;
HookServiceImpl hook_service_;
};

} // namespace

class PreStopHookServer {
class ServerHolder : private std::enable_shared_from_this<ServerHolder> {
public:
explicit PreStopHookServer(int port, int timeout_s = 15)
: server_(std::make_shared<ServerHolder>(port)),
thread_(ServerHolder::RunServer, server_) {
server_->WaitForState(ServerHolder::State::kWaiting,
absl::Seconds(timeout_s));
explicit ServerHolder(int port, const absl::Duration& timeout)
: server_(std::make_shared<PreStopHookServer>(
BuildHookServer(&hook_service_, port))),
server_thread_(PreStopHookServer::ServerThread, server_) {
server_->WaitForState(State::kWaiting, timeout);
}

~PreStopHookServer() {
~ServerHolder() {
server_->Shutdown();
thread_.join();
server_thread_.join();
}

Status startup_status() const {
return server_->state() == ServerHolder::State::kWaiting
? Status::OK
: Status(StatusCode::DEADLINE_EXCEEDED,
"Server have not started");
}
void SetReturnStatus(Status status) { hook_service_.SetReturnStatus(status); }

void SetReturnStatus(Status status) { server_->SetReturnStatus(status); }
State state() { return server_->GetState(); }

bool ExpectRequests(size_t expected_requests_count, size_t timeout_s) {
return server_->ExpectRequests(expected_requests_count, timeout_s);
return hook_service_.ExpectRequests(expected_requests_count, timeout_s);
}

private:
std::shared_ptr<ServerHolder> server_;
std::thread thread_;
HookServiceImpl hook_service_;
std::shared_ptr<PreStopHookServer> server_;
std::thread server_thread_;
};

Status PreStopHookServerManager::Start(int port) {
Status PreStopHookServerManager::Start(int port, size_t timeout_s) {
if (server_) {
return Status(StatusCode::ALREADY_EXISTS,
"Pre hook server is already running");
}
server_ = std::unique_ptr<PreStopHookServer,
PreStopHookServerManager::DeleteServer>(
new PreStopHookServer(port), PreStopHookServerManager::DeleteServer());
return server_->startup_status();
server_ = std::unique_ptr<ServerHolder, ServerHolderDeleter>(
new ServerHolder(port, absl::Seconds(timeout_s)), ServerHolderDeleter());
return server_->state() == State::kWaiting
? Status::OK
: Status(StatusCode::DEADLINE_EXCEEDED, "Server have not started");
}

Status PreStopHookServerManager::Stop() {
Expand All @@ -193,9 +188,9 @@ bool PreStopHookServerManager::ExpectRequests(size_t expected_requests_count,
return server_->ExpectRequests(expected_requests_count, timeout_s);
}

void PreStopHookServerManager::DeleteServer::operator()(
PreStopHookServer* server) {
delete server;
void PreStopHookServerManager::ServerHolderDeleter::operator()(
ServerHolder* holder) {
delete holder;
}

} // namespace testing
Expand Down
16 changes: 11 additions & 5 deletions test/cpp/interop/pre_stop_hook_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,36 @@

#include <grpc/support/port_platform.h>

#include <thread>

#include <grpcpp/server.h>

#include "src/core/lib/config/core_configuration.h"

namespace grpc {
namespace testing {

class PreStopHookServer;
class ServerHolder;

class PreStopHookServerManager {
public:
Status Start(int port);
~PreStopHookServerManager() { Stop(); }

Status Start(int port, size_t timeout_s);
Status Stop();
void Return(StatusCode code, absl::string_view description);
// Suspends the thread until there are pending requests. Returns false
// if necessary number of requests have not been received before the timeout.
bool ExpectRequests(size_t expected_requests_count, size_t timeout_s = 15);

private:
struct DeleteServer {
void operator()(PreStopHookServer* server);
struct ServerHolderDeleter {
void operator()(ServerHolder* server);
};

// Custom deleter so we don't have to include PreStopHookServer in this header
std::unique_ptr<PreStopHookServer, DeleteServer> server_;
// std::unique_ptr<PreStopHookServer, DeleteServer> server_;
std::unique_ptr<ServerHolder, ServerHolderDeleter> server_;
};

} // namespace testing
Expand Down
15 changes: 8 additions & 7 deletions test/cpp/interop/pre_stop_hook_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct CallInfo {
TEST(PreStopHookServer, StartDoRequestStop) {
int port = grpc_pick_unused_port_or_die();
PreStopHookServerManager server;
Status start_status = server.Start(port);
Status start_status = server.Start(port, 15);
ASSERT_TRUE(start_status.ok()) << start_status.error_message();
auto channel = CreateChannel(absl::StrFormat("127.0.0.1:%d", port),
InsecureChannelCredentials());
Expand All @@ -71,17 +71,17 @@ TEST(PreStopHookServer, StartDoRequestStop) {
TEST(PreStopHookServer, StartedWhileRunning) {
int port = grpc_pick_unused_port_or_die();
PreStopHookServerManager server;
Status status = server.Start(port);
Status status = server.Start(port, 15);
ASSERT_TRUE(status.ok()) << status.error_message();
status = server.Start(port);
status = server.Start(port, 15);
ASSERT_EQ(status.error_code(), StatusCode::ALREADY_EXISTS)
<< status.error_message();
}

TEST(PreStopHookServer, ClosingWhilePending) {
TEST(PreStopHookServer, DISABLED_ClosingWhilePending) {
int port = grpc_pick_unused_port_or_die();
PreStopHookServerManager server;
Status start_status = server.Start(port);
Status start_status = server.Start(port, 15);
ASSERT_TRUE(start_status.ok()) << start_status.error_message();
auto channel = CreateChannel(absl::StrFormat("127.0.0.1:%d", port),
InsecureChannelCredentials());
Expand All @@ -92,6 +92,7 @@ TEST(PreStopHookServer, ClosingWhilePending) {
auto call = stub.PrepareAsyncHook(&info.context, info.request, &cq);
call->StartCall();
call->Finish(&info.response, &info.status, &info);
ASSERT_EQ(server.ExpectRequests(1), 1);
server.Stop();
void* returned_tag;
bool ok = false;
Expand All @@ -105,7 +106,7 @@ TEST(PreStopHookServer, ClosingWhilePending) {
TEST(PreStopHookServer, MultiplePending) {
int port = grpc_pick_unused_port_or_die();
PreStopHookServerManager server;
Status start_status = server.Start(port);
Status start_status = server.Start(port, 15);
ASSERT_TRUE(start_status.ok()) << start_status.error_message();
auto channel = CreateChannel(absl::StrFormat("127.0.0.1:%d", port),
InsecureChannelCredentials());
Expand Down Expand Up @@ -144,7 +145,7 @@ TEST(PreStopHookServer, StoppingNotStarted) {
TEST(PreStopHookServer, ReturnBeforeSend) {
int port = grpc_pick_unused_port_or_die();
PreStopHookServerManager server;
Status start_status = server.Start(port);
Status start_status = server.Start(port, 15);
server.Return(StatusCode::INTERNAL, "Just a test");
ASSERT_TRUE(start_status.ok()) << start_status.error_message();
auto channel = CreateChannel(absl::StrFormat("127.0.0.1:%d", port),
Expand Down
2 changes: 1 addition & 1 deletion test/cpp/interop/xds_interop_server_lib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class XdsUpdateHealthServiceImpl : public XdsUpdateHealthService::Service {
HookResponse* /* response */) override {
switch (request->command()) {
case HookRequestCommand::START:
return pre_stop_hook_server_->Start(request->server_port());
return pre_stop_hook_server_->Start(request->server_port(), 30 /* s */);
case HookRequestCommand::STOP:
return pre_stop_hook_server_->Stop();
case HookRequestCommand::RETURN:
Expand Down

0 comments on commit a367cd7

Please sign in to comment.