Skip to content

Commit 72fd3c9

Browse files
authored
refactor(object store): make object store a separate crate (#2939)
* refactor(object store): make object store common * create a new crate for object store
1 parent 5d097a7 commit 72fd3c9

30 files changed

+224
-121
lines changed

Cargo.lock

+27-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ members = [
1212
"src/frontend",
1313
"src/frontend/test_runner",
1414
"src/meta",
15+
"src/object_store",
1516
"src/prost",
1617
"src/prost/helpers",
1718
"src/risedevtool",

src/bench/ss_bench/main.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ mod utils;
2020
use clap::Parser;
2121
use operations::*;
2222
use risingwave_common::config::StorageConfig;
23+
use risingwave_common::monitor::Print;
2324
use risingwave_meta::hummock::test_utils::setup_compute_env;
2425
use risingwave_meta::hummock::MockHummockMetaClient;
2526
use risingwave_storage::hummock::compaction_executor::CompactionExecutor;
2627
use risingwave_storage::hummock::compactor::{get_remote_sstable_id_generator, CompactorContext};
27-
use risingwave_storage::monitor::{ObjectStoreMetrics, Print, StateStoreMetrics};
28+
use risingwave_storage::monitor::{ObjectStoreMetrics, StateStoreMetrics};
2829
use risingwave_storage::{dispatch_state_store, StateStoreImpl};
2930

3031
#[derive(Parser, Debug)]

src/bench/ss_bench/utils/display_stats.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use risingwave_storage::monitor::{MyHistogram, StateStoreMetrics};
15+
use risingwave_common::monitor::my_stats::MyHistogram;
16+
use risingwave_storage::monitor::StateStoreMetrics;
1617

1718
use super::my_stats::MyStateStoreStats;
1819

src/bench/ss_bench/utils/my_stats.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
// limitations under the License.
1414

1515
use prometheus::core::Metric;
16-
use risingwave_storage::monitor::{MyHistogram, StateStoreMetrics};
16+
use risingwave_common::monitor::my_stats::MyHistogram;
17+
use risingwave_storage::monitor::StateStoreMetrics;
1718

1819
#[derive(Clone, Default)]
1920
pub(crate) struct MyStateStoreStats {

src/common/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ risingwave_pb = { path = "../prost" }
3131
rust_decimal = "1"
3232
serde = { version = "1", features = ["derive"] }
3333
smallvec = "1"
34+
spin = "0.9"
3435
thiserror = "1"
3536
tokio = { version = "=0.2.0-alpha.3", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] }
3637
tokio-stream = "0.1"

src/storage/src/hummock/cache.rs src/common/src/cache.rs

+23-21
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,16 @@
2020
//! `LruCache` implementation ported from github.com/facebook/rocksdb. The class `LruCache` is
2121
//! thread-safe, because every operation on cache will be protected by a spin lock.
2222
use std::collections::HashMap;
23+
use std::error::Error;
2324
use std::future::Future;
2425
use std::hash::Hash;
2526
use std::ptr::null_mut;
2627
use std::sync::atomic::{AtomicUsize, Ordering};
2728
use std::sync::Arc;
2829

29-
use futures::channel::oneshot::{channel, Receiver, Sender};
30+
use futures::channel::oneshot::{channel, Canceled, Receiver, Sender};
3031
use spin::Mutex;
3132

32-
use crate::hummock::{HummockError, HummockResult};
33-
3433
const IN_CACHE: u8 = 1;
3534
const REVERSE_IN_CACHE: u8 = !IN_CACHE;
3635

@@ -520,20 +519,22 @@ impl<K: LruKey, T: LruValue> LruCacheShard<K, T> {
520519
}
521520

522521
// Clears the content of the cache.
523-
// This method only works if no cache entries are referenced outside.
524-
fn clear(&mut self) {
522+
// This method is safe to use only if no cache entries are referenced outside.
523+
unsafe fn clear(&mut self) {
525524
while !std::ptr::eq(self.lru.next, self.lru.as_mut()) {
526525
let handle = self.lru.next;
527-
unsafe {
528-
self.erase((*handle).hash, (*handle).get_key());
529-
}
526+
self.erase((*handle).hash, (*handle).get_key());
530527
}
531528
}
532529
}
533530

534531
impl<K: LruKey, T: LruValue> Drop for LruCacheShard<K, T> {
535532
fn drop(&mut self) {
536-
self.clear();
533+
// Since the shard is being dropped, there must be no cache entries referenced outside. So we
534+
// are safe to call clear.
535+
unsafe {
536+
self.clear();
537+
}
537538
}
538539
}
539540

@@ -674,8 +675,10 @@ impl<K: LruKey, T: LruValue> LruCache<K, T> {
674675
hash as usize % self.shards.len()
675676
}
676677

677-
#[cfg(test)]
678-
pub fn clear(&self) {
678+
/// # Safety
679+
///
680+
/// This method can only be called when no cache entries are referenced outside.
681+
pub unsafe fn clear(&self) {
679682
for shard in &self.shards {
680683
let mut shard = shard.lock();
681684
shard.clear();
@@ -684,30 +687,31 @@ impl<K: LruKey, T: LruValue> LruCache<K, T> {
684687
}
685688

686689
impl<K: LruKey + Clone, T: LruValue> LruCache<K, T> {
687-
pub async fn lookup_with_request_dedup<F, VC>(
690+
pub async fn lookup_with_request_dedup<F, E, VC>(
688691
self: &Arc<Self>,
689692
hash: u64,
690693
key: K,
691694
fetch_value: F,
692-
) -> HummockResult<CachableEntry<K, T>>
695+
) -> Result<Result<CachableEntry<K, T>, E>, Canceled>
693696
where
694697
F: FnOnce() -> VC,
695-
VC: Future<Output = HummockResult<(T, usize)>>,
698+
E: Error,
699+
VC: Future<Output = Result<(T, usize), E>>,
696700
{
697701
match self.lookup_for_request(hash, key.clone()) {
698-
LookupResult::Cached(entry) => Ok(entry),
702+
LookupResult::Cached(entry) => Ok(Ok(entry)),
699703
LookupResult::WaitPendingRequest(recv) => {
700-
let entry = recv.await.map_err(HummockError::other)?;
701-
Ok(entry)
704+
let entry = recv.await?;
705+
Ok(Ok(entry))
702706
}
703707
LookupResult::Miss => match fetch_value().await {
704708
Ok((value, charge)) => {
705709
let entry = self.insert(key, hash, charge, value);
706-
Ok(entry)
710+
Ok(Ok(entry))
707711
}
708712
Err(e) => {
709713
self.clear_pending_request(&key, hash);
710-
Err(e)
714+
Ok(Err(e))
711715
}
712716
},
713717
}
@@ -753,8 +757,6 @@ mod tests {
753757
use rand::{RngCore, SeedableRng};
754758

755759
use super::*;
756-
use crate::hummock::cache::LruHandle;
757-
use crate::hummock::LruCache;
758760

759761
pub struct Block {
760762
pub offset: u64,

src/common/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@ pub mod array;
4343
#[macro_use]
4444
pub mod util;
4545
pub mod buffer;
46+
pub mod cache;
4647
pub mod catalog;
4748
pub mod collection;
4849
pub mod config;
4950
pub mod hash;
51+
pub mod monitor;
5052
pub mod service;
5153
#[cfg(test)]
5254
pub mod test_utils;

src/common/src/monitor/mod.rs

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright 2022 Singularity Data
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
pub mod my_stats;
16+
17+
use prometheus::core::{AtomicU64, Collector, GenericCounter, GenericCounterVec, Metric};
18+
use prometheus::{Histogram, HistogramVec};
19+
20+
use crate::monitor::my_stats::MyHistogram;
21+
22+
/// Define extension method `print` used in `print_statistics`.
23+
pub trait Print {
24+
fn print(&self);
25+
}
26+
27+
impl Print for GenericCounter<AtomicU64> {
28+
fn print(&self) {
29+
let desc = &self.desc()[0].fq_name;
30+
let counter = self.metric().get_counter().get_value() as u64;
31+
println!("{desc} COUNT : {counter}");
32+
}
33+
}
34+
35+
impl Print for Histogram {
36+
fn print(&self) {
37+
let desc = &self.desc()[0].fq_name;
38+
39+
let histogram = MyHistogram::from_prom_hist(self.metric().get_histogram());
40+
let p50 = histogram.get_percentile(50.0);
41+
let p95 = histogram.get_percentile(95.0);
42+
let p99 = histogram.get_percentile(99.0);
43+
let p100 = histogram.get_percentile(100.0);
44+
45+
let sample_count = self.get_sample_count();
46+
let sample_sum = self.get_sample_sum();
47+
println!("{desc} P50 : {p50} P95 : {p95} P99 : {p99} P100 : {p100} COUNT : {sample_count} SUM : {sample_sum}");
48+
}
49+
}
50+
51+
impl Print for HistogramVec {
52+
fn print(&self) {
53+
let desc = &self.desc()[0].fq_name;
54+
println!("{desc} {:?}", self);
55+
}
56+
}
57+
58+
impl Print for GenericCounterVec<AtomicU64> {
59+
fn print(&self) {
60+
let desc = &self.desc()[0].fq_name;
61+
println!("{desc} {:?}", self);
62+
}
63+
}
File renamed without changes.

src/object_store/Cargo.toml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
[package]
2+
name = "risingwave_object_store"
3+
version = "0.1.7"
4+
edition = "2021"
5+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
6+
7+
[dependencies]
8+
async-trait = "0.1"
9+
aws-config = { version = "0.12", default-features = false, features = ["rt-tokio", "native-tls"] }
10+
aws-endpoint = { version = "0.12", default-features = false }
11+
aws-sdk-s3 = { version = "0.12", default-features = false, features = ["rt-tokio", "native-tls"] }
12+
aws-smithy-http = "0.42"
13+
aws-smithy-types = "0.42"
14+
aws-types = { version = "0.12", features = ["hardcoded-credentials"] }
15+
bytes = { version = "1", features = ["serde"] }
16+
fail = "0.5"
17+
futures = { version = "0.3", default-features = false, features = ["alloc"] }
18+
itertools = "0.10"
19+
prometheus = { version = "0.13", features = ["process"] }
20+
risingwave_common = { path = "../common" }
21+
spin = "0.9"
22+
tempfile = "3"
23+
thiserror = "1"
24+
tokio = { version = "=0.2.0-alpha.3", package = "madsim-tokio", features = [
25+
"fs",
26+
] }
27+
tracing = { version = "0.1" }

src/object_store/src/lib.rs

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright 2022 Singularity Data
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#![feature(backtrace)]
16+
17+
pub mod object;

src/storage/src/object/disk.rs src/object_store/src/object/disk.rs

+9-12
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ use std::sync::Arc;
2222

2323
use bytes::Bytes;
2424
use futures::future::try_join_all;
25+
use risingwave_common::cache::{CachableEntry, LruCache};
2526
use tokio::io::AsyncWriteExt;
2627

27-
use crate::hummock::{CachableEntry, HummockError, LruCache};
2828
use crate::object::{
2929
strip_path_local, BlockLocation, ObjectError, ObjectMetadata, ObjectResult, ObjectStore,
3030
};
@@ -131,24 +131,21 @@ impl LocalDiskObjectStore {
131131
};
132132
let entry = self
133133
.opened_read_file_cache
134-
.lookup_with_request_dedup(hash, path.clone(), || async {
134+
.lookup_with_request_dedup::<_, ObjectError, _>(hash, path.clone(), || async {
135135
let file = utils::open_file(&path, true, false, false)
136-
.await
137-
.map_err(HummockError::object_io_error)?
136+
.await?
138137
.into_std()
139138
.await;
140139
Ok((file, 1))
141140
})
142141
.await
143142
.map_err(|e| {
144-
ObjectError::disk(
145-
"".to_string(),
146-
std::io::Error::new(
147-
ErrorKind::Other,
148-
format!("Failed to open file {:?}. Err{:?}", path.to_str(), e),
149-
),
150-
)
151-
})?;
143+
ObjectError::internal(format!(
144+
"open file cache request dedup get canceled {:?}. Err{:?}",
145+
path.to_str(),
146+
e
147+
))
148+
})??;
152149
Ok(Arc::new(entry))
153150
}
154151
}
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)