Skip to content

Commit 7c78aad

Browse files
authored
plan: Handle no leader and invalidate store region (#484)
close #479 Signed-off-by: Ping Yu <[email protected]>
1 parent 32b0cff commit 7c78aad

File tree

17 files changed

+282
-192
lines changed

17 files changed

+282
-192
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ jobs:
6161
CARGO_INCREMENTAL: 0
6262
NEXTEST_PROFILE: ci
6363
TIKV_VERSION: v8.5.1
64+
RUST_LOG: info
6465
runs-on: ubuntu-latest
6566
steps:
6667
- uses: actions/checkout@v4

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
5757
rstest = "0.18.2"
5858
serde_json = "1"
5959
serial_test = "0.5.0"
60-
simple_logger = "1"
6160
tempfile = "3.6"
6261
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
6362

src/common/errors.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::result;
55
use thiserror::Error;
66

77
use crate::proto::kvrpcpb;
8+
use crate::region::RegionVerId;
89
use crate::BoundRange;
910

1011
/// An error originating from the TiKV client or dependencies.
@@ -90,8 +91,8 @@ pub enum Error {
9091
#[error("Region {} is not found in the response", region_id)]
9192
RegionNotFoundInResponse { region_id: u64 },
9293
/// No leader is found for the given id.
93-
#[error("Leader of region {} is not found", region_id)]
94-
LeaderNotFound { region_id: u64 },
94+
#[error("Leader of region {} is not found", region.id)]
95+
LeaderNotFound { region: RegionVerId },
9596
/// Scan limit exceeds the maximum
9697
#[error("Limit {} exceeds max scan limit {}", limit, max_limit)]
9798
MaxScanLimitExceeded { limit: u32, max_limit: u32 },

src/mock.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ impl PdClient for MockPdClient {
216216

217217
async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
218218

219+
async fn invalidate_store_cache(&self, _store_id: crate::region::StoreId) {}
220+
219221
async fn load_keyspace(&self, _keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
220222
unimplemented!()
221223
}

src/pd/client.rs

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::proto::metapb;
2020
use crate::region::RegionId;
2121
use crate::region::RegionVerId;
2222
use crate::region::RegionWithLeader;
23+
use crate::region::StoreId;
2324
use crate::region_cache::RegionCache;
2425
use crate::store::KvConnect;
2526
use crate::store::RegionStore;
@@ -84,7 +85,7 @@ pub trait PdClient: Send + Sync + 'static {
8485
fn group_keys_by_region<K, K2>(
8586
self: Arc<Self>,
8687
keys: impl Iterator<Item = K> + Send + Sync + 'static,
87-
) -> BoxStream<'static, Result<(RegionWithLeader, Vec<K2>)>>
88+
) -> BoxStream<'static, Result<(Vec<K2>, RegionWithLeader)>>
8889
where
8990
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
9091
K2: Send + Sync + 'static,
@@ -102,7 +103,7 @@ pub trait PdClient: Send + Sync + 'static {
102103
}
103104
grouped.push(keys.next().unwrap().into());
104105
}
105-
Ok(Some((keys, (region, grouped))))
106+
Ok(Some((keys, (grouped, region))))
106107
} else {
107108
Ok(None)
108109
}
@@ -112,10 +113,10 @@ pub trait PdClient: Send + Sync + 'static {
112113
}
113114

114115
/// Returns a Stream which iterates over the contexts for each region covered by range.
115-
fn stores_for_range(
116+
fn regions_for_range(
116117
self: Arc<Self>,
117118
range: BoundRange,
118-
) -> BoxStream<'static, Result<RegionStore>> {
119+
) -> BoxStream<'static, Result<RegionWithLeader>> {
119120
let (start_key, end_key) = range.into_keys();
120121
stream_fn(Some(start_key), move |start_key| {
121122
let end_key = end_key.clone();
@@ -128,15 +129,14 @@ pub trait PdClient: Send + Sync + 'static {
128129

129130
let region = this.region_for_key(&start_key).await?;
130131
let region_end = region.end_key();
131-
let store = this.map_region_to_store(region).await?;
132132
if end_key
133133
.map(|x| x <= region_end && !x.is_empty())
134134
.unwrap_or(false)
135135
|| region_end.is_empty()
136136
{
137-
return Ok(Some((None, store)));
137+
return Ok(Some((None, region)));
138138
}
139-
Ok(Some((Some(region_end), store)))
139+
Ok(Some((Some(region_end), region)))
140140
}
141141
})
142142
.boxed()
@@ -146,7 +146,7 @@ pub trait PdClient: Send + Sync + 'static {
146146
fn group_ranges_by_region(
147147
self: Arc<Self>,
148148
mut ranges: Vec<kvrpcpb::KeyRange>,
149-
) -> BoxStream<'static, Result<(RegionWithLeader, Vec<kvrpcpb::KeyRange>)>> {
149+
) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionWithLeader)>> {
150150
ranges.reverse();
151151
stream_fn(Some(ranges), move |ranges| {
152152
let this = self.clone();
@@ -166,7 +166,7 @@ pub trait PdClient: Send + Sync + 'static {
166166
if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) {
167167
grouped.push(make_key_range(start_key.into(), region_end.clone().into()));
168168
ranges.push(make_key_range(region_end.into(), end_key.into()));
169-
return Ok(Some((Some(ranges), (region, grouped))));
169+
return Ok(Some((Some(ranges), (grouped, region))));
170170
}
171171
grouped.push(range);
172172

@@ -181,11 +181,11 @@ pub trait PdClient: Send + Sync + 'static {
181181
grouped
182182
.push(make_key_range(start_key.into(), region_end.clone().into()));
183183
ranges.push(make_key_range(region_end.into(), end_key.into()));
184-
return Ok(Some((Some(ranges), (region, grouped))));
184+
return Ok(Some((Some(ranges), (grouped, region))));
185185
}
186186
grouped.push(range);
187187
}
188-
Ok(Some((Some(ranges), (region, grouped))))
188+
Ok(Some((Some(ranges), (grouped, region))))
189189
} else {
190190
Ok(None)
191191
}
@@ -205,6 +205,8 @@ pub trait PdClient: Send + Sync + 'static {
205205
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;
206206

207207
async fn invalidate_region_cache(&self, ver_id: RegionVerId);
208+
209+
async fn invalidate_store_cache(&self, store_id: StoreId);
208210
}
209211

210212
/// This client converts requests for the logical TiKV cluster into requests
@@ -271,6 +273,10 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
271273
self.region_cache.invalidate_region_cache(ver_id).await
272274
}
273275

276+
async fn invalidate_store_cache(&self, store_id: StoreId) {
277+
self.region_cache.invalidate_store_cache(store_id).await
278+
}
279+
274280
async fn load_keyspace(&self, keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
275281
self.pd.load_keyspace(keyspace).await
276282
}
@@ -390,7 +396,7 @@ pub mod test {
390396
let stream = Arc::new(client).group_keys_by_region(tasks.into_iter());
391397
let mut stream = executor::block_on_stream(stream);
392398

393-
let result: Vec<Key> = stream.next().unwrap().unwrap().1;
399+
let result: Vec<Key> = stream.next().unwrap().unwrap().0;
394400
assert_eq!(
395401
result,
396402
vec![
@@ -401,27 +407,27 @@ pub mod test {
401407
]
402408
);
403409
assert_eq!(
404-
stream.next().unwrap().unwrap().1,
410+
stream.next().unwrap().unwrap().0,
405411
vec![vec![12].into(), vec![11, 4].into()]
406412
);
407413
assert!(stream.next().is_none());
408414
}
409415

410416
#[test]
411-
fn test_stores_for_range() {
417+
fn test_regions_for_range() {
412418
let client = Arc::new(MockPdClient::default());
413419
let k1: Key = vec![1].into();
414420
let k2: Key = vec![5, 2].into();
415421
let k3: Key = vec![11, 4].into();
416422
let range1 = (k1, k2.clone()).into();
417-
let mut stream = executor::block_on_stream(client.clone().stores_for_range(range1));
418-
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
423+
let mut stream = executor::block_on_stream(client.clone().regions_for_range(range1));
424+
assert_eq!(stream.next().unwrap().unwrap().id(), 1);
419425
assert!(stream.next().is_none());
420426

421427
let range2 = (k2, k3).into();
422-
let mut stream = executor::block_on_stream(client.stores_for_range(range2));
423-
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
424-
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 2);
428+
let mut stream = executor::block_on_stream(client.regions_for_range(range2));
429+
assert_eq!(stream.next().unwrap().unwrap().id(), 1);
430+
assert_eq!(stream.next().unwrap().unwrap().id(), 2);
425431
assert!(stream.next().is_none());
426432
}
427433

@@ -446,20 +452,20 @@ pub mod test {
446452
let ranges3 = stream.next().unwrap().unwrap();
447453
let ranges4 = stream.next().unwrap().unwrap();
448454

449-
assert_eq!(ranges1.0.id(), 1);
455+
assert_eq!(ranges1.1.id(), 1);
450456
assert_eq!(
451-
ranges1.1,
457+
ranges1.0,
452458
vec![
453459
make_key_range(k1.clone(), k2.clone()),
454460
make_key_range(k1.clone(), k_split.clone()),
455461
]
456462
);
457-
assert_eq!(ranges2.0.id(), 2);
458-
assert_eq!(ranges2.1, vec![make_key_range(k_split.clone(), k3.clone())]);
459-
assert_eq!(ranges3.0.id(), 1);
460-
assert_eq!(ranges3.1, vec![make_key_range(k2.clone(), k_split.clone())]);
461-
assert_eq!(ranges4.0.id(), 2);
462-
assert_eq!(ranges4.1, vec![make_key_range(k_split, k4.clone())]);
463+
assert_eq!(ranges2.1.id(), 2);
464+
assert_eq!(ranges2.0, vec![make_key_range(k_split.clone(), k3.clone())]);
465+
assert_eq!(ranges3.1.id(), 1);
466+
assert_eq!(ranges3.0, vec![make_key_range(k2.clone(), k_split.clone())]);
467+
assert_eq!(ranges4.1.id(), 2);
468+
assert_eq!(ranges4.0, vec![make_key_range(k_split, k4.clone())]);
463469
assert!(stream.next().is_none());
464470

465471
let range1 = make_key_range(k1.clone(), k2.clone());
@@ -470,11 +476,11 @@ pub mod test {
470476
let ranges1 = stream.next().unwrap().unwrap();
471477
let ranges2 = stream.next().unwrap().unwrap();
472478
let ranges3 = stream.next().unwrap().unwrap();
473-
assert_eq!(ranges1.0.id(), 1);
474-
assert_eq!(ranges1.1, vec![make_key_range(k1, k2)]);
475-
assert_eq!(ranges2.0.id(), 2);
476-
assert_eq!(ranges2.1, vec![make_key_range(k3, k4)]);
477-
assert_eq!(ranges3.0.id(), 3);
478-
assert_eq!(ranges3.1, vec![make_key_range(k5, k6)]);
479+
assert_eq!(ranges1.1.id(), 1);
480+
assert_eq!(ranges1.0, vec![make_key_range(k1, k2)]);
481+
assert_eq!(ranges2.1.id(), 2);
482+
assert_eq!(ranges2.0, vec![make_key_range(k3, k4)]);
483+
assert_eq!(ranges3.1.id(), 3);
484+
assert_eq!(ranges3.0, vec![make_key_range(k5, k6)]);
479485
}
480486
}

src/raw/requests.rs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ use crate::request::SingleKey;
2222
use crate::shardable_key;
2323
use crate::shardable_keys;
2424
use crate::shardable_range;
25-
use crate::store::store_stream_for_keys;
26-
use crate::store::store_stream_for_ranges;
25+
use crate::store::region_stream_for_keys;
26+
use crate::store::region_stream_for_ranges;
2727
use crate::store::RegionStore;
2828
use crate::store::Request;
2929
use crate::transaction::HasLocks;
@@ -194,7 +194,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
194194
fn shards(
195195
&self,
196196
pd_client: &Arc<impl PdClient>,
197-
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
197+
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
198198
let kvs = self.pairs.clone();
199199
let ttls = self.ttls.clone();
200200
let mut kv_ttl: Vec<KvPairTTL> = kvs
@@ -203,15 +203,17 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
203203
.map(|(kv, ttl)| KvPairTTL(kv, ttl))
204204
.collect();
205205
kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key));
206-
store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
206+
region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
207207
}
208208

209-
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
209+
fn apply_shard(&mut self, shard: Self::Shard) {
210210
let (pairs, ttls) = shard.into_iter().unzip();
211-
self.set_leader(&store.region_with_leader)?;
212211
self.pairs = pairs;
213212
self.ttls = ttls;
214-
Ok(())
213+
}
214+
215+
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
216+
self.set_leader(&store.region_with_leader)
215217
}
216218
}
217219

@@ -344,14 +346,16 @@ impl Shardable for kvrpcpb::RawBatchScanRequest {
344346
fn shards(
345347
&self,
346348
pd_client: &Arc<impl PdClient>,
347-
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
348-
store_stream_for_ranges(self.ranges.clone(), pd_client.clone())
349+
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
350+
region_stream_for_ranges(self.ranges.clone(), pd_client.clone())
349351
}
350352

351-
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
352-
self.set_leader(&store.region_with_leader)?;
353+
fn apply_shard(&mut self, shard: Self::Shard) {
353354
self.ranges = shard;
354-
Ok(())
355+
}
356+
357+
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
358+
self.set_leader(&store.region_with_leader)
355359
}
356360
}
357361

@@ -470,14 +474,20 @@ impl Shardable for RawCoprocessorRequest {
470474
fn shards(
471475
&self,
472476
pd_client: &Arc<impl PdClient>,
473-
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
474-
store_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
477+
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
478+
region_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
479+
}
480+
481+
fn apply_shard(&mut self, shard: Self::Shard) {
482+
self.inner.ranges = shard;
475483
}
476484

477-
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
485+
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
478486
self.set_leader(&store.region_with_leader)?;
479-
self.inner.ranges.clone_from(&shard);
480-
self.inner.data = (self.data_builder)(store.region_with_leader.region.clone(), shard);
487+
self.inner.data = (self.data_builder)(
488+
store.region_with_leader.region.clone(),
489+
self.inner.ranges.clone(),
490+
);
481491
Ok(())
482492
}
483493
}

src/region.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl RegionWithLeader {
7373
.as_ref()
7474
.cloned()
7575
.ok_or_else(|| Error::LeaderNotFound {
76-
region_id: self.id(),
76+
region: self.ver_id(),
7777
})
7878
.map(|s| s.store_id)
7979
}

src/region_cache.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,11 @@ impl<C: RetryClientTrait> RegionCache<C> {
233233
}
234234
}
235235

236+
pub async fn invalidate_store_cache(&self, store_id: StoreId) {
237+
let mut cache = self.store_cache.write().await;
238+
cache.remove(&store_id);
239+
}
240+
236241
pub async fn read_through_all_stores(&self) -> Result<Vec<Store>> {
237242
let stores = self
238243
.inner_client

src/request/mod.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ mod test {
103103
use crate::proto::pdpb::Timestamp;
104104
use crate::proto::tikvpb::tikv_client::TikvClient;
105105
use crate::region::RegionWithLeader;
106-
use crate::store::store_stream_for_keys;
106+
use crate::store::region_stream_for_keys;
107107
use crate::store::HasRegionError;
108108
use crate::transaction::lowering::new_commit_request;
109109
use crate::Error;
@@ -168,22 +168,20 @@ mod test {
168168
pd_client: &std::sync::Arc<impl crate::pd::PdClient>,
169169
) -> futures::stream::BoxStream<
170170
'static,
171-
crate::Result<(Self::Shard, crate::store::RegionStore)>,
171+
crate::Result<(Self::Shard, crate::region::RegionWithLeader)>,
172172
> {
173173
// Increases by 1 for each call.
174174
self.test_invoking_count
175175
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
176-
store_stream_for_keys(
176+
region_stream_for_keys(
177177
Some(Key::from("mock_key".to_owned())).into_iter(),
178178
pd_client.clone(),
179179
)
180180
}
181181

182-
fn apply_shard(
183-
&mut self,
184-
_shard: Self::Shard,
185-
_store: &crate::store::RegionStore,
186-
) -> crate::Result<()> {
182+
fn apply_shard(&mut self, _shard: Self::Shard) {}
183+
184+
fn apply_store(&mut self, _store: &crate::store::RegionStore) -> crate::Result<()> {
187185
Ok(())
188186
}
189187
}

0 commit comments

Comments
 (0)