Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "surrealdb-tikv-client"
version = "0.3.0-surreal.1"
version = "0.3.0-surreal.3"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 0.3.2-surreal.2 is nowhere? Maybe just use 0.3.0-surreal2, instead of 3?

keywords = ["TiKV", "KV", "distributed-systems"]
license = "Apache-2.0"
authors = ["The TiKV Project Authors"]
Expand Down
1 change: 1 addition & 0 deletions examples/raw.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

#![type_length_limit = "8165158"]
#![allow(clippy::result_large_err)]

mod common;

Expand Down
47 changes: 31 additions & 16 deletions src/common/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use regex::Regex;
use tonic::transport::Certificate;
use tonic::transport::Channel;
use tonic::transport::ClientTlsConfig;
use tonic::transport::Endpoint;
use tonic::transport::Identity;

use crate::internal_err;
Expand Down Expand Up @@ -77,28 +78,42 @@ impl SecurityManager {
where
Factory: FnOnce(Channel) -> Client,
{
let addr = "http://".to_string() + &SCHEME_REG.replace(addr, "");

info!("connect to rpc server at endpoint: {:?}", addr);

let mut builder = Channel::from_shared(addr)?
.tcp_keepalive(Some(Duration::from_secs(10)))
.keep_alive_timeout(Duration::from_secs(3));

if !self.ca.is_empty() {
let tls = ClientTlsConfig::new()
.ca_certificate(Certificate::from_pem(&self.ca))
.identity(Identity::from_pem(
&self.cert,
load_pem_file("private key", &self.key)?,
));
builder = builder.tls_config(tls)?;
let channel = if !self.ca.is_empty() {
self.tls_channel(addr).await?
} else {
self.default_channel(addr).await?
};

let ch = builder.connect().await?;
let ch = channel.connect().await?;

Ok(factory(ch))
}

async fn tls_channel(&self, addr: &str) -> Result<Endpoint> {
let addr = "https://".to_string() + &SCHEME_REG.replace(addr, "");
let builder = self.endpoint(addr.to_string())?;
let tls = ClientTlsConfig::new()
.ca_certificate(Certificate::from_pem(&self.ca))
.identity(Identity::from_pem(
&self.cert,
load_pem_file("private key", &self.key)?,
));
let builder = builder.tls_config(tls)?;
Ok(builder)
}

async fn default_channel(&self, addr: &str) -> Result<Endpoint> {
let addr = "http://".to_string() + &SCHEME_REG.replace(addr, "");
self.endpoint(addr)
}

fn endpoint(&self, addr: String) -> Result<Endpoint> {
let endpoint = Channel::from_shared(addr)?
.tcp_keepalive(Some(Duration::from_secs(10)))
.keep_alive_timeout(Duration::from_secs(3));
Ok(endpoint)
}
}

#[cfg(test)]
Expand Down
10 changes: 10 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ pub struct Config {
pub cert_path: Option<PathBuf>,
pub key_path: Option<PathBuf>,
pub timeout: Duration,
pub grpc_max_decoding_message_size: usize,
pub keyspace: Option<String>,
}

const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
const DEFAULT_GRPC_MAX_DECODING_MESSAGE_SIZE: usize = 4 * 1024 * 1024; // 4MB

impl Default for Config {
fn default() -> Self {
Expand All @@ -31,6 +33,7 @@ impl Default for Config {
cert_path: None,
key_path: None,
timeout: DEFAULT_REQUEST_TIMEOUT,
grpc_max_decoding_message_size: DEFAULT_GRPC_MAX_DECODING_MESSAGE_SIZE,
keyspace: None,
}
}
Expand Down Expand Up @@ -86,6 +89,13 @@ impl Config {
self
}

/// Set the maximum decoding message size for gRPC.
#[must_use]
pub fn with_grpc_max_decoding_message_size(mut self, size: usize) -> Self {
self.grpc_max_decoding_message_size = size;
self
}

/// Set to use default keyspace.
///
/// Server should enable `storage.api-version = 2` to use this feature.
Expand Down
1 change: 0 additions & 1 deletion src/kv/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ const _PROPTEST_VALUE_MAX: usize = 1024 * 16; // 16 KB
/// Since `Value` is just an alias for `Vec<u8>`, conversions to and from it are easy.
///
/// Many functions which accept a `Value` accept an `Into<Value>`.

pub type Value = Vec<u8>;
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@
//! ```

#![allow(clippy::field_reassign_with_default)]
#![allow(clippy::doc_lazy_continuation)]
#![allow(non_local_definitions)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::legacy_numeric_constants)]
#![allow(unexpected_cfgs)]
#![allow(clippy::result_large_err)]

pub mod backoff;
#[doc(hidden)]
Expand Down
8 changes: 7 additions & 1 deletion src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,13 @@ impl PdRpcClient<TikvConnect, Cluster> {
) -> Result<PdRpcClient> {
PdRpcClient::new(
config.clone(),
|security_mgr| TikvConnect::new(security_mgr, config.timeout),
|security_mgr| {
TikvConnect::new(
security_mgr,
config.timeout,
config.grpc_max_decoding_message_size,
)
},
|security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout),
enable_codec,
)
Expand Down
2 changes: 1 addition & 1 deletion src/pd/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
cluster
.get_all_stores(self.timeout)
.await
.map(|resp| resp.stores.into_iter().map(Into::into).collect())
.map(|resp| resp.stores.into_iter().collect())
})
}

Expand Down
6 changes: 5 additions & 1 deletion src/store/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub trait KvConnect: Sized + Send + Sync + 'static {
pub struct TikvConnect {
security_mgr: Arc<SecurityManager>,
timeout: Duration,
grpc_max_decoding_message_size: usize,
}

#[async_trait]
Expand All @@ -33,7 +34,10 @@ impl KvConnect for TikvConnect {

async fn connect(&self, address: &str) -> Result<KvRpcClient> {
self.security_mgr
.connect(address, TikvClient::new)
.connect(address, move |channel| {
TikvClient::new(channel)
.max_decoding_message_size(self.grpc_max_decoding_message_size)
})
.await
.map(|c| KvRpcClient::new(c, self.timeout))
}
Expand Down
9 changes: 4 additions & 5 deletions src/transaction/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,8 @@ pub fn new_pessimistic_prewrite_request(
let len = mutations.len();
let mut req = new_prewrite_request(mutations, primary_lock, start_version, lock_ttl);
req.for_update_ts = for_update_ts;
req.pessimistic_actions = iter::repeat(PessimisticAction::DoPessimisticCheck.into())
.take(len)
.collect();
req.pessimistic_actions =
iter::repeat_n(PessimisticAction::DoPessimisticCheck.into(), len).collect();
req
}

Expand Down Expand Up @@ -369,7 +368,7 @@ impl Shardable for kvrpcpb::CommitRequest {

fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)?;
self.keys = shard.into_iter().map(Into::into).collect();
self.keys = shard.into_iter().collect();
Ok(())
}
}
Expand Down Expand Up @@ -589,7 +588,7 @@ impl Merge<kvrpcpb::ScanLockResponse> for Collect {
fn merge(&self, input: Vec<Result<kvrpcpb::ScanLockResponse>>) -> Result<Self::Out> {
input
.into_iter()
.flat_map_ok(|mut resp| resp.take_locks().into_iter().map(Into::into))
.flat_map_ok(|mut resp| resp.take_locks().into_iter())
.collect()
}
}
Expand Down
8 changes: 2 additions & 6 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,7 @@ impl<PdC: PdClient> Transaction<PdC> {
.retry_multi_region(retry_options.region_backoff)
.merge(Collect)
.plan();
plan.execute()
.await
.map(|r| r.into_iter().map(Into::into).collect())
plan.execute().await.map(|r| r.into_iter().collect())
})
.await
.map(move |pairs| pairs.map(move |pair| pair.truncate_keyspace(keyspace)))
Expand Down Expand Up @@ -807,9 +805,7 @@ impl<PdC: PdClient> Transaction<PdC> {
.retry_multi_region(retry_options.region_backoff)
.merge(Collect)
.plan();
plan.execute()
.await
.map(|r| r.into_iter().map(Into::into).collect())
plan.execute().await.map(|r| r.into_iter().collect())
},
)
.await
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ async fn raw_write_million() -> Result<()> {
// test batch_scan
for batch_num in 1..4 {
let _ = client
.batch_scan(iter::repeat(vec![]..).take(batch_num), limit)
.batch_scan(iter::repeat_n(vec![].., batch_num), limit)
.await?;
// FIXME: `each_limit` parameter does no work as expected. It limits the
// entries on each region of each rangqe, instead of each range.
Expand Down
Loading