@@ -102,13 +102,31 @@ int NebdClient::Init(const char* confpath) {
102
102
103
103
heartbeatMgr_->Run ();
104
104
105
+ // init rpc send exec-queue
106
+ rpcTaskQueues_.resize (option_.requestOption .rpcSendExecQueueNum );
107
+ for (auto & q : rpcTaskQueues_) {
108
+ int rc = bthread::execution_queue_start (
109
+ &q, nullptr , &NebdClient::ExecAsyncRpcTask, this );
110
+ if (rc != 0 ) {
111
+ LOG (ERROR) << " Init AsyncRpcQueues failed" ;
112
+ return -1 ;
113
+ }
114
+ }
115
+
105
116
return 0 ;
106
117
}
107
118
108
119
void NebdClient::Uninit () {
109
120
if (heartbeatMgr_ != nullptr ) {
110
121
heartbeatMgr_->Stop ();
111
122
}
123
+
124
+ // stop exec queue
125
+ for (auto & q : rpcTaskQueues_) {
126
+ bthread::execution_queue_stop (q);
127
+ bthread::execution_queue_join (q);
128
+ }
129
+
112
130
LOG (INFO) << " NebdClient uninit success." ;
113
131
google::ShutdownGoogleLogging ();
114
132
}
@@ -289,67 +307,85 @@ int64_t NebdClient::GetFileSize(int fd) {
289
307
}
290
308
291
309
int NebdClient::Discard (int fd, NebdClientAioContext* aioctx) {
292
- nebd::client::NebdFileService_Stub stub (&channel_);
293
- nebd::client::DiscardRequest request;
294
- request.set_fd (fd);
295
- request.set_offset (aioctx->offset );
296
- request.set_size (aioctx->length );
297
-
298
- AioDiscardClosure* done = new (std::nothrow) AioDiscardClosure (
299
- fd, aioctx, option_.requestOption );
300
- done->cntl .set_timeout_ms (-1 );
301
- done->cntl .set_log_id (logId_.fetch_add (1 , std::memory_order_relaxed));
302
- stub.Discard (&done->cntl , &request, &done->response , done);
310
+ auto task = [this , fd, aioctx]() {
311
+ nebd::client::NebdFileService_Stub stub (&channel_);
312
+ nebd::client::DiscardRequest request;
313
+ request.set_fd (fd);
314
+ request.set_offset (aioctx->offset );
315
+ request.set_size (aioctx->length );
316
+
317
+ AioDiscardClosure* done = new (std::nothrow) AioDiscardClosure (
318
+ fd, aioctx, option_.requestOption );
319
+ done->cntl .set_timeout_ms (-1 );
320
+ done->cntl .set_log_id (logId_.fetch_add (1 , std::memory_order_relaxed));
321
+ stub.Discard (&done->cntl , &request, &done->response , done);
322
+ };
323
+
324
+ PushAsyncTask (task);
303
325
304
326
return 0 ;
305
327
}
306
328
307
329
int NebdClient::AioRead (int fd, NebdClientAioContext* aioctx) {
308
- nebd::client::NebdFileService_Stub stub (&channel_);
309
- nebd::client::ReadRequest request;
310
- request.set_fd (fd);
311
- request.set_offset (aioctx->offset );
312
- request.set_size (aioctx->length );
313
-
314
- AioReadClosure* done = new (std::nothrow) AioReadClosure (
315
- fd, aioctx, option_.requestOption );
316
- done->cntl .set_timeout_ms (-1 );
317
- done->cntl .set_log_id (logId_.fetch_add (1 , std::memory_order_relaxed));
318
- stub.Read (&done->cntl , &request, &done->response , done);
330
+ auto task = [this , fd, aioctx]() {
331
+ nebd::client::NebdFileService_Stub stub (&channel_);
332
+ nebd::client::ReadRequest request;
333
+ request.set_fd (fd);
334
+ request.set_offset (aioctx->offset );
335
+ request.set_size (aioctx->length );
336
+
337
+ AioReadClosure* done = new (std::nothrow) AioReadClosure (
338
+ fd, aioctx, option_.requestOption );
339
+ done->cntl .set_timeout_ms (-1 );
340
+ done->cntl .set_log_id (logId_.fetch_add (1 , std::memory_order_relaxed));
341
+ stub.Read (&done->cntl , &request, &done->response , done);
342
+ };
343
+
344
+ PushAsyncTask (task);
345
+
319
346
return 0 ;
320
347
}
321
348
322
349
static void EmptyDeleter (void * m) {}
323
350
324
351
int NebdClient::AioWrite (int fd, NebdClientAioContext* aioctx) {
325
- nebd::client::NebdFileService_Stub stub (&channel_);
326
- nebd::client::WriteRequest request;
327
- request.set_fd (fd);
328
- request.set_offset (aioctx->offset );
329
- request.set_size (aioctx->length );
352
+ auto task = [this , fd, aioctx]() {
353
+ nebd::client::NebdFileService_Stub stub (&channel_);
354
+ nebd::client::WriteRequest request;
355
+ request.set_fd (fd);
356
+ request.set_offset (aioctx->offset );
357
+ request.set_size (aioctx->length );
358
+
359
+ AioWriteClosure* done = new (std::nothrow) AioWriteClosure (
360
+ fd, aioctx, option_.requestOption );
330
361
331
- AioWriteClosure* done = new (std::nothrow) AioWriteClosure (
332
- fd, aioctx, option_.requestOption );
362
+ done->cntl .set_timeout_ms (-1 );
363
+ done->cntl .set_log_id (logId_.fetch_add (1 , std::memory_order_relaxed));
364
+ done->cntl .request_attachment ().append_user_data (
365
+ aioctx->buf , aioctx->length , EmptyDeleter);
366
+ stub.Write (&done->cntl , &request, &done->response , done);
367
+ };
333
368
334
- done->cntl .set_timeout_ms (-1 );
335
- done->cntl .set_log_id (logId_.fetch_add (1 , std::memory_order_relaxed));
336
- done->cntl .request_attachment ().append_user_data (
337
- aioctx->buf , aioctx->length , EmptyDeleter);
338
- stub.Write (&done->cntl , &request, &done->response , done);
369
+ PushAsyncTask (task);
339
370
340
371
return 0 ;
341
372
}
342
373
343
374
int NebdClient::Flush (int fd, NebdClientAioContext* aioctx) {
344
- nebd::client::NebdFileService_Stub stub (&channel_);
345
- nebd::client::FlushRequest request;
346
- request.set_fd (fd);
347
-
348
- AioFlushClosure* done = new (std::nothrow) AioFlushClosure (
349
- fd, aioctx, option_.requestOption );
350
- done->cntl .set_timeout_ms (-1 );
351
- done->cntl .set_log_id (logId_.fetch_add (1 , std::memory_order_relaxed));
352
- stub.Flush (&done->cntl , &request, &done->response , done);
375
+ auto task = [this , fd, aioctx]() {
376
+ nebd::client::NebdFileService_Stub stub (&channel_);
377
+ nebd::client::FlushRequest request;
378
+ request.set_fd (fd);
379
+
380
+ AioFlushClosure* done = new (std::nothrow) AioFlushClosure (
381
+ fd, aioctx, option_.requestOption );
382
+ done->cntl .set_timeout_ms (-1 );
383
+ done->cntl .set_log_id (logId_.fetch_add (1 , std::memory_order_relaxed));
384
+ stub.Flush (&done->cntl , &request, &done->response , done);
385
+ };
386
+
387
+ PushAsyncTask (task);
388
+
353
389
return 0 ;
354
390
}
355
391
@@ -473,6 +509,13 @@ int NebdClient::InitNebdClientOption(Configuration* conf) {
473
509
LOG_IF (ERROR, ret != true ) << " Load request.rpcMaxDelayHealthCheckIntervalMs failed" ; // NOLINT
474
510
RETURN_IF_FALSE (ret);
475
511
512
+ ret = conf->GetUInt32Value (" request.rpcSendExecQueueNum" ,
513
+ &requestOption.rpcSendExecQueueNum );
514
+ LOG_IF (ERROR, ret != true )
515
+ << " Load request.rpcSendExecQueueNum from config file failed, current "
516
+ " value is "
517
+ << requestOption.rpcSendExecQueueNum ;
518
+
476
519
option_.requestOption = requestOption;
477
520
478
521
ret = conf->GetStringValue (" log.path" , &option_.logOption .logPath );
@@ -564,5 +607,19 @@ void NebdClient::InitLogger(const LogOption& logOption) {
564
607
google::InitGoogleLogging (kProcessName );
565
608
}
566
609
610
+ int NebdClient::ExecAsyncRpcTask (void * meta,
611
+ bthread::TaskIterator<AsyncRpcTask>& iter) { // NOLINT
612
+ if (iter.is_queue_stopped ()) {
613
+ return 0 ;
614
+ }
615
+
616
+ for (; iter; ++iter) {
617
+ auto & task = *iter;
618
+ task ();
619
+ }
620
+
621
+ return 0 ;
622
+ }
623
+
567
624
} // namespace client
568
625
} // namespace nebd
0 commit comments