forked from googleapis/google-cloud-cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcompletion_queue.h
316 lines (288 loc) · 12.2 KB
/
completion_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H
#include "google/cloud/future.h"
#include "google/cloud/internal/async_read_stream_impl.h"
#include "google/cloud/internal/async_rpc_details.h"
#include "google/cloud/internal/completion_queue_impl.h"
#include "google/cloud/status_or.h"
#include "google/cloud/version.h"
#include <chrono>
#include <type_traits>
namespace google {
namespace cloud {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
class CompletionQueue;
namespace internal {
std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(
CompletionQueue const& cq);
std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(
CompletionQueue&& cq);
template <typename Request, typename Response>
future<StatusOr<Response>> MakeUnaryRpcImpl(
CompletionQueue& cq, GrpcAsyncCall<Request, Response> async_call,
Request const& request, std::shared_ptr<grpc::ClientContext> context);
} // namespace internal
/**
* Call the functor associated with asynchronous operations when they complete.
*/
class CompletionQueue {
public:
CompletionQueue();
explicit CompletionQueue(std::shared_ptr<internal::CompletionQueueImpl> impl)
: impl_(std::move(impl)) {}
/**
* Run the completion queue event loop.
*
* Note that more than one thread can call this member function, to create a
* pool of threads completing asynchronous operations.
*/
void Run() { impl_->Run(); }
/// Terminate the completion queue event loop.
void Shutdown() { impl_->Shutdown(); }
/// Cancel all pending operations.
void CancelAll() { impl_->CancelAll(); }
/**
* Create a timer that fires at @p deadline.
*
* @param deadline when should the timer expire.
*
* @return a future that becomes satisfied after @p deadline.
* The result of the future is the time at which it expired, or an error
* Status if the timer did not run to expiration (e.g. it was cancelled).
*/
google::cloud::future<StatusOr<std::chrono::system_clock::time_point>>
MakeDeadlineTimer(std::chrono::system_clock::time_point deadline) {
return impl_->MakeDeadlineTimer(deadline);
}
/**
* Create a timer that fires after the @p duration.
*
* @tparam Rep a placeholder to match the Rep tparam for @p duration type,
* the semantics of this template parameter are documented in
* `std::chrono::duration<>` (in brief, the underlying arithmetic type
* used to store the number of ticks), for our purposes it is simply a
* formal parameter.
* @tparam Period a placeholder to match the Period tparam for @p duration
* type, the semantics of this template parameter are documented in
* `std::chrono::duration<>` (in brief, the length of the tick in seconds,
* expressed as a `std::ratio<>`), for our purposes it is simply a formal
* parameter.
*
* @param duration when should the timer expire relative to the current time.
*
* @return a future that becomes satisfied after @p duration time has elapsed.
* The result of the future is the time at which it expired, or an error
* Status if the timer did not run to expiration (e.g. it was cancelled).
*/
template <typename Rep, typename Period>
future<StatusOr<std::chrono::system_clock::time_point>> MakeRelativeTimer(
std::chrono::duration<Rep, Period> duration) {
return impl_->MakeRelativeTimer(
std::chrono::duration_cast<std::chrono::nanoseconds>(duration));
}
/**
* Make an asynchronous unary RPC.
*
* @deprecated Applications should have no need to call this function. The
* libraries provide `Async*()` member functions in the generated (or)
* hand-crafted `*Client` classes for the same purpose.
*
* @param async_call a callable to start the asynchronous RPC.
* @param request the contents of the request.
* @param context an initialized request context to make the call.
*
* @tparam AsyncCallType the type of @a async_call. It must be invocable with
* `(grpc::ClientContext*, Request const&, grpc::CompletionQueue*)`.
* Furthermore, it should return a
* `std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<Response>>>`.
* These requirements are verified by
* `internal::CheckAsyncUnaryRpcSignature<>`, and this function is
* excluded from overload resolution if the parameters do not meet these
* requirements.
* @tparam Request the type of the request parameter in the gRPC.
* @tparam Response the response from the asynchronous RPC.
* @tparam Sig a helper type to compute `Response`.
*
* @return a future that becomes satisfied when the operation completes.
*/
template <
typename AsyncCallType, typename Request,
/// @cond implementation_details
typename Sig = internal::AsyncCallResponseType<AsyncCallType, Request>,
/// @endcond
typename Response = typename Sig::type,
/// @cond implementation_details
std::enable_if_t<Sig::value, int> = 0
/// @endcond
>
future<StatusOr<Response>> MakeUnaryRpc(
AsyncCallType async_call, Request const& request,
std::unique_ptr<grpc::ClientContext> context) {
return internal::MakeUnaryRpcImpl<Request, Response>(
*this, std::move(async_call), request, std::move(context));
}
/**
* Make an asynchronous streaming read RPC.
*
* Reading from the stream starts automatically, and the handler is notified
* of all interesting events in the stream. Note that then handler is called
* by any thread blocked on this object's Run() member function. However, only
* one callback in the handler is called at a time.
*
* @deprecated Applications should have no need to call this function. The
* libraries provide `Async*()` member functions in the generated (or)
* hand-crafted `*Client` classes for the same purpose.
*
* @param async_call a callable to start the asynchronous RPC.
* @param request the contents of the request.
* @param context an initialized request context to make the call.
* @param on_read the callback to be invoked on each successful Read().
* @param on_finish the callback to be invoked when the stream is closed.
*
* @tparam AsyncCallType the type of @a async_call. It must be invocable with
* parameters
* `(grpc::ClientContext*, RequestType const&, grpc::CompletionQueue*)`.
* Furthermore, it should return a type convertible to
* `std::unique_ptr<grpc::ClientAsyncReaderInterface<Response>>>`.
* These requirements are verified by
* `internal::AsyncStreamingReadRpcUnwrap<>`, and this function is
* excluded from overload resolution if the parameters do not meet these
* requirements.
* @tparam Request the type of the request in the streaming RPC.
* @tparam Response the type of the response in the streaming RPC.
* @tparam OnReadHandler the type of the @p on_read callback.
* @tparam OnFinishHandler the type of the @p on_finish callback.
*/
template <typename AsyncCallType, typename Request,
typename Response = typename internal::
AsyncStreamingReadResponseType<AsyncCallType, Request>::type,
typename OnReadHandler, typename OnFinishHandler>
std::shared_ptr<AsyncOperation> MakeStreamingReadRpc(
AsyncCallType&& async_call, Request const& request,
std::unique_ptr<grpc::ClientContext> context, OnReadHandler&& on_read,
OnFinishHandler&& on_finish) {
auto stream = internal::MakeAsyncReadStreamImpl<Response>(
std::forward<OnReadHandler>(on_read),
std::forward<OnFinishHandler>(on_finish));
stream->Start(std::forward<AsyncCallType>(async_call), request,
std::move(context), impl_);
return stream;
}
/**
* Asynchronously run a functor on a thread `Run()`ning the `CompletionQueue`.
*
* @param functor the functor to invoke in one of the CompletionQueue's
* threads.
*
* @tparam Functor the type of @p functor. It must satisfy
* `std::is_invocable<Functor, #CompletionQueue&>`
*/
template <
typename Functor,
/// @cond implementation_details
std::enable_if_t<internal::CheckRunAsyncCallback<Functor>::value, int> = 0
/// @endcond
>
void RunAsync(Functor&& functor) {
class Wrapper : public internal::RunAsyncBase {
public:
Wrapper(std::weak_ptr<internal::CompletionQueueImpl> impl, Functor&& f)
: impl_(std::move(impl)), fun_(std::forward<Functor>(f)) {}
~Wrapper() override = default;
void exec() override {
auto impl = impl_.lock();
if (!impl) return;
CompletionQueue cq(std::move(impl));
fun_(cq);
}
private:
std::weak_ptr<internal::CompletionQueueImpl> impl_;
std::decay_t<Functor> fun_;
};
impl_->RunAsync(
std::make_unique<Wrapper>(impl_, std::forward<Functor>(functor)));
}
/**
* Asynchronously run a functor on a thread `Run()`ning the `CompletionQueue`.
*
* @param functor the functor to call in one of the CompletionQueue's threads.
* @tparam Functor the type of @p functor. It must satisfy
* `std::is_invocable<Functor>`.
*/
template <typename Functor,
/// @cond implementation_details
std::enable_if_t<internal::is_invocable<Functor>::value, int> = 0
/// @endcond
>
void RunAsync(Functor&& functor) {
class Wrapper : public internal::RunAsyncBase {
public:
explicit Wrapper(Functor&& f) : fun_(std::forward<Functor>(f)) {}
~Wrapper() override = default;
void exec() override { fun_(); }
private:
std::decay_t<Functor> fun_;
};
impl_->RunAsync(std::make_unique<Wrapper>(std::forward<Functor>(functor)));
}
/**
* Asynchronously wait for a connection to become ready.
*
* @param channel the channel on which to wait for state changes
* @param deadline give up waiting for the state change if this deadline
* passes
* @return `future<>` which will be satisfied when either of these events
* happen: (a) the connection is ready, (b) the connection permanently
* failed, (c) deadline passes before (a) or (b) happen; the future will
* be satisfied with `StatusCode::kOk` for (a), `StatusCode::kCancelled`
* for (b) and `StatusCode::kDeadlineExceeded` for (c)
*/
future<Status> AsyncWaitConnectionReady(
std::shared_ptr<grpc::Channel> channel,
std::chrono::system_clock::time_point deadline);
private:
friend std::shared_ptr<internal::CompletionQueueImpl>
internal::GetCompletionQueueImpl(CompletionQueue const& cq);
friend std::shared_ptr<internal::CompletionQueueImpl>
internal::GetCompletionQueueImpl(CompletionQueue&& cq);
std::shared_ptr<internal::CompletionQueueImpl> impl_;
};
namespace internal {
inline std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(
CompletionQueue const& cq) {
return cq.impl_;
}
inline std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(
CompletionQueue&& cq) {
return std::move(cq.impl_);
}
template <typename Request, typename Response>
future<StatusOr<Response>> MakeUnaryRpcImpl(
CompletionQueue& cq, GrpcAsyncCall<Request, Response> async_call,
Request const& request, std::shared_ptr<grpc::ClientContext> context) {
auto op =
std::make_shared<internal::AsyncUnaryRpcFuture<Request, Response>>();
auto impl = GetCompletionQueueImpl(cq);
impl->StartOperation(op, [&, c = std::move(context)](void* tag) {
op->Start(async_call, std::move(c), request, impl->cq(), tag);
});
return op->GetFuture();
}
} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace cloud
} // namespace google
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H