-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
347 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# Copyright 2024 the gRPC authors. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
licenses(["notice"]) | ||
|
||
cc_binary( | ||
name = "client_flow_control_client", | ||
srcs = ["client_flow_control_client.cc"], | ||
defines = ["BAZEL_BUILD"], | ||
deps = [ | ||
"//:grpc++", | ||
"//:grpc++_reflection", | ||
"//examples/protos:helloworld_cc_grpc", | ||
"@com_google_absl//absl/flags:flag", | ||
"@com_google_absl//absl/flags:parse", | ||
], | ||
) | ||
|
||
cc_binary( | ||
name = "client_flow_control_server", | ||
srcs = ["client_flow_control_server.cc"], | ||
defines = ["BAZEL_BUILD"], | ||
deps = [ | ||
"//:grpc++", | ||
"//:grpc++_reflection", | ||
"//examples/protos:helloworld_cc_grpc", | ||
"@com_google_absl//absl/flags:flag", | ||
"@com_google_absl//absl/flags:parse", | ||
"@com_google_absl//absl/strings:str_format", | ||
], | ||
) |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
126 changes: 126 additions & 0 deletions
126
examples/cpp/flow_control/client_flow_control_client.cc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
/* | ||
* | ||
* Copyright 2024 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
#include <cstddef> | ||
#include <ostream> | ||
#include <string> | ||
|
||
#include "absl/flags/flag.h" | ||
#include "absl/flags/parse.h" | ||
|
||
#include <grpc/grpc.h> | ||
#include <grpcpp/ext/proto_server_reflection_plugin.h> | ||
#include <grpcpp/grpcpp.h> | ||
|
||
#ifdef BAZEL_BUILD | ||
#include "examples/protos/helloworld.grpc.pb.h" | ||
#else | ||
#include "helloworld.grpc.pb.h" | ||
#endif | ||
|
||
ABSL_FLAG(std::string, target, "localhost:50051", "Server address"); | ||
|
||
using grpc::CallbackServerContext; | ||
using grpc::Channel; | ||
using grpc::ClientContext; | ||
using grpc::Server; | ||
using grpc::ServerBuilder; | ||
using grpc::ServerUnaryReactor; | ||
using grpc::Status; | ||
using helloworld::Greeter; | ||
using helloworld::HelloReply; | ||
using helloworld::HelloRequest; | ||
|
||
namespace { | ||
|
||
// Sends requests as quickly as possible and times how long it takes to perform | ||
// the write operation. | ||
class GreeterClientReactor final | ||
: public grpc::ClientBidiReactor<helloworld::HelloRequest, | ||
helloworld::HelloReply> { | ||
public: | ||
explicit GreeterClientReactor(int reqs, size_t req_size) : reqs_(reqs) { | ||
req_.set_name(std::string(req_size, '*')); | ||
} | ||
|
||
void Start() { | ||
absl::MutexLock lock(&mu_); | ||
StartCall(); | ||
Write(); | ||
} | ||
|
||
~GreeterClientReactor() override { | ||
absl::MutexLock lock(&mu_); | ||
mu_.Await(absl::Condition(+[](bool* done) { return *done; }, &done_)); | ||
} | ||
|
||
void OnWriteDone(bool ok) override { | ||
absl::MutexLock lock(&mu_); | ||
std::cout << "Writing took " << absl::Now() - *time_ << std::endl; | ||
time_ = absl::nullopt; | ||
if (ok) { | ||
Write(); | ||
} | ||
} | ||
|
||
void OnDone(const grpc::Status& status) override { | ||
if (status.ok()) { | ||
std::cout << "Done\n"; | ||
} else { | ||
std::cout << "Done with error: [" << status.error_code() << "] " | ||
<< status.error_message() << "\n"; | ||
} | ||
absl::MutexLock lock(&mu_); | ||
done_ = true; | ||
} | ||
|
||
private: | ||
void Write() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { | ||
if (reqs_ == 0) { | ||
StartWritesDone(); | ||
return; | ||
} | ||
--reqs_; | ||
StartWrite(&req_); | ||
time_ = absl::Now(); | ||
} | ||
|
||
absl::Mutex mu_; | ||
bool done_ ABSL_GUARDED_BY(&mu_) = false; | ||
HelloRequest req_; | ||
size_t reqs_; | ||
absl::optional<absl::Time> time_ ABSL_GUARDED_BY(mu_); | ||
}; | ||
|
||
} // namespace | ||
|
||
int main(int argc, char** argv) { | ||
absl::ParseCommandLine(argc, argv); | ||
grpc::ChannelArguments channel_arguments; | ||
auto channel = grpc::CreateCustomChannel(absl::GetFlag(FLAGS_target), | ||
grpc::InsecureChannelCredentials(), | ||
channel_arguments); | ||
auto stub = Greeter::NewStub(channel); | ||
// Send 10 requests with 3Mb payload. This will eventually fill the buffer | ||
// and make | ||
GreeterClientReactor reactor(10, 3 * 1024 * 1024); | ||
grpc::ClientContext context; | ||
stub->async()->SayHelloBidiStream(&context, &reactor); | ||
reactor.Start(); | ||
return 0; | ||
} |
106 changes: 106 additions & 0 deletions
106
examples/cpp/flow_control/client_flow_control_server.cc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* | ||
* Copyright 2021 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
#include <iostream> | ||
#include <string> | ||
|
||
#include "absl/flags/flag.h" | ||
#include "absl/flags/parse.h" | ||
#include "absl/strings/str_format.h" | ||
|
||
#include <grpcpp/ext/proto_server_reflection_plugin.h> | ||
#include <grpcpp/grpcpp.h> | ||
#include <grpcpp/health_check_service_interface.h> | ||
|
||
#ifdef BAZEL_BUILD | ||
#include "examples/protos/helloworld.grpc.pb.h" | ||
#else | ||
#include "helloworld.grpc.pb.h" | ||
#endif | ||
|
||
ABSL_FLAG(uint16_t, port, 50051, "Server port for the service"); | ||
|
||
namespace { | ||
|
||
// | ||
// Server reactor that is slow to read incoming messages, causing the buffers | ||
// to fill. | ||
// | ||
class SlowReadingBidiReactor final | ||
: public grpc::ServerBidiReactor<helloworld::HelloRequest, | ||
helloworld::HelloReply> { | ||
public: | ||
SlowReadingBidiReactor() { StartRead(&req_); } | ||
|
||
void OnReadDone(bool ok) override { | ||
std::cout << "Recieved request with " << req_.name().length() | ||
<< " bytes name\n"; | ||
if (!ok) { | ||
Finish(grpc::Status::OK); | ||
return; | ||
} | ||
sleep(1); | ||
StartRead(&req_); | ||
} | ||
|
||
void OnDone() override { | ||
std::cout << "Done\n"; | ||
delete this; | ||
} | ||
|
||
private: | ||
absl::Mutex mu_; | ||
helloworld::HelloRequest req_; | ||
}; | ||
|
||
// Logic and data behind the server's behavior. | ||
class GreeterServiceImpl final : public helloworld::Greeter::CallbackService { | ||
grpc::ServerBidiReactor<helloworld::HelloRequest, helloworld::HelloReply>* | ||
SayHelloBidiStream(grpc::CallbackServerContext* /* context */) override { | ||
return new SlowReadingBidiReactor(); | ||
} | ||
}; | ||
|
||
} // namespace | ||
|
||
void RunServer(uint16_t port) { | ||
std::string server_address = absl::StrFormat("0.0.0.0:%d", port); | ||
GreeterServiceImpl service; | ||
|
||
grpc::EnableDefaultHealthCheckService(true); | ||
grpc::reflection::InitProtoReflectionServerBuilderPlugin(); | ||
grpc::ServerBuilder builder; | ||
// Listen on the given address without any authentication mechanism. | ||
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); | ||
// Register "service" as the instance through which we'll communicate with | ||
// clients. In this case it corresponds to an *synchronous* service. | ||
builder.RegisterService(&service); | ||
// Finally assemble the server. | ||
auto server = builder.BuildAndStart(); | ||
std::cout << "Server listening on " << server_address << std::endl; | ||
|
||
// Wait for the server to shutdown. Note that some other thread must be | ||
// responsible for shutting down the server for this call to ever return. | ||
server->Wait(); | ||
} | ||
|
||
int main(int argc, char** argv) { | ||
absl::ParseCommandLine(argc, argv); | ||
RunServer(absl::GetFlag(FLAGS_port)); | ||
return 0; | ||
} |