Skip to content

Commit 878d489

Browse files
Merge pull request #7 from surrealdb/emmanuel/enable-max-grpc-config-size
Re-sync with recent fixes - prepare 0.3.0-surreal.3
2 parents b2a2ca7 + 08c589f commit 878d489

File tree

12 files changed

+69
-33
lines changed

12 files changed

+69
-33
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "surrealdb-tikv-client"
3-
version = "0.3.0-surreal.1"
3+
version = "0.3.0-surreal.3"
44
keywords = ["TiKV", "KV", "distributed-systems"]
55
license = "Apache-2.0"
66
authors = ["The TiKV Project Authors"]

examples/raw.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
22

33
#![type_length_limit = "8165158"]
4+
#![allow(clippy::result_large_err)]
45

56
mod common;
67

src/common/security.rs

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use regex::Regex;
1111
use tonic::transport::Certificate;
1212
use tonic::transport::Channel;
1313
use tonic::transport::ClientTlsConfig;
14+
use tonic::transport::Endpoint;
1415
use tonic::transport::Identity;
1516

1617
use crate::internal_err;
@@ -77,28 +78,42 @@ impl SecurityManager {
7778
where
7879
Factory: FnOnce(Channel) -> Client,
7980
{
80-
let addr = "http://".to_string() + &SCHEME_REG.replace(addr, "");
81-
8281
info!("connect to rpc server at endpoint: {:?}", addr);
8382

84-
let mut builder = Channel::from_shared(addr)?
85-
.tcp_keepalive(Some(Duration::from_secs(10)))
86-
.keep_alive_timeout(Duration::from_secs(3));
87-
88-
if !self.ca.is_empty() {
89-
let tls = ClientTlsConfig::new()
90-
.ca_certificate(Certificate::from_pem(&self.ca))
91-
.identity(Identity::from_pem(
92-
&self.cert,
93-
load_pem_file("private key", &self.key)?,
94-
));
95-
builder = builder.tls_config(tls)?;
83+
let channel = if !self.ca.is_empty() {
84+
self.tls_channel(addr).await?
85+
} else {
86+
self.default_channel(addr).await?
9687
};
97-
98-
let ch = builder.connect().await?;
88+
let ch = channel.connect().await?;
9989

10090
Ok(factory(ch))
10191
}
92+
93+
async fn tls_channel(&self, addr: &str) -> Result<Endpoint> {
94+
let addr = "https://".to_string() + &SCHEME_REG.replace(addr, "");
95+
let builder = self.endpoint(addr.to_string())?;
96+
let tls = ClientTlsConfig::new()
97+
.ca_certificate(Certificate::from_pem(&self.ca))
98+
.identity(Identity::from_pem(
99+
&self.cert,
100+
load_pem_file("private key", &self.key)?,
101+
));
102+
let builder = builder.tls_config(tls)?;
103+
Ok(builder)
104+
}
105+
106+
async fn default_channel(&self, addr: &str) -> Result<Endpoint> {
107+
let addr = "http://".to_string() + &SCHEME_REG.replace(addr, "");
108+
self.endpoint(addr)
109+
}
110+
111+
fn endpoint(&self, addr: String) -> Result<Endpoint> {
112+
let endpoint = Channel::from_shared(addr)?
113+
.tcp_keepalive(Some(Duration::from_secs(10)))
114+
.keep_alive_timeout(Duration::from_secs(3));
115+
Ok(endpoint)
116+
}
102117
}
103118

104119
#[cfg(test)]

src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ pub struct Config {
1919
pub cert_path: Option<PathBuf>,
2020
pub key_path: Option<PathBuf>,
2121
pub timeout: Duration,
22+
pub grpc_max_decoding_message_size: usize,
2223
pub keyspace: Option<String>,
2324
}
2425

2526
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
27+
const DEFAULT_GRPC_MAX_DECODING_MESSAGE_SIZE: usize = 4 * 1024 * 1024; // 4MB
2628

2729
impl Default for Config {
2830
fn default() -> Self {
@@ -31,6 +33,7 @@ impl Default for Config {
3133
cert_path: None,
3234
key_path: None,
3335
timeout: DEFAULT_REQUEST_TIMEOUT,
36+
grpc_max_decoding_message_size: DEFAULT_GRPC_MAX_DECODING_MESSAGE_SIZE,
3437
keyspace: None,
3538
}
3639
}
@@ -86,6 +89,13 @@ impl Config {
8689
self
8790
}
8891

92+
/// Set the maximum decoding message size for gRPC.
93+
#[must_use]
94+
pub fn with_grpc_max_decoding_message_size(mut self, size: usize) -> Self {
95+
self.grpc_max_decoding_message_size = size;
96+
self
97+
}
98+
8999
/// Set to use default keyspace.
90100
///
91101
/// Server should enable `storage.api-version = 2` to use this feature.

src/kv/value.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,4 @@ const _PROPTEST_VALUE_MAX: usize = 1024 * 16; // 16 KB
1111
/// Since `Value` is just an alias for `Vec<u8>`, conversions to and from it are easy.
1212
///
1313
/// Many functions which accept a `Value` accept an `Into<Value>`.
14-
1514
pub type Value = Vec<u8>;

src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@
9191
//! ```
9292
9393
#![allow(clippy::field_reassign_with_default)]
94+
#![allow(clippy::doc_lazy_continuation)]
95+
#![allow(non_local_definitions)]
96+
#![allow(clippy::needless_lifetimes)]
97+
#![allow(clippy::legacy_numeric_constants)]
98+
#![allow(unexpected_cfgs)]
99+
#![allow(clippy::result_large_err)]
94100

95101
pub mod backoff;
96102
#[doc(hidden)]

src/pd/client.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,13 @@ impl PdRpcClient<TikvConnect, Cluster> {
284284
) -> Result<PdRpcClient> {
285285
PdRpcClient::new(
286286
config.clone(),
287-
|security_mgr| TikvConnect::new(security_mgr, config.timeout),
287+
|security_mgr| {
288+
TikvConnect::new(
289+
security_mgr,
290+
config.timeout,
291+
config.grpc_max_decoding_message_size,
292+
)
293+
},
288294
|security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout),
289295
enable_codec,
290296
)

src/pd/retry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
184184
cluster
185185
.get_all_stores(self.timeout)
186186
.await
187-
.map(|resp| resp.stores.into_iter().map(Into::into).collect())
187+
.map(|resp| resp.stores.into_iter().collect())
188188
})
189189
}
190190

src/store/client.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub trait KvConnect: Sized + Send + Sync + 'static {
2525
pub struct TikvConnect {
2626
security_mgr: Arc<SecurityManager>,
2727
timeout: Duration,
28+
grpc_max_decoding_message_size: usize,
2829
}
2930

3031
#[async_trait]
@@ -33,7 +34,10 @@ impl KvConnect for TikvConnect {
3334

3435
async fn connect(&self, address: &str) -> Result<KvRpcClient> {
3536
self.security_mgr
36-
.connect(address, TikvClient::new)
37+
.connect(address, move |channel| {
38+
TikvClient::new(channel)
39+
.max_decoding_message_size(self.grpc_max_decoding_message_size)
40+
})
3741
.await
3842
.map(|c| KvRpcClient::new(c, self.timeout))
3943
}

src/transaction/requests.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -267,9 +267,8 @@ pub fn new_pessimistic_prewrite_request(
267267
let len = mutations.len();
268268
let mut req = new_prewrite_request(mutations, primary_lock, start_version, lock_ttl);
269269
req.for_update_ts = for_update_ts;
270-
req.pessimistic_actions = iter::repeat(PessimisticAction::DoPessimisticCheck.into())
271-
.take(len)
272-
.collect();
270+
req.pessimistic_actions =
271+
iter::repeat_n(PessimisticAction::DoPessimisticCheck.into(), len).collect();
273272
req
274273
}
275274

@@ -369,7 +368,7 @@ impl Shardable for kvrpcpb::CommitRequest {
369368

370369
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
371370
self.set_leader(&store.region_with_leader)?;
372-
self.keys = shard.into_iter().map(Into::into).collect();
371+
self.keys = shard.into_iter().collect();
373372
Ok(())
374373
}
375374
}
@@ -589,7 +588,7 @@ impl Merge<kvrpcpb::ScanLockResponse> for Collect {
589588
fn merge(&self, input: Vec<Result<kvrpcpb::ScanLockResponse>>) -> Result<Self::Out> {
590589
input
591590
.into_iter()
592-
.flat_map_ok(|mut resp| resp.take_locks().into_iter().map(Into::into))
591+
.flat_map_ok(|mut resp| resp.take_locks().into_iter())
593592
.collect()
594593
}
595594
}

0 commit comments

Comments
 (0)