Skip to content

Commit 9fb11cd

Browse files
committed
Fix warnings
1 parent 4530d71 commit 9fb11cd

File tree

8 files changed

+102
-83
lines changed

8 files changed

+102
-83
lines changed

client/grpc/src/lib/mod.rs

+4-8
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,16 @@
1-
use crate::lib::errors::status_to_protocol_err;
1+
#![allow(irrefutable_let_patterns)]
2+
23
use crate::lib::rpc::db_rpc_client::DbRpcClient;
3-
use client::ope_btree::errors::BTreeClientError;
4-
use client::ope_btree::errors::BTreeClientError::ProtocolErr;
5-
use client::ope_db::OpeDatabaseClient;
64
use common::misc::ToBytes;
75
use common::misc::ToVecBytes;
86
use futures::FutureExt;
9-
use futures::{StreamExt, TryStreamExt};
7+
use futures::TryStreamExt;
108
use prost::bytes::Bytes;
119
use protocol::btree::{PutCallback, SearchCallback};
1210
use protocol::database::OpeDatabaseRpc;
1311
use protocol::{ProtocolError, RpcFuture};
14-
use std::fmt::Debug;
15-
use tokio::sync::mpsc::Sender;
1612
use tokio_stream::wrappers::ReceiverStream;
17-
use tonic::{Request, Response, Status, Streaming};
13+
use tonic::Request;
1814

1915
pub mod errors;
2016
pub mod rpc;

client/grpc/src/main.rs

+43-4
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,16 @@ use env_logger::Env;
1111
use lib::rpc::db_rpc_client::DbRpcClient;
1212
use std::error::Error;
1313
use std::sync::atomic::AtomicUsize;
14-
use tokio_stream::wrappers::ReceiverStream;
15-
use tonic::transport::Server;
16-
use tonic::Request;
1714

1815
mod lib;
1916

17+
// todo move it to separate integration test
2018
#[tokio::main]
2119
async fn main() -> Result<(), Box<dyn Error + 'static>> {
2220
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
2321

2422
let addr = "http://[::1]:7777";
25-
let mut client = DbRpcClient::connect(addr).await?;
23+
let client = DbRpcClient::connect(addr).await?;
2624
let rpc = lib::GrpcDbRpc::new(client);
2725

2826
log::info!("Connected to {}", addr);
@@ -69,5 +67,46 @@ async fn main() -> Result<(), Box<dyn Error + 'static>> {
6967
log::info!("Response: {:?}", res);
7068
assert_eq!(Some(v2), res);
7169

70+
let n = 20;
71+
for idx in 3..n {
72+
log::info!("Send PUT k{:?} v{:?}", idx, idx);
73+
let k = format!("k{:?}", idx);
74+
let v = format!("k{:?}", idx);
75+
let res = db_client.put(k, v.clone()).await?;
76+
log::info!("Response: {:?}", res);
77+
assert_eq!(None, res);
78+
}
79+
80+
for idx in 3..n {
81+
log::info!("Send GET k{:?} v{:?}", idx, idx);
82+
let k = format!("k{:?}", idx);
83+
let v = format!("k{:?}", idx);
84+
let res = db_client.get(k).await?;
85+
log::info!("Response: {:?}", res);
86+
assert_eq!(Some(v), res);
87+
}
88+
89+
// override values
90+
91+
let n = 10;
92+
for idx in 3..n {
93+
log::info!("Send PUT k{:?} v{:?}", idx, idx);
94+
let k = format!("k{:?}", idx);
95+
let old_v = format!("k{:?}", idx);
96+
let new_v = format!("k{:?}!", idx);
97+
let res = db_client.put(k, new_v).await?;
98+
log::info!("Response: {:?}", res);
99+
assert_eq!(Some(old_v), res);
100+
}
101+
102+
for idx in 3..n {
103+
log::info!("Send GET k{:?} v{:?}", idx, idx);
104+
let k = format!("k{:?}", idx);
105+
let v = format!("k{:?}!", idx);
106+
let res = db_client.get(k).await?;
107+
log::info!("Response: {:?}", res);
108+
assert_eq!(Some(v), res);
109+
}
110+
72111
Ok(())
73112
}

server/grpc/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ thiserror = "1.0"
1919
tokio = { version ="1.0", features = ["full"] }
2020
futures = "0.3"
2121
tokio-stream = "0.1.5"
22-
#async-stream = "0.3"
22+
bytes = "1.0"
2323

2424
# Grpc
2525
tonic = "0.4.1"

server/grpc/src/grpc/errors.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use crate::grpc::rpc;
22
use protocol::ProtocolError;
3-
use server::ope_db::DbError;
43
use tokio::sync::mpsc::error::SendError;
5-
use tonic::Status;
4+
use tonic::{Response, Status};
65

76
pub fn status_to_protocol_err(status: Status) -> ProtocolError {
87
let msg = format!("Receiving from client error: {:?}", status);
@@ -22,6 +21,11 @@ pub fn send_err_to_protocol_err<T>(err: SendError<T>) -> ProtocolError {
2221
ProtocolError::RpcErr { msg }
2322
}
2423

24+
pub fn send_err_to_status<T>(_msg: Result<Response<T>, Status>) -> Status {
25+
let msg = format!("Sending reply to client failed");
26+
Status::internal(msg)
27+
}
28+
2529
pub fn unexpected_msg_status(expected: &str, actually: Option<rpc::PutCallbackReply>) -> Status {
2630
let msg = format!(
2731
"Wrong message order from client, expected {:?}, actually: {:?}",
@@ -31,7 +35,10 @@ pub fn unexpected_msg_status(expected: &str, actually: Option<rpc::PutCallbackRe
3135
Status::invalid_argument(msg.to_string())
3236
}
3337

34-
pub fn unexpected_msg_err(expected: &str, actually: Option<rpc::PutCallbackReply>) -> ProtocolError {
38+
pub fn unexpected_msg_err(
39+
expected: &str,
40+
actually: Option<rpc::PutCallbackReply>,
41+
) -> ProtocolError {
3542
let msg = format!(
3643
"Wrong message order from client, expected {:?}, actually: {:?}",
3744
expected, actually

server/grpc/src/grpc/get_cb.rs

+5-22
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,13 @@
1-
use kvstore_api::kvstore::KVStore;
2-
use kvstore_inmemory::hashmap_store::HashMapKVStore;
3-
use prost::bytes::Bytes;
4-
use protocol::btree::{BtreeCallback, ClientPutDetails, PutCallback, SearchCallback, SearchResult};
5-
use protocol::RpcFuture;
6-
use rpc::db_rpc_server::DbRpc;
7-
use rpc::{DbInfo, PutValue};
8-
9-
use crate::grpc::rpc::get::value_msg;
101
use crate::grpc::{errors, rpc};
11-
use common::misc::{FromVecBytes, ToBytes, ToVecBytes};
2+
123
use futures::FutureExt;
13-
use log::*;
4+
use prost::bytes::Bytes;
5+
use protocol::btree::{BtreeCallback, SearchCallback, SearchResult};
6+
use protocol::RpcFuture;
147
use rpc::get_callback_reply::Reply as GetReply;
15-
use rpc::put_callback_reply::Reply as PutReply;
16-
use server::ope_btree::BTreeErr::ProtocolErr;
17-
use server::ope_btree::{OpeBTree, OpeBTreeConf, ValRefGen};
18-
use server::ope_db::{DatasetChanged, DbError, OpeDatabase};
19-
use server::Digest;
20-
use std::future::Future;
21-
use std::pin::Pin;
22-
use std::sync::Arc;
238
use tokio::sync::mpsc::Sender;
24-
use tokio_stream::wrappers::ReceiverStream;
259
use tokio_stream::StreamExt;
26-
use tonic::codegen::Stream;
27-
use tonic::{Request, Response, Status, Streaming};
10+
use tonic::{Status, Streaming};
2811

2912
pub struct GetCbImpl {
3013
/// Server sends requests to this channel

server/grpc/src/grpc/mod.rs

+33-23
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,21 @@
11
pub mod errors;
22
pub mod rpc;
33

4+
use bytes::Bytes;
5+
use common::misc::ToBytes;
46
use kvstore_api::kvstore::KVStore;
57
use kvstore_inmemory::hashmap_store::HashMapKVStore;
6-
use prost::bytes::Bytes;
7-
use protocol::btree::{BtreeCallback, ClientPutDetails, PutCallback, SearchCallback, SearchResult};
8-
use protocol::RpcFuture;
98
use rpc::db_rpc_server::DbRpc;
10-
use rpc::{DbInfo, PutValue};
11-
12-
use crate::grpc::rpc::get::value_msg;
13-
use common::misc::{FromVecBytes, ToBytes, ToVecBytes};
14-
use futures::FutureExt;
15-
use log::*;
169
use rpc::get_callback_reply::Reply as GetReply;
1710
use rpc::put_callback_reply::Reply as PutReply;
18-
use server::ope_btree::BTreeErr::ProtocolErr;
19-
use server::ope_btree::{OpeBTree, OpeBTreeConf, ValRefGen};
20-
use server::ope_db::{DatasetChanged, DbError, OpeDatabase};
11+
use rpc::DbInfo;
12+
use server::ope_btree::OpeBTreeConf;
13+
use server::ope_db::{DatasetChanged, OpeDatabase};
2114
use server::Digest;
22-
use std::future::Future;
2315
use std::pin::Pin;
2416
use std::sync::Arc;
2517
use tokio::sync::mpsc::Sender;
18+
use tokio::sync::oneshot;
2619
use tokio_stream::wrappers::ReceiverStream;
2720
use tokio_stream::StreamExt;
2821
use tonic::codegen::Stream;
@@ -72,7 +65,7 @@ where
7265
Some(client_reply) => {
7366
if let Err(status) = client_reply {
7467
log::warn!("Client's reply error: {:?}", status);
75-
result_in.send(Err(status));
68+
reply(result_in, Err(status))?;
7669
} else {
7770
match client_reply.unwrap().reply {
7871
Some(GetReply::DbInfo(DbInfo { id, version })) => {
@@ -103,14 +96,19 @@ where
10396
}
10497
};
10598

106-
server_requests_in.send(Ok(search_result)).await;
99+
server_requests_in
100+
.send(Ok(search_result))
101+
.await
102+
.unwrap_or_else(|_| {
103+
log::warn!("Sending reply to client failed")
104+
});
107105
});
108106

109107
log::debug!("Return server request stream");
110108
let stream: Self::GetStream =
111109
Box::pin(ReceiverStream::new(server_requests_out));
112110

113-
result_in.send(Ok(Response::new(stream)));
111+
reply(result_in, Ok(Response::new(stream)))?;
114112
}
115113

116114
//
@@ -122,7 +120,7 @@ where
122120
unexpected
123121
);
124122
log::warn!("{}", msg);
125-
result_in.send(Err(Status::invalid_argument(msg.to_string())));
123+
reply(result_in, Err(Status::invalid_argument(msg.to_string())))?;
126124
}
127125
}
128126
}
@@ -194,32 +192,35 @@ where
194192
}
195193
};
196194

197-
server_requests_in.send(Ok(search_result)).await;
195+
server_requests_in
196+
.send(Ok(search_result))
197+
.await
198+
.unwrap_or_else(|_| log::warn!("Sending reply to client failed"));
198199
});
199200

200201
log::debug!("Return server request stream");
201202
let stream: Self::PutStream =
202203
Box::pin(ReceiverStream::new(server_requests_out));
203204

204-
result_in.send(Ok(Response::new(stream)));
205+
reply(result_in, Ok(Response::new(stream)))?;
205206
}
206207
Ok(unexpected) => {
207208
let status = errors::unexpected_msg_status("PutValue", unexpected);
208-
result_in.send(Err(status));
209+
reply(result_in, Err(status))?;
209210
}
210211
Err(status) => {
211212
log::warn!("Client's reply error: {:?}", status);
212-
result_in.send(Err(status));
213+
reply(result_in, Err(status))?;
213214
}
214215
}
215216
}
216217
Ok(unexpected) => {
217218
let status = errors::unexpected_msg_status("DbInfo", unexpected);
218-
result_in.send(Err(status));
219+
reply(result_in, Err(status))?;
219220
}
220221
Err(status) => {
221222
log::warn!("Client's reply error: {:?}", status);
222-
result_in.send(Err(status));
223+
reply(result_in, Err(status))?;
223224
}
224225
}
225226

@@ -229,3 +230,12 @@ where
229230
.map_err(|_| Status::internal("Result channel error"))?
230231
}
231232
}
233+
234+
fn reply<T>(
235+
sender: oneshot::Sender<Result<Response<T>, Status>>,
236+
payload: Result<Response<T>, Status>,
237+
) -> Result<(), Status> {
238+
sender
239+
.send(payload)
240+
.map_err(|msg| errors::send_err_to_status(msg))
241+
}

server/grpc/src/grpc/put_cb.rs

+5-21
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,15 @@
1-
use kvstore_api::kvstore::KVStore;
2-
use kvstore_inmemory::hashmap_store::HashMapKVStore;
3-
use prost::bytes::Bytes;
4-
use protocol::btree::{BtreeCallback, ClientPutDetails, PutCallback, SearchCallback, SearchResult};
5-
use protocol::{ProtocolError, RpcFuture};
6-
use rpc::db_rpc_server::DbRpc;
7-
use rpc::{DbInfo, PutValue};
8-
9-
use crate::grpc::rpc::get::value_msg;
101
use crate::grpc::{errors, rpc};
11-
use common::misc::{FromVecBytes, ToBytes, ToVecBytes};
2+
use common::misc::ToBytes;
123
use futures::FutureExt;
13-
use log::*;
14-
use rpc::get_callback_reply::Reply as GetReply;
4+
use prost::bytes::Bytes;
5+
use protocol::btree::{BtreeCallback, ClientPutDetails, PutCallback};
6+
use protocol::{ProtocolError, RpcFuture};
157
use rpc::put_callback_reply::Reply as PutReply;
16-
use server::ope_btree::BTreeErr::ProtocolErr;
17-
use server::ope_btree::{OpeBTree, OpeBTreeConf, ValRefGen};
18-
use server::ope_db::{DatasetChanged, DbError, OpeDatabase};
19-
use server::Digest;
20-
use std::future::Future;
21-
use std::pin::Pin;
228
use std::sync::Arc;
239
use tokio::sync::mpsc::Sender;
2410
use tokio::sync::Mutex;
25-
use tokio_stream::wrappers::ReceiverStream;
2611
use tokio_stream::StreamExt;
27-
use tonic::codegen::Stream;
28-
use tonic::{Request, Response, Status, Streaming};
12+
use tonic::{Status, Streaming};
2913

3014
#[derive(Clone)]
3115
pub struct PutCbImpl {

server/grpc/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ mod grpc;
99

1010
#[tokio::main]
1111
async fn main() -> Result<(), Box<dyn Error + 'static>> {
12-
env_logger::Builder::from_env(Env::default().default_filter_or("debug")).init();
12+
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
1313

1414
let conf = OpeBTreeConf {
1515
arity: 4,

0 commit comments

Comments
 (0)