-
Notifications
You must be signed in to change notification settings - Fork 5
/
notes
328 lines (300 loc) · 15.9 KB
/
notes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
Miscellaneous notes about a Homa plugin for gRPC:
-------------------------------------------------
* Notes about java implementation:
* Support long messages.
* Need to keep track of many outstanding requests
* Eliminate duplicate code between
* Handle EAGAIN in HomaIncoming.read?
* Change all invocations of System.out.printf to log instead.
* Find out what scheduledExecutorService is.
* Check for other client Listener callbacks that need to be made.
* Invoke stream tracers.
* Log error in RpcSpec.getInetSocketAddress (find all System.out refs)
* Add count of entries to metadata wire format?
* Need to eventually invoke onReady to send client messages?
* TODO list:
* Add proper channelz support
* Deal with recv_initial_metadata.recv_flags
* Make HomaStream instance variables protected
* Various notes about how gRPC works:
* A "stream" is the embodiment of a particular RPC.
* "Plucking" a completion queue means extracting a particular desired
event, rather than any available event.
* GRPC_ARG_MAX_METADATA_SIZE: channel argument type, defined in grpc_types.h
* Questions for gRPC experts:
* Client side:
* How does the target host address find its way to the transport?
see grpc_cronet_secure_channel_create: target argument
Target passed to CreateChannelImpl!
* How to hook up pollsets to receive results?
* Role of arguments for channels?
* Key-value pairs (latency characteristics?)
* Channels with different arguments cannot share a single subchannel
* None may matter for me?
* What is metadata? Name-value pairs, like HTTP headers
* Which things do I have to serialize? Metadata isn't serialized, and
message body may also be in multiple chunks.
* Synchronization
* Can a stream have multiple active calls? Stream == RPC, not connection,
but can still have multiple calls.
* Server side:
* Do I need to create a channel for each incoming stream?
No: see set_accept_stream in the grpc_transport_op structure
passed to the perform_op transport callback.
* No channels on server
* What is the rendezvous mechanism between incoming request and
outgoing response?
* Access from languages other than C++? Do I have to create a library
for each language?)?
* Wrapped languages use the C API, and the C API is intended *only*
for wrapped languages. There is no official C interface.
* Surface API: *core* grpc.h
* Coding conventions: they recommend following the Google C++ style guide;
all gRPC C++ files attempt to follow this.
* What are the rules for storage allocation? Arenas?
C++ new is now fine.
* OK to use STL instead of gpr stuff (e.g. mutex)
STL is fine except for synchronization, where they prefer
grpc::internal::Mutex in C++ and grpc_core::Mutex in C.
There is a lock_guard equivalent (mutex_lock?)
lockguard -- grpc_core::MutexLock
unique_lock - grpc_core::ReleaseableMutexLock
* What is the role of combiners?
Collect a bunch of ops, do them all on a single thread, possibly merge writes
Not essential, just for performance
* Secure connections: how do these interact with transports?
* Just hook into the right places: grpc_secure_channel_create
* Compression: any impact on transports?
* 2 forms of compression: message compression and stream compression.
Message compression happens above the level of the transport; stream
compression happens in the transport (e.g., compress several messages
together) but it's unclear how much benefit this provides
* I can ignore compression for now
* See link in bookmarks for more information.
* Load balancing
* Forgot to ask about this
* What load is being balanced (server-side?)
* How does it interact with transports
* Channelz support
Other notes from phone call on 12/8:
* There are two layers of channel: top channels and subchannel.
* A single subchannel can be shared among multiple top channels
* See end-to-end tests, such as grpc/test/core/end2end/tests/simple_request.cc
* YouTube video on gRPC internal structure:
https://www.youtube.com/watch?v=S7WIYLcPS1Y&feature=youtu.be
* The authority metadata should be target host name (no port?),
not "homa.authority"
* Add plugin to gRPC: grpc_register_plugin
* ExecCtx: used to collect a bunch of work to do, which can then be done
later at a higher stack level in order to avoid locking issues if it's
done right away.
* Endpoints: looks like they keep track of socket-like objects (perhaps
a portability layer)?
* Basic client request:
* client_unary_call.h: BlockingUnaryCallImpl
* creates a Call object
* invokes PerformOps on that Call
* call.cc: call_start_batch
* call.cc: execute_batch
* call_combiner.cc: CallCombiner::Start schedules a closure
* closure callback to call_combiner.cc: execute_batch_in_call_combiner
* client_channel.cc: CallData::StartTransportStreamOpBatch
* calls CompletionQueue::pluck to wait for results
* Levels in creating a client-side channel:
* grpc::CreateChannel [create_channel.cc:33]
* grpc::CreateCustomChannel [create_channel.cc:50]
* InsecureChannelCredentialsImpl::CreateChannelImpl[insecure_credentials.cc:37]
* InsecureChannelCredentialsImpl::CreateChannelWithInterceptors
[insecure_credentials.cc:50]
* grpc_insecure_channel_create [channel_create.cc:105]
* grpc_core::CreateChannel [channel_create.cc:66]
* grpc_channel_create [channel.cc:271]
* grpc_channel_create_with_builder [channel.cc:61]
* Levels in creating client-side socket:
* BlockingUnaryCall [client_unary_call.h:40]
* BlockingUnaryCallImpl [client_unary_call.h:69]
* CoreCodegen::grpc_completion_queue_pluck [core_codegen.cc:74]
* grpc_completion_queue_pluck [completion_queue.cc:1324]
* cq_pluck [completion_queue.cc:1297]
* grpc_pollset_work [pollset.cc:48]
* pollset_work [ev_posix.cc:323]
* pollset_work [ev_epollex_linux.cc:1137]
* ExecCtx::Flush [exec_ctx.cc:153]
* exec_ctx_run [exec_ctx.cc:40]
* AresDnsResolver::OnResolved [dns_resolver_ares.cc:327]
* grpc_core::WorkSerializer::Run [work_serializer.cc:152]
* grpc_core::WorkSerializer::WorkSerializerImpl::Run
[work_serializer.cc:67]
* std::function ...
* AresDnsResolver::<lambda()>::operator() [dns_resolver_ares.cc:327]
* AresDnsResolver::OnResolvedLocked [dns_resolver_ares.cc:365]
* ResolvingLoadBalancingPolicy::ResolverResultHandler::ReturnResult
[resolving_lb_policy.cc:89]
* ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked
[resolving_lb_policy.cc:337]
* ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked
[resolving_lb_policy.cc:251]
* ChildPolicyHandler::UpdateLocked [child_policy_handler.cc:238]
* PickFirst::UpdateLocked [pick_first.cc:281]
* PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked
[pick_first.cc:237]
* ChannelData::SubchannelWrapper::AttemptToConnect [client_channel.cc:979]
* Subchannel::AttemptToConnect [subchannel.cc:894]
* Subchannel::MaybeStartConnectingLocked [subchannel.cc:1003]
* Subchannel::ContinueConnectingLocked [subchannel.cc:1056]
* Chttp2Connector::Connect [chttp2_connector.cc:74]
* grpc_tcp_client_connect [tcp_client.cc:30]
* tcp_connect [tcp_client_posix.cc:341]
* grpc_tcp_client_prepare_fd [tcp_client_posix.cc:258]
* grpc_create_dualstack_socket [socket_utils_common_posix.cc:452]
* grpc_create_dualstack_socket_using_factory [socket_utils_common_posix.cc:470]
* Levels in first client-side call to write:
* BlockingUnaryCall [client_unary_call.h:40]
* BlockingUnaryCallImpl [client_unary_call.h:69]
* CoreCodegen::grpc_completion_queue_pluck [core_codegen.cc:74]
* grpc_completion_queue_pluck [completion_queue.cc:1324]
* cq_pluck [completion_queue.cc:1297]
* grpc_pollset_work [pollset.cc:48]
* pollset_work [ev_posix.cc:323]
* pollset_work [ev_epollex_linux.cc:1137]
* ExecCtx::Flush [exec_ctx.cc:156]
* grpc_combiner_continue_exec_ctx [combiner.cc:252]
* write_action_begin_locked [chttp2_transport.cc:952]
* write_action [chttp2_transport.cc:978]
* grpc_endpoint_write [endpoint.cc:32]
* tcp_write [tcp_posix.cc:1559]
* Levels to invoke client perform_stream_op:
*
* Levels in creating server (top down; doesn't actually open socket):
* ServerBuilder::BuildAndStart [server_builder.cc:399]
* Server::AddListeningPort [server_cc.cc:1092]
* creds->AddPortToServer(grpc_server*) [insecure_server_credentials:29]
* grpc_server_add_insecure_http2_port [server_chttp2.cc:35]
* grpc_core::Chttp2ServerAddPort(Server*) [chttp2_server.cc:478]
* grpc_core::Chttp2ServerListener::Create [chttp2_server.cc:285]
* grpc_tcp_server_create(Server::ListenerInterface) simply dispatches
* tcp_server_create() [tcp_server_posix.cc:67]
* Levels to open socket on server (top down):
* Server::AddListeningPort [server_cc.cc:1092]
* creds->AddPortToServer(grpc_server*) [insecure_server_credentials:29]
* grpc_server_add_insecure_http2_port [server_chttp2.cc:37]
* grpc_core::Chttp2ServerAddPort [chttp2_server.cc:478]
(just delegates)
* Chttp2ServerListener::Create [chttp2_server.cc:290]
* grpc_tcp_server_add_port [tcp_server.cc:40]
(just delegates)
* tcp_server_add_port [tcp_server_posix.cc:437]
(parses address)
* add_wildcard_addrs_to_server [tcp_server_posix.cc:306]
* grpc_tcp_server_add_addr [tcp_server_utils_posix_common.cc:134]
* grpc_create_dualstack_socket [socket_utils_common_posix.cc:452]
* create_socket [socket_utils_common_posix.cc:460]
* Levels to register for callbacks (top down):
* ServerBuilder::BuildAndStart [server_builder.cc:411]
* Server::Start [server_cc.cc:1168]
* grpc_server_start (pure dispatch) [server.cc:1489]
* grpc_core::Server::Start [server.cc:574]
* Server::ListenerInterface::Start [chttp2_server.cc:387]
(takes std::vector<grpc_pollset*>*, just delegates)
* grpc_tcp_server_start (dispatches through vtable)
* tcp_server_start [tcp_server_posix.cc:519]
* Issues to resolve
* Poll sets
* Deadlines
* Notes on building Java channels for TCP:
* NettyChannelBuilder constructed by NettyChannelBuilder.forTarget
(called by ManagedChannelBuilder.forAddress)
* NettyChannelBuilder.buildTransportFactory creates a NettyTransportFactory
(subclass of ClientTransportFactory)
(invoked from ManagedChannelBuilder.build)
* NettyTransportFactory.newClientTransport creates a NettyClientTransport
(subclass of ConnectionClientTransport)
(invoked in separate thread...)
* Very thin class NettyChannelTransportFactory has one method,
buildClientTransportFactory, which invokes
NettyChannelBuilder.buildTransportFactory
which creates a NettyTransportFactory
* Code for setting up HTTP channels:
insecure_credentials.cc:
------------------------
class InsecureChannelCredentialsImpl final : public ChannelCredentials {
public:
std::shared_ptr<Channel> CreateChannelImpl(
const std::string& target, const ChannelArguments& args) override {
return CreateChannelWithInterceptors(
target, args,
std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>());
}
std::shared_ptr<Channel> CreateChannelWithInterceptors(
const std::string& target, const ChannelArguments& args,
std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>
interceptor_creators) override {
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
std::shared_ptr<Channel> channel = grpc::CreateChannelInternal(
"", grpc_channel_create(target.c_str(), creds, &channel_args),
std::move(interceptor_creators));
grpc_channel_credentials_release(creds);
return channel;
}
SecureChannelCredentials* AsSecureCredentials() override { return nullptr; }
private:
bool IsInsecure() const override { return true; }
};
chttp2_connector.cc:
--------------------
grpc_channel* grpc_channel_create(const char* target,
grpc_channel_credentials* creds,
const grpc_channel_args* c_args) {
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_secure_channel_create(target=%s, creds=%p, args=%p)", 3,
(target, (void*)creds, (void*)c_args));
grpc_channel* channel = nullptr;
grpc_error_handle error;
if (creds != nullptr) {
// Add channel args containing the client channel factory and channel
// credentials.
gpr_once_init(&g_factory_once, FactoryInit);
grpc_core::ChannelArgs args =
creds->update_arguments(grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(c_args)
.SetObject(creds->Ref())
.SetObject(g_factory));
// Create channel.
auto r = grpc_core::CreateChannel(target, args);
if (r.ok()) {
channel = r->release()->c_ptr();
} else {
error = absl_status_to_grpc_error(r.status());
}
}
if (channel == nullptr) {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
&integer)) {
status = static_cast<grpc_status_code>(integer);
}
channel = grpc_lame_client_channel_create(
target, status, "Failed to create secure client channel");
}
return channel;
}
absl::StatusOr<RefCountedPtr<Channel>> CreateChannel(const char* target,
const ChannelArgs& args) {
if (target == nullptr) {
gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
return absl::InvalidArgumentError("channel target is NULL");
}
// Add channel arg containing the server URI.
std::string canonical_target =
CoreConfiguration::Get().resolver_registry().AddDefaultPrefixIfNeeded(
target);
return Channel::Create(target,
args.Set(GRPC_ARG_SERVER_URI, canonical_target),
GRPC_CLIENT_CHANNEL, nullptr);
}