Commit ae22ca1

fix: handling accessing shard db after shard removal (#2237)
* Address the issue that shard storage db is accessed after the db is removed
* Address unit test failure; cleanup code; enhance simtest
1 parent a8d6f87 commit ae22ca1
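In short: DBMap::cf() used to look up the RocksDB column family handle and panic when it was missing, which is exactly what happened when a read or write raced with shard removal and the shard's column family had already been dropped. After this change, cf() returns Result<_, TypedStoreError>, and the iterator constructors (safe_iter, safe_iter_with_bounds, safe_range_iter) are fallible as well, so the condition surfaces as TypedStoreError::UnregisteredColumn. A minimal caller-side sketch, assuming the re-exported typed_store::{Map, TypedStoreError} paths and a hypothetical per-shard map called shard_blobs:

use typed_store::{rocks::DBMap, Map, TypedStoreError};

// Sketch only: shard_blobs stands in for any per-shard DBMap whose column
// family may already have been dropped by shard removal.
fn count_shard_entries(shard_blobs: &DBMap<u64, Vec<u8>>) -> Result<usize, TypedStoreError> {
    // safe_iter() is now fallible: a missing column family comes back as
    // TypedStoreError::UnregisteredColumn instead of a panic inside cf().
    Ok(shard_blobs.safe_iter()?.count())
}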

13 files changed (+255, -193 lines)


crates/typed-store/src/rocks.rs

Lines changed: 39 additions & 61 deletions
@@ -586,22 +586,7 @@ impl<K, V> DBMap<K, V> {
         let from_buf = be_fix_int_ser(start)?;
         let to_buf = be_fix_int_ser(end)?;
         self.rocksdb
-            .compact_range_cf(&self.cf(), Some(from_buf), Some(to_buf));
-        Ok(())
-    }
-
-    /// Compact a range of keys in a specific column family
-    pub fn compact_range_raw(
-        &self,
-        cf_name: &str,
-        start: Vec<u8>,
-        end: Vec<u8>,
-    ) -> Result<(), TypedStoreError> {
-        let cf = self
-            .rocksdb
-            .cf_handle(cf_name)
-            .expect("compact range: column family does not exist");
-        self.rocksdb.compact_range_cf(&cf, Some(start), Some(end));
+            .compact_range_cf(&self.cf()?, Some(from_buf), Some(to_buf));
         Ok(())
     }

@@ -614,37 +599,24 @@ impl<K, V> DBMap<K, V> {
         let from_buf = be_fix_int_ser(start)?;
         let to_buf = be_fix_int_ser(end)?;
         self.rocksdb
-            .compact_range_to_bottom(&self.cf(), Some(from_buf), Some(to_buf));
+            .compact_range_to_bottom(&self.cf()?, Some(from_buf), Some(to_buf));
         Ok(())
     }

     /// Get the column family
-    pub fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
-        self.rocksdb.cf_handle(&self.cf).unwrap_or_else(|| {
-            // Force capture backtrace regardless of RUST_BACKTRACE env var
-            let backtrace = std::backtrace::Backtrace::force_capture();
-            eprintln!("PANIC: Column family '{}' not found!", &self.cf);
-            eprintln!("Database path: {:?}", &self.rocksdb.db_path);
-            eprintln!("Full backtrace:\n{}", backtrace);
-            panic!(
-                "Map-keying column family {} should have been checked at DB creation",
-                &self.cf
-            )
-        })
+    pub fn cf(&self) -> Result<Arc<rocksdb::BoundColumnFamily<'_>>, TypedStoreError> {
+        self.rocksdb
+            .cf_handle(&self.cf)
+            .ok_or_else(|| TypedStoreError::UnregisteredColumn(self.cf.clone()))
     }

     /// Flush the column family
     pub fn flush(&self) -> Result<(), TypedStoreError> {
         self.rocksdb
-            .flush_cf(&self.cf())
+            .flush_cf(&self.cf()?)
             .map_err(|e| TypedStoreError::RocksDBError(e.into_string()))
     }

-    /// Set the options for the column family
-    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> {
-        self.rocksdb.set_options_cf(&self.cf(), opts)
-    }
-
     fn get_int_property(
         rocksdb: &RocksDB,
         cf: &impl AsColumnFamilyRef,
@@ -686,7 +658,7 @@ impl<K, V> DBMap<K, V> {
         let results: Result<Vec<_>, TypedStoreError> = self
             .rocksdb
             .batched_multi_get_cf_opt(
-                &self.cf(),
+                &self.cf()?,
                 keys_refs,
                 /*sorted_keys=*/ false,
                 &self.opts.readopts(),
@@ -940,7 +912,7 @@ impl<K, V> DBMap<K, V> {
             .expect("the function parameters are valid");
         let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2)
             .expect("the function parameters are valid");
-        for item in self.safe_iter() {
+        for item in self.safe_iter()? {
             let (key, value) = item?;
             num_keys += 1;
             let key_len = be_fix_int_ser(key.borrow())?.len();
@@ -1043,7 +1015,7 @@ impl<K, V> DBMap<K, V> {
                 .unwrap_or(Bound::Unbounded),
         ));

-        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
+        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf()?, readopts);
         let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
         let iter = SafeIter::new(
             self.cf.clone(),
@@ -1270,7 +1242,7 @@ impl DBBatch {
             .into_iter()
             .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                 let k_buf = be_fix_int_ser(k.borrow())?;
-                self.batch.delete_cf(&db.cf(), k_buf);
+                self.batch.delete_cf(&db.cf()?, k_buf);

                 Ok(())
             })?;
@@ -1299,7 +1271,7 @@
         let from_buf = be_fix_int_ser(from)?;
         let to_buf = be_fix_int_ser(to)?;

-        self.batch.delete_range_cf(&db.cf(), from_buf, to_buf);
+        self.batch.delete_range_cf(&db.cf()?, from_buf, to_buf);
         Ok(())
     }

@@ -1319,7 +1291,7 @@
                 let k_buf = be_fix_int_ser(k.borrow())?;
                 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                 total += k_buf.len() + v_buf.len();
-                self.batch.put_cf(&db.cf(), k_buf, v_buf);
+                self.batch.put_cf(&db.cf()?, k_buf, v_buf);
                 Ok(())
             })?;
         self.db_metrics
@@ -1343,7 +1315,7 @@
             .into_iter()
             .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                 let k_buf = be_fix_int_ser(k.borrow())?;
-                self.batch.merge_cf(&db.cf(), k_buf, v);
+                self.batch.merge_cf(&db.cf()?, k_buf, v);
                 Ok(())
             })?;
         Ok(self)
@@ -1370,10 +1342,10 @@ where
         let readopts = self.opts.readopts();
         Ok(self
             .rocksdb
-            .key_may_exist_cf(&self.cf(), &key_buf, &readopts)
+            .key_may_exist_cf(&self.cf()?, &key_buf, &readopts)
             && self
                 .rocksdb
-                .get_pinned_cf_opt(&self.cf(), &key_buf, &readopts)
+                .get_pinned_cf_opt(&self.cf()?, &key_buf, &readopts)
                 .map_err(typed_store_err_from_rocks_err)?
                 .is_some())
     }
@@ -1406,7 +1378,7 @@ where
         let key_buf = be_fix_int_ser(key)?;
         let res = self
             .rocksdb
-            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
+            .get_pinned_cf_opt(&self.cf()?, &key_buf, &self.opts.readopts())
             .map_err(typed_store_err_from_rocks_err)?;
         self.db_metrics
             .op_metrics
@@ -1452,7 +1424,7 @@ where
                 .report_metrics(&self.cf);
         }
         self.rocksdb
-            .put_cf(&self.cf(), &key_buf, &value_buf, &self.opts.writeopts())
+            .put_cf(&self.cf()?, &key_buf, &value_buf, &self.opts.writeopts())
             .map_err(typed_store_err_from_rocks_err)?;

         let elapsed = timer.stop_and_record();
@@ -1488,7 +1460,7 @@ where
         };
         let key_buf = be_fix_int_ser(key)?;
         self.rocksdb
-            .delete_cf(&self.cf(), key_buf, &self.opts.writeopts())
+            .delete_cf(&self.cf()?, key_buf, &self.opts.writeopts())
             .map_err(typed_store_err_from_rocks_err)?;
         self.db_metrics
             .op_metrics
@@ -1526,7 +1498,7 @@ where
     /// overridden in the config), so please use this function with caution
     #[tracing::instrument(level = "trace", skip_all, err)]
     fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
-        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
+        let first_key = self.safe_iter()?.next().transpose()?.map(|(k, _v)| k);
         let last_key = self
             .reversed_safe_iter_with_bounds(None, None)?
             .next()
@@ -1541,57 +1513,63 @@ where
     }

     fn is_empty(&self) -> bool {
-        self.safe_iter().next().is_none()
+        self.safe_iter()
+            .expect("safe_iter should not fail")
+            .next()
+            .is_none()
     }

-    fn safe_iter(&'a self) -> Self::SafeIterator {
+    fn safe_iter(&'a self) -> Result<Self::SafeIterator, TypedStoreError> {
         let db_iter = self
             .rocksdb
-            .raw_iterator_cf(&self.cf(), self.opts.readopts());
+            .raw_iterator_cf(&self.cf()?, self.opts.readopts());
         let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
-        SafeIter::new(
+        Ok(SafeIter::new(
             self.cf.clone(),
             db_iter,
             _timer,
             _perf_ctx,
             bytes_scanned,
             keys_scanned,
             Some(self.db_metrics.clone()),
-        )
+        ))
     }

     fn safe_iter_with_bounds(
         &'a self,
         lower_bound: Option<K>,
         upper_bound: Option<K>,
-    ) -> Self::SafeIterator {
+    ) -> Result<Self::SafeIterator, TypedStoreError> {
         let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
-        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
+        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf()?, readopts);
         let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
-        SafeIter::new(
+        Ok(SafeIter::new(
             self.cf.clone(),
             db_iter,
             _timer,
             _perf_ctx,
             bytes_scanned,
             keys_scanned,
             Some(self.db_metrics.clone()),
-        )
+        ))
     }

-    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
+    fn safe_range_iter(
+        &'a self,
+        range: impl RangeBounds<K>,
+    ) -> Result<Self::SafeIterator, TypedStoreError> {
         let readopts = self.create_read_options_with_range(range);
-        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf(), readopts);
+        let db_iter = self.rocksdb.raw_iterator_cf(&self.cf()?, readopts);
         let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
-        SafeIter::new(
+        Ok(SafeIter::new(
             self.cf.clone(),
             db_iter,
             _timer,
             _perf_ctx,
             bytes_scanned,
             keys_scanned,
             Some(self.db_metrics.clone()),
-        )
+        ))
     }

     /// Returns a vector of values corresponding to the keys provided.
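Because a missing column family is now an ordinary TypedStoreError::UnregisteredColumn rather than a panic, code that may race with shard removal can choose to treat it as "shard already gone". A minimal sketch under that assumption; the helper, key type, and value type below are illustrative and not part of this diff:

use typed_store::{rocks::DBMap, Map, TypedStoreError};

// Returns Ok(None) when the shard's column family has already been removed,
// and propagates any other storage error.
fn shard_entry_count(
    shard_store: &DBMap<[u8; 32], Vec<u8>>,
) -> Result<Option<usize>, TypedStoreError> {
    match shard_store.safe_iter() {
        Ok(iter) => Ok(Some(iter.count())),
        Err(TypedStoreError::UnregisteredColumn(_)) => Ok(None),
        Err(err) => Err(err),
    }
}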

crates/typed-store/src/rocks/tests.rs

Lines changed: 20 additions & 12 deletions
@@ -72,7 +72,7 @@ where
     K: Serialize + DeserializeOwned,
     V: Serialize + DeserializeOwned,
 {
-    TestIteratorWrapper::SafeIter(db.safe_iter())
+    TestIteratorWrapper::SafeIter(db.safe_iter().expect("failed to get iterator"))
 }

 fn get_reverse_iter<K, V>(
@@ -97,7 +97,10 @@ where
     K: Serialize + DeserializeOwned,
     V: Serialize + DeserializeOwned,
 {
-    TestIteratorWrapper::SafeIter(db.safe_iter_with_bounds(lower_bound, upper_bound))
+    TestIteratorWrapper::SafeIter(
+        db.safe_iter_with_bounds(lower_bound, upper_bound)
+            .expect("failed to get iterator"),
+    )
 }

 fn get_range_iter<K, V>(
@@ -108,7 +111,7 @@ where
     K: Serialize + DeserializeOwned,
     V: Serialize + DeserializeOwned,
 {
-    TestIteratorWrapper::SafeIter(db.safe_range_iter(range))
+    TestIteratorWrapper::SafeIter(db.safe_range_iter(range).expect("failed to get iterator"))
 }

 #[tokio::test]
@@ -489,17 +492,17 @@ async fn test_clear() {
     insert_batch.write().expect("Failed to execute batch");

     // Check we have multiple entries
-    assert!(db.safe_iter().count() > 1);
+    assert!(db.safe_iter().expect("failed to get iterator").count() > 1);
     let _ = db.unsafe_clear();
-    assert_eq!(db.safe_iter().count(), 0);
+    assert_eq!(db.safe_iter().expect("failed to get iterator").count(), 0);
     // Clear again to ensure safety when clearing empty map
     let _ = db.unsafe_clear();
-    assert_eq!(db.safe_iter().count(), 0);
+    assert_eq!(db.safe_iter().expect("failed to get iterator").count(), 0);
     // Clear with one item
     let _ = db.insert(&1, &"e".to_string());
-    assert_eq!(db.safe_iter().count(), 1);
+    assert_eq!(db.safe_iter().expect("failed to get iterator").count(), 1);
     let _ = db.unsafe_clear();
-    assert_eq!(db.safe_iter().count(), 0);
+    assert_eq!(db.safe_iter().expect("failed to get iterator").count(), 0);
 }

 #[tokio::test]
@@ -554,7 +557,9 @@ async fn test_iter_with_bounds() {
     );

     // Specify a bound outside of dataset.
-    let db_iter = db.safe_iter_with_bounds(Some(200), Some(300));
+    let db_iter = db
+        .safe_iter_with_bounds(Some(200), Some(300))
+        .expect("failed to get iterator");
     assert!(db_iter.collect::<Vec<_>>().is_empty());

     // Skip to first key in the bound (bound is [1, 50))
@@ -631,12 +636,12 @@ async fn test_is_empty() {
     insert_batch.write().expect("Failed to execute batch");

     // Check we have multiple entries and not empty
-    assert!(db.safe_iter().count() > 1);
+    assert!(db.safe_iter().expect("failed to get iterator").count() > 1);
     assert!(!db.is_empty());

     // Clear again to ensure empty works after clearing
     let _ = db.unsafe_clear();
-    assert_eq!(db.safe_iter().count(), 0);
+    assert_eq!(db.safe_iter().expect("failed to get iterator").count(), 0);
     assert!(db.is_empty());
 }

@@ -713,7 +718,10 @@ async fn test_multi_remove() {
     // Remove 50 items
     db.multi_remove(keys_vals.clone().map(|kv| kv.0).take(50))
         .expect("Failed to multi-remove");
-    assert_eq!(db.safe_iter().count(), 101 - 50);
+    assert_eq!(
+        db.safe_iter().expect("failed to get iterator").count(),
+        101 - 50
+    );

     // Check that the remaining are present
     for (k, v) in keys_vals.skip(50) {

crates/typed-store/src/traits.rs

Lines changed: 6 additions & 3 deletions
@@ -53,17 +53,20 @@ where
     fn is_empty(&self) -> bool;

     /// Same as `iter` but performs status check.
-    fn safe_iter(&'a self) -> Self::SafeIterator;
+    fn safe_iter(&'a self) -> Result<Self::SafeIterator, Self::Error>;

     /// Same as `iter_with_bounds` but performs status check.
     fn safe_iter_with_bounds(
         &'a self,
         lower_bound: Option<K>,
         upper_bound: Option<K>,
-    ) -> Self::SafeIterator;
+    ) -> Result<Self::SafeIterator, Self::Error>;

     /// Same as `range_iter` but performs status check.
-    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator;
+    fn safe_range_iter(
+        &'a self,
+        range: impl RangeBounds<K>,
+    ) -> Result<Self::SafeIterator, Self::Error>;

     /// Returns a vector of values corresponding to the keys provided, non-atomically.
     fn multi_get<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<Vec<Option<V>>, Self::Error>
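For callers written against the Map trait rather than DBMap directly, the same change shows up as a Result to thread through when constructing an iterator. An illustrative generic helper, assuming the trait's serde bounds match those shown in the test helpers above:

use serde::{de::DeserializeOwned, Serialize};
use typed_store::Map;

// A dropped or unregistered column family reaches the caller as Err(M::Error)
// instead of a panic inside the store.
fn has_entries<'a, K, V, M>(map: &'a M) -> Result<bool, M::Error>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
    M: Map<'a, K, V>,
{
    Ok(map.safe_iter()?.next().is_some())
}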

crates/walrus-service/src/event/event_processor/processor.rs

Lines changed: 3 additions & 3 deletions
@@ -210,7 +210,7 @@ impl EventProcessor {
     pub fn poll(&self, from: u64) -> Result<Vec<PositionedStreamEvent>, TypedStoreError> {
         self.stores
             .event_store
-            .safe_iter_with_bounds(Some(from), None)
+            .safe_iter_with_bounds(Some(from), None)?
             .take(MAX_EVENTS_PER_POLL)
             .map(|result| result.map(|(_, event)| event))
             .collect()
@@ -222,7 +222,7 @@
         let mut iter = self
             .stores
             .event_store
-            .safe_iter_with_bounds(Some(from), None);
+            .safe_iter_with_bounds(Some(from), None)?;
         let Some(result) = iter.next() else {
             return Ok(None);
         };
@@ -330,7 +330,7 @@
             .with_fixint_encoding()
             .serialize(&commit_index)?;
         self.stores.event_store.rocksdb.delete_file_in_range(
-            &self.stores.event_store.cf(),
+            &self.stores.event_store.cf()?,
             &start,
             &end,
         )?;
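In the event processor the new error is simply propagated with the ? operator; a hypothetical polling loop built on top of poll (only poll and its signature are taken from this file, the rest is assumed) would forward the error the same way:

use typed_store::TypedStoreError;

// EventProcessor and PositionedStreamEvent are assumed to be in scope from the
// surrounding module. Event indices are assumed to be contiguous, so the cursor
// advances by the batch length; storage errors (including an unregistered
// column family) bubble up instead of aborting the process.
fn drain_events(
    processor: &EventProcessor,
    mut next: u64,
) -> Result<Vec<PositionedStreamEvent>, TypedStoreError> {
    let mut all = Vec::new();
    loop {
        let batch = processor.poll(next)?;
        if batch.is_empty() {
            return Ok(all);
        }
        next += batch.len() as u64;
        all.extend(batch);
    }
}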
