Skip to content

Commit 6667c56

Browse files
authored
[coro_rpc] client support store response attachment data in external buffer (#906)
* [coro_rpc] client support store response attachment data in external buffer * fix ssl * fix
1 parent 3cedfb9 commit 6667c56

File tree

3 files changed

+158
-72
lines changed

3 files changed

+158
-72
lines changed

include/ylt/coro_rpc/impl/coro_rpc_client.hpp

Lines changed: 128 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ class client_pool;
7373

7474
namespace coro_rpc {
7575

76+
struct request_config_t {
77+
std::optional<std::chrono::milliseconds> request_timeout_duration;
78+
std::string_view request_attachment;
79+
std::span<char> resp_attachment_buf;
80+
};
7681
inline uint64_t get_global_client_id() {
7782
static std::atomic<uint64_t> cid = 0;
7883
return cid.fetch_add(1, std::memory_order::relaxed);
@@ -97,32 +102,42 @@ struct resp_body {
97102
std::string read_buf_;
98103
std::string resp_attachment_buf_;
99104
};
105+
namespace detail {
106+
struct async_rpc_result_base {
107+
private:
108+
resp_body buffer_;
109+
std::string_view attachment_;
110+
111+
public:
112+
async_rpc_result_base() = default;
113+
async_rpc_result_base(resp_body &&buffer, std::string_view attachment)
114+
: buffer_(std::move(buffer)), attachment_(attachment) {}
115+
std::string_view get_attachment() const noexcept { return attachment_; }
116+
bool is_attachment_in_external_buf() const noexcept {
117+
return buffer_.resp_attachment_buf_.data() == attachment_.data();
118+
}
119+
resp_body release_buffer() { return std::move(buffer_); }
120+
};
121+
} // namespace detail
100122

101123
template <typename T>
102-
struct async_rpc_result_value_t {
124+
struct async_rpc_result_value_t : public detail::async_rpc_result_base {
103125
private:
104126
T result_;
105-
resp_body buffer_;
106127

107128
public:
108-
async_rpc_result_value_t(T &&result, resp_body &&buffer)
109-
: result_(std::move(result)), buffer_(std::move(buffer)) {}
129+
async_rpc_result_value_t(T &&result, resp_body &&buffer,
130+
std::string_view attachment)
131+
: result_(std::move(result)),
132+
async_rpc_result_base(std::move(buffer), attachment) {}
110133
async_rpc_result_value_t(T &&result) : result_(std::move(result)) {}
111134
T &result() noexcept { return result_; }
112135
const T &result() const noexcept { return result_; }
113-
std::string_view get_attachment() const noexcept {
114-
return buffer_.resp_attachment_buf_;
115-
}
116-
resp_body release_buffer() { return std::move(buffer_); }
117136
};
118137

119138
template <>
120-
struct async_rpc_result_value_t<void> {
121-
resp_body buffer_;
122-
std::string_view attachment() const noexcept {
123-
return buffer_.resp_attachment_buf_;
124-
}
125-
resp_body release_buffer() { return std::move(buffer_); }
139+
struct async_rpc_result_value_t<void> : public detail::async_rpc_result_base {
140+
using async_rpc_result_base::async_rpc_result_base;
126141
};
127142

128143
template <typename T>
@@ -317,8 +332,9 @@ class coro_rpc_client {
317332
template <auto func, typename... Args>
318333
async_simple::coro::Lazy<rpc_result<decltype(get_return_type<func>())>> call(
319334
Args &&...args) {
320-
return call_for<func>(config_.request_timeout_duration,
321-
std::forward<Args>(args)...);
335+
return call<func>(
336+
request_config_t{{}, req_attachment_, resp_attachment_buffer_},
337+
std::forward<Args>(args)...);
322338
}
323339

324340
/*!
@@ -335,13 +351,22 @@ class coro_rpc_client {
335351
template <auto func, typename... Args>
336352
async_simple::coro::Lazy<rpc_result<decltype(get_return_type<func>())>>
337353
call_for(auto request_timeout_duration, Args &&...args) {
354+
return call<func>(
355+
request_config_t{request_timeout_duration, req_attachment_,
356+
resp_attachment_buffer_},
357+
std::forward<Args>(args)...);
358+
}
359+
360+
template <auto func, typename... Args>
361+
async_simple::coro::Lazy<rpc_result<decltype(get_return_type<func>())>> call(
362+
request_config_t config, Args &&...args) {
338363
using return_type = decltype(get_return_type<func>());
339-
auto async_result =
340-
co_await co_await send_request_for_with_attachment<func, Args...>(
341-
request_timeout_duration, req_attachment_,
342-
std::forward<Args>(args)...);
364+
auto async_result = co_await co_await send_request<func, Args...>(
365+
std::move(config), std::forward<Args>(args)...);
343366
req_attachment_ = {};
367+
resp_attachment_buffer_ = {};
344368
if (async_result) {
369+
resp_attachment_ = async_result->get_attachment();
345370
control_->resp_buffer_ = async_result->release_buffer();
346371
if constexpr (std::is_same_v<return_type, void>) {
347372
co_return expected<return_type, rpc_error>{};
@@ -377,13 +402,24 @@ class coro_rpc_client {
377402
req_attachment_ = attachment;
378403
return true;
379404
}
405+
void set_resp_attachment_buf(std::span<char> buffer) {
406+
resp_attachment_buffer_ = buffer;
407+
}
380408

381-
std::string_view get_resp_attachment() const {
382-
return control_->resp_buffer_.resp_attachment_buf_;
409+
std::string_view get_resp_attachment() const { return resp_attachment_; }
410+
411+
bool is_resp_attachment_in_external_buf() const {
412+
return resp_attachment_.data() !=
413+
control_->resp_buffer_.resp_attachment_buf_.data();
383414
}
384415

385416
std::string release_resp_attachment() {
386-
return std::move(control_->resp_buffer_.resp_attachment_buf_);
417+
if (is_resp_attachment_in_external_buf()) {
418+
return std::move(control_->resp_buffer_.resp_attachment_buf_);
419+
}
420+
else {
421+
return {};
422+
}
387423
}
388424

389425
template <typename T, typename U>
@@ -713,6 +749,7 @@ class coro_rpc_client {
713749

714750
struct async_rpc_raw_result_value_type {
715751
resp_body buffer_;
752+
std::string_view attachment;
716753
uint8_t errc_;
717754
};
718755

@@ -724,13 +761,21 @@ class coro_rpc_client {
724761
struct handler_t {
725762
std::unique_ptr<coro_io::period_timer> timer_;
726763
async_simple::Promise<async_rpc_raw_result> promise_;
764+
std::span<char> response_attachment_buffer_;
727765
handler_t(std::unique_ptr<coro_io::period_timer> &&timer,
728-
async_simple::Promise<async_rpc_raw_result> &&promise)
729-
: timer_(std::move(timer)), promise_(std::move(promise)) {}
766+
async_simple::Promise<async_rpc_raw_result> &&promise,
767+
std::span<char> buffer = {})
768+
: timer_(std::move(timer)),
769+
promise_(std::move(promise)),
770+
response_attachment_buffer_(buffer) {}
771+
std::span<char> &get_buffer() { return response_attachment_buffer_; }
730772
void operator()(resp_body &&buffer, uint8_t rpc_errc) {
731773
timer_->cancel();
732-
promise_.setValue(async_rpc_raw_result{
733-
async_rpc_raw_result_value_type{std::move(buffer), rpc_errc}});
774+
promise_.setValue(async_rpc_raw_result{async_rpc_raw_result_value_type{
775+
std::move(buffer),
776+
std::string_view{response_attachment_buffer_.data(),
777+
response_attachment_buffer_.size()},
778+
rpc_errc}});
734779
}
735780
void local_error(std::error_code &ec) {
736781
timer_->cancel();
@@ -810,8 +855,8 @@ class coro_rpc_client {
810855
private:
811856
template <auto func, typename... Args>
812857
async_simple::coro::Lazy<rpc_error> send_request_for_impl(
813-
auto request_timeout_duration, uint32_t &id, coro_io::period_timer &timer,
814-
std::string_view attachment, Args &&...args) {
858+
request_config_t &config, uint32_t &id, coro_io::period_timer &timer,
859+
Args &&...args) {
815860
using R = decltype(get_return_type<func>());
816861

817862
if (control_->has_closed_)
@@ -829,21 +874,24 @@ class coro_rpc_client {
829874

830875
static_check<func, Args...>();
831876

832-
if (request_timeout_duration.count() >= 0) {
833-
timeout(timer, request_timeout_duration, "rpc call timer canceled")
877+
if (config.request_timeout_duration->count() >= 0) {
878+
timeout(timer, *config.request_timeout_duration,
879+
"rpc call timer canceled")
834880
.start([](auto &&) {
835881
});
836882
}
837883

838884
#ifdef YLT_ENABLE_SSL
839885
if (!config_.ssl_cert_path.empty()) {
840886
assert(control_->ssl_stream_);
841-
co_return co_await send_impl<func>(*control_->ssl_stream_, id, attachment,
887+
co_return co_await send_impl<func>(*control_->ssl_stream_, id,
888+
config.request_attachment,
842889
std::forward<Args>(args)...);
843890
}
844891
else {
845892
#endif
846-
co_return co_await send_impl<func>(control_->socket_, id, attachment,
893+
co_return co_await send_impl<func>(control_->socket_, id,
894+
config.request_attachment,
847895
std::forward<Args>(args)...);
848896
#ifdef YLT_ENABLE_SSL
849897
}
@@ -873,6 +921,14 @@ class coro_rpc_client {
873921
<< ". close the socket.value=" << ret.first.value();
874922
break;
875923
}
924+
auto iter = controller->response_handler_table_.find(header.seq_num);
925+
if (iter == controller->response_handler_table_.end()) {
926+
ELOG_ERROR << "unexists request ID:" << header.seq_num
927+
<< ". close the socket.";
928+
break;
929+
}
930+
ELOG_TRACE << "find request ID:" << header.seq_num
931+
<< ". start notify response handler";
876932
uint32_t body_len = header.length;
877933
struct_pack::detail::resize(
878934
controller->resp_buffer_.read_buf_,
@@ -889,15 +945,24 @@ class coro_rpc_client {
889945
controller->resp_buffer_.resp_attachment_buf_.clear();
890946
}
891947
else {
892-
struct_pack::detail::resize(
893-
controller->resp_buffer_.resp_attachment_buf_,
894-
header.attach_length);
948+
std::span<char> &attachment_buffer = iter->second.get_buffer();
949+
if (attachment_buffer.size() < header.attach_length) {
950+
// allocate attachment buffer
951+
if (attachment_buffer.size()) [[unlikely]] {
952+
ELOG_TRACE << "user's attachment buffer size is too small, instead "
953+
"by inner allocated buffer";
954+
}
955+
struct_pack::detail::resize(
956+
controller->resp_buffer_.resp_attachment_buf_,
957+
std::max<uint64_t>(header.attach_length, sizeof(std::string)));
958+
attachment_buffer = controller->resp_buffer_.resp_attachment_buf_;
959+
}
960+
attachment_buffer = attachment_buffer.subspan(0, header.attach_length);
895961
std::array<asio::mutable_buffer, 2> iov{
896962
asio::mutable_buffer{controller->resp_buffer_.read_buf_.data(),
897963
body_len},
898-
asio::mutable_buffer{
899-
controller->resp_buffer_.resp_attachment_buf_.data(),
900-
controller->resp_buffer_.resp_attachment_buf_.size()}};
964+
asio::mutable_buffer{attachment_buffer.data(),
965+
attachment_buffer.size()}};
901966
ret = co_await coro_io::async_read(socket, iov);
902967
}
903968
if (ret.first) {
@@ -915,20 +980,10 @@ class coro_rpc_client {
915980
file.close();
916981
#endif
917982
--controller->recving_cnt_;
918-
if (auto iter = controller->response_handler_table_.find(header.seq_num);
919-
iter != controller->response_handler_table_.end()) {
920-
ELOG_TRACE << "find request ID:" << header.seq_num
921-
<< ". start notify response handler";
922-
iter->second(std::move(controller->resp_buffer_), header.err_code);
923-
controller->response_handler_table_.erase(iter);
924-
if (controller->response_handler_table_.empty()) {
925-
co_return;
926-
}
927-
}
928-
else {
929-
ELOG_ERROR << "unexists request ID:" << header.seq_num
930-
<< ". close the socket.";
931-
break;
983+
iter->second(std::move(controller->resp_buffer_), header.err_code);
984+
controller->response_handler_table_.erase(iter);
985+
if (controller->response_handler_table_.empty()) {
986+
co_return;
932987
}
933988
} while (true);
934989
close_socket_async(controller);
@@ -966,12 +1021,12 @@ class coro_rpc_client {
9661021
}
9671022
if (result) {
9681023
if constexpr (std::is_same_v<T, void>) {
969-
co_return async_rpc_result<T>{
970-
async_rpc_result_value_t<T>{std::move(ret.buffer_)}};
1024+
co_return async_rpc_result<T>{async_rpc_result_value_t<T>{
1025+
std::move(ret.buffer_), ret.attachment}};
9711026
}
9721027
else {
9731028
co_return async_rpc_result<T>{async_rpc_result_value_t<T>{
974-
std::move(result.value()), std::move(ret.buffer_)}};
1029+
std::move(result.value()), std::move(ret.buffer_), ret.attachment}};
9751030
}
9761031
}
9771032
else {
@@ -984,27 +1039,25 @@ class coro_rpc_client {
9841039
async_simple::coro::Lazy<async_simple::coro::Lazy<
9851040
async_rpc_result<decltype(get_return_type<func>())>>>
9861041
send_request(Args &&...args) {
987-
return send_request_for_with_attachment<func>(
988-
config_.request_timeout_duration, {}, std::forward<Args>(args)...);
1042+
return send_request<func>(request_config_t{}, std::forward<Args>(args)...);
9891043
}
9901044

9911045
template <auto func, typename... Args>
9921046
async_simple::coro::Lazy<async_simple::coro::Lazy<
9931047
async_rpc_result<decltype(get_return_type<func>())>>>
9941048
send_request_with_attachment(std::string_view request_attachment,
9951049
Args &&...args) {
996-
return send_request_for_with_attachment<func>(
997-
config_.request_timeout_duration, request_attachment,
1050+
return send_request<func>(
1051+
request_config_t{.request_attachment = request_attachment},
9981052
std::forward<Args>(args)...);
9991053
}
10001054

1001-
template <auto func, typename... Args>
1055+
template <auto func, typename Duration, typename... Args>
10021056
async_simple::coro::Lazy<async_simple::coro::Lazy<
10031057
async_rpc_result<decltype(get_return_type<func>())>>>
1004-
send_request_for(Args &&...args) {
1005-
return send_request_for_with_attachment<func>(
1006-
config_.request_timeout_duration, std::string_view{},
1007-
std::forward<Args>(args)...);
1058+
send_request_for(Duration request_timeout_duration, Args &&...args) {
1059+
return send_request<func>(request_config_t{request_timeout_duration},
1060+
std::string_view{}, std::forward<Args>(args)...);
10081061
}
10091062

10101063
struct recving_guard {
@@ -1029,25 +1082,26 @@ class coro_rpc_client {
10291082
template <auto func, typename... Args>
10301083
async_simple::coro::Lazy<async_simple::coro::Lazy<
10311084
async_rpc_result<decltype(get_return_type<func>())>>>
1032-
send_request_for_with_attachment(auto request_timeout_duration,
1033-
std::string_view request_attachment,
1034-
Args &&...args) {
1085+
send_request(request_config_t config, Args &&...args) {
10351086
using rpc_return_t = decltype(get_return_type<func>());
10361087
recving_guard guard(control_.get());
10371088
uint32_t id;
1089+
if (!config.request_timeout_duration) {
1090+
config.request_timeout_duration = config_.request_timeout_duration;
1091+
}
1092+
assert(config.request_timeout_duration.has_value());
10381093

10391094
auto timer = std::make_unique<coro_io::period_timer>(
10401095
control_->executor_.get_asio_executor());
10411096
auto result = co_await send_request_for_impl<func>(
1042-
request_timeout_duration, id, *timer, request_attachment,
1043-
std::forward<Args>(args)...);
1097+
config, id, *timer, std::forward<Args>(args)...);
10441098
auto &control = *control_;
10451099
if (!result) {
10461100
async_simple::Promise<async_rpc_raw_result> promise;
10471101
auto future = promise.getFuture();
10481102
bool is_empty = control.response_handler_table_.empty();
10491103
auto &&[_, is_ok] = control.response_handler_table_.try_emplace(
1050-
id, std::move(timer), std::move(promise));
1104+
id, std::move(timer), std::move(promise), config.resp_attachment_buf);
10511105
if (!is_ok) [[unlikely]] {
10521106
close();
10531107
co_return build_failed_rpc_result<rpc_return_t>(
@@ -1196,8 +1250,10 @@ class coro_rpc_client {
11961250
std::atomic<uint32_t> request_id_{0};
11971251
std::unique_ptr<coro_io::period_timer> timer_;
11981252
std::shared_ptr<control_t> control_;
1199-
std::string_view req_attachment_;
12001253
std::vector<asio::ip::tcp::endpoint> endpoints_;
1254+
std::string_view req_attachment_;
1255+
std::span<char> resp_attachment_buffer_;
1256+
std::string_view resp_attachment_;
12011257
config config_;
12021258
constexpr static std::size_t default_read_buf_size_ = 256;
12031259
#ifdef YLT_ENABLE_SSL

src/coro_rpc/examples/base_examples/client.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,14 @@ Lazy<void> show_rpc_call() {
5151
auto ret_void = co_await client.call<echo_with_attachment>();
5252
assert(client.get_resp_attachment() == "This is attachment.");
5353

54+
client.set_req_attachment("This is attachment.");
55+
char buf[100];
56+
client.set_resp_attachment_buf(buf);
57+
ret_void = co_await client.call<echo_with_attachment>();
58+
assert(client.get_resp_attachment() == "This is attachment.");
59+
assert(client.is_resp_attachment_in_external_buf());
60+
assert(client.get_resp_attachment().data() == buf);
61+
5462
ret = co_await client.call<nested_echo>("nested_echo"s);
5563
assert(ret.value() == "nested_echo"s);
5664

0 commit comments

Comments
 (0)