Skip to content

Commit f49009f

Browse files
authored
Recommitments refactoring (#589)
* Simplify iteration over commitments databases while avoiding some locks * Simplify commitments metadata mutation under new data structure and move it into a separate module * Rename `commitment_databases` module to `databases` * Minor simplification
2 parents fd89651 + 2876ce3 commit f49009f

File tree

3 files changed

+119
-131
lines changed

3 files changed

+119
-131
lines changed

crates/subspace-farmer/src/commitments.rs

Lines changed: 17 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
mod commitment_databases;
1+
mod databases;
2+
mod metadata;
23
#[cfg(test)]
34
mod tests;
45

56
use crate::plot::{PieceOffset, Plot};
67
use arc_swap::ArcSwapOption;
7-
use commitment_databases::{CommitmentDatabases, CreateDbEntryResult, DbEntry};
8+
use databases::{CommitmentDatabases, CreateDbEntryResult, DbEntry};
89
use event_listener_primitives::{Bag, HandlerId};
910
use parking_lot::Mutex;
1011
use rayon::prelude::*;
@@ -179,22 +180,8 @@ impl Commitments {
179180
}
180181

181182
pub(crate) fn remove_pieces(&self, pieces: &[Piece]) -> Result<(), CommitmentError> {
182-
let salts = self.inner.commitment_databases.lock().get_salts();
183-
184-
for salt in salts {
185-
let db_entry = match self
186-
.inner
187-
.commitment_databases
188-
.lock()
189-
.get_db_entry(&salt)
190-
.cloned()
191-
{
192-
Some(db_entry) => db_entry,
193-
None => {
194-
continue;
195-
}
196-
};
197-
183+
for db_entry in self.get_db_entries() {
184+
let salt = db_entry.salt();
198185
let db_guard = db_entry.lock();
199186

200187
if let Some(db) = db_guard.as_ref() {
@@ -217,22 +204,8 @@ impl Commitments {
217204
F: Fn() -> Iter,
218205
Iter: Iterator<Item = (PieceOffset, &'iter [u8])>,
219206
{
220-
let salts = self.inner.commitment_databases.lock().get_salts();
221-
222-
for salt in salts {
223-
let db_entry = match self
224-
.inner
225-
.commitment_databases
226-
.lock()
227-
.get_db_entry(&salt)
228-
.cloned()
229-
{
230-
Some(db_entry) => db_entry,
231-
None => {
232-
continue;
233-
}
234-
};
235-
207+
for db_entry in self.get_db_entries() {
208+
let salt = db_entry.salt();
236209
let db_guard = db_entry.lock();
237210

238211
if let Some(db) = db_guard.as_ref() {
@@ -257,7 +230,7 @@ impl Commitments {
257230
range: u64,
258231
salt: Salt,
259232
) -> Option<(Tag, PieceOffset)> {
260-
let db_entry = self.get_local_db_entry(&salt)?;
233+
let db_entry = self.get_db_entry(salt)?;
261234

262235
let db_guard = db_entry.try_lock()?;
263236
let db = db_guard.clone()?;
@@ -282,7 +255,7 @@ impl Commitments {
282255
self.inner.handlers.status_change.add(callback)
283256
}
284257

285-
fn get_local_db_entry(&self, salt: &Salt) -> Option<Arc<DbEntry>> {
258+
fn get_db_entry(&self, salt: Salt) -> Option<Arc<DbEntry>> {
286259
if let Some(current) = self.inner.current.load_full() {
287260
if current.salt() == salt {
288261
return Some(current);
@@ -297,6 +270,14 @@ impl Commitments {
297270

298271
None
299272
}
273+
274+
fn get_db_entries(&self) -> impl Iterator<Item = Arc<DbEntry>> {
275+
self.inner
276+
.current
277+
.load_full()
278+
.into_iter()
279+
.chain(self.inner.next.load_full())
280+
}
300281
}
301282

302283
enum SolutionIteratorState {
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
use super::CommitmentError;
1+
use crate::commitments::metadata::{CommitmentMetadata, CommitmentStatus};
2+
use crate::commitments::CommitmentError;
23
use lru::LruCache;
34
use parking_lot::Mutex;
45
use rocksdb::{Options, DB};
5-
use serde::{Deserialize, Serialize};
6-
use std::collections::HashMap;
76
use std::fmt;
87
use std::ops::Deref;
98
use std::path::PathBuf;
@@ -13,15 +12,6 @@ use tracing::error;
1312

1413
// Cache size is just enough for last 2 salts to be stored
1514
const COMMITMENTS_CACHE_SIZE: usize = 2;
16-
const COMMITMENTS_KEY: &[u8] = b"commitments";
17-
18-
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)]
19-
enum CommitmentStatus {
20-
/// In-progress commitment to the part of the plot
21-
InProgress,
22-
/// Commitment to the whole plot and not some in-progress partial commitment
23-
Created,
24-
}
2515

2616
pub(super) struct CreateDbEntryResult {
2717
pub(super) db_entry: Arc<DbEntry>,
@@ -48,45 +38,25 @@ impl Deref for DbEntry {
4838
}
4939

5040
impl DbEntry {
51-
pub(super) fn salt(&self) -> &Salt {
52-
&self.salt
41+
pub(super) fn salt(&self) -> Salt {
42+
self.salt
5343
}
5444
}
5545

5646
#[derive(Debug)]
5747
pub(super) struct CommitmentDatabases {
5848
base_directory: PathBuf,
5949
databases: LruCache<Salt, Arc<DbEntry>>,
60-
metadata_cache: HashMap<Salt, CommitmentStatus>,
61-
metadata_db: Arc<DB>,
50+
metadata: Mutex<CommitmentMetadata>,
6251
}
6352

6453
impl CommitmentDatabases {
6554
pub(super) fn new(base_directory: PathBuf) -> Result<Self, CommitmentError> {
66-
let metadata_db = DB::open_default(base_directory.join("metadata"))
67-
.map_err(CommitmentError::MetadataDb)?;
68-
let metadata_cache: HashMap<Salt, CommitmentStatus> = metadata_db
69-
.get(COMMITMENTS_KEY)
70-
.map_err(CommitmentError::MetadataDb)?
71-
.map(|bytes| {
72-
serde_json::from_slice::<HashMap<String, CommitmentStatus>>(&bytes)
73-
.unwrap()
74-
.into_iter()
75-
.map(|(salt, status)| (hex::decode(salt).unwrap().try_into().unwrap(), status))
76-
.collect()
77-
})
78-
.unwrap_or_default();
79-
80-
let mut commitment_databases = CommitmentDatabases {
81-
base_directory: base_directory.clone(),
82-
databases: LruCache::new(COMMITMENTS_CACHE_SIZE),
83-
metadata_cache,
84-
metadata_db: Arc::new(metadata_db),
85-
};
55+
let mut metadata = CommitmentMetadata::new(base_directory.join("metadata"))?;
56+
let mut databases = LruCache::new(COMMITMENTS_CACHE_SIZE);
8657

87-
if commitment_databases
88-
.metadata_cache
89-
.drain_filter(|salt, status| match status {
58+
metadata.mutate(|metadata| {
59+
metadata.drain_filter(|salt, status| match status {
9060
CommitmentStatus::InProgress => {
9161
if let Err(error) =
9262
std::fs::remove_dir_all(base_directory.join(hex::encode(salt)))
@@ -100,35 +70,29 @@ impl CommitmentDatabases {
10070
true
10171
}
10272
CommitmentStatus::Created => false,
103-
})
104-
.next()
105-
.is_some()
106-
{
107-
commitment_databases.persist_metadata_cache()?;
108-
}
109-
110-
// Open databases that were fully created during previous run
111-
for salt in commitment_databases.metadata_cache.keys() {
112-
let db = DB::open(&Options::default(), base_directory.join(hex::encode(salt)))
113-
.map_err(CommitmentError::CommitmentDb)?;
114-
commitment_databases.databases.put(
115-
*salt,
116-
Arc::new(DbEntry {
117-
salt: *salt,
118-
db: Mutex::new(Some(Arc::new(db))),
119-
}),
120-
);
121-
}
122-
123-
Ok::<_, CommitmentError>(commitment_databases)
124-
}
73+
});
12574

126-
/// Get salts for all current database entries
127-
pub(super) fn get_salts(&self) -> Vec<Salt> {
128-
self.databases
129-
.iter()
130-
.map(|(salt, _db_entry)| *salt)
131-
.collect()
75+
// Open databases that were fully created during previous run
76+
for salt in metadata.keys() {
77+
let db = DB::open(&Options::default(), base_directory.join(hex::encode(salt)))
78+
.map_err(CommitmentError::CommitmentDb)?;
79+
databases.put(
80+
*salt,
81+
Arc::new(DbEntry {
82+
salt: *salt,
83+
db: Mutex::new(Some(Arc::new(db))),
84+
}),
85+
);
86+
}
87+
88+
Ok(())
89+
})?;
90+
91+
Ok(CommitmentDatabases {
92+
base_directory: base_directory.clone(),
93+
databases,
94+
metadata: Mutex::new(metadata),
95+
})
13296
}
13397

13498
/// Returns current and next `db_entry`.
@@ -176,7 +140,11 @@ impl CommitmentDatabases {
176140
let old_db_path = self.base_directory.join(hex::encode(old_salt));
177141

178142
// Remove old commitments for `old_salt`
179-
self.metadata_cache.remove(&old_salt);
143+
self.metadata.lock().mutate(|metadata| {
144+
metadata.remove(&old_salt);
145+
146+
Ok(())
147+
})?;
180148

181149
tokio::task::spawn_blocking(move || {
182150
// Take a lock to make sure database was released by whatever user there was and we
@@ -202,35 +170,16 @@ impl CommitmentDatabases {
202170
}
203171

204172
pub(super) fn mark_in_progress(&mut self, salt: Salt) -> Result<(), CommitmentError> {
205-
self.update_status(salt, CommitmentStatus::InProgress)
173+
self.metadata.lock().mutate(|metadata| {
174+
metadata.insert(salt, CommitmentStatus::InProgress);
175+
Ok(())
176+
})
206177
}
207178

208179
pub(super) fn mark_created(&mut self, salt: Salt) -> Result<(), CommitmentError> {
209-
self.update_status(salt, CommitmentStatus::Created)
210-
}
211-
212-
fn update_status(
213-
&mut self,
214-
salt: Salt,
215-
status: CommitmentStatus,
216-
) -> Result<(), CommitmentError> {
217-
self.metadata_cache.insert(salt, status);
218-
219-
self.persist_metadata_cache()
220-
}
221-
222-
fn persist_metadata_cache(&self) -> Result<(), CommitmentError> {
223-
let prepared_metadata_cache: HashMap<String, CommitmentStatus> = self
224-
.metadata_cache
225-
.iter()
226-
.map(|(salt, status)| (hex::encode(salt), *status))
227-
.collect();
228-
229-
self.metadata_db
230-
.put(
231-
COMMITMENTS_KEY,
232-
&serde_json::to_vec(&prepared_metadata_cache).unwrap(),
233-
)
234-
.map_err(CommitmentError::MetadataDb)
180+
self.metadata.lock().mutate(|metadata| {
181+
metadata.insert(salt, CommitmentStatus::Created);
182+
Ok(())
183+
})
235184
}
236185
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use crate::commitments::CommitmentError;
2+
use rocksdb::DB;
3+
use serde::{Deserialize, Serialize};
4+
use std::collections::HashMap;
5+
use std::path::PathBuf;
6+
use subspace_core_primitives::Salt;
7+
8+
const COMMITMENTS_KEY: &[u8] = b"commitments";
9+
10+
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)]
11+
pub(super) enum CommitmentStatus {
12+
/// In-progress commitment to the part of the plot
13+
InProgress,
14+
/// Commitment to the whole plot and not some in-progress partial commitment
15+
Created,
16+
}
17+
18+
#[derive(Debug)]
19+
pub(super) struct CommitmentMetadata {
20+
db: DB,
21+
}
22+
23+
impl CommitmentMetadata {
24+
pub(super) fn new(path: PathBuf) -> Result<Self, CommitmentError> {
25+
let db = DB::open_default(path).map_err(CommitmentError::MetadataDb)?;
26+
27+
Ok(Self { db })
28+
}
29+
30+
pub(super) fn mutate<F>(&mut self, mut callback: F) -> Result<(), CommitmentError>
31+
where
32+
F: FnMut(&mut HashMap<Salt, CommitmentStatus>) -> Result<(), CommitmentError>,
33+
{
34+
let mut metadata = self
35+
.db
36+
.get(COMMITMENTS_KEY)
37+
.map_err(CommitmentError::MetadataDb)?
38+
.map(|bytes| {
39+
serde_json::from_slice::<HashMap<String, CommitmentStatus>>(&bytes)
40+
.unwrap()
41+
.into_iter()
42+
.map(|(salt, status)| (hex::decode(salt).unwrap().try_into().unwrap(), status))
43+
.collect()
44+
})
45+
.unwrap_or_default();
46+
47+
callback(&mut metadata)?;
48+
49+
let metadata: HashMap<String, CommitmentStatus> = metadata
50+
.iter()
51+
.map(|(salt, status)| (hex::encode(salt), *status))
52+
.collect();
53+
54+
self.db
55+
.put(COMMITMENTS_KEY, &serde_json::to_vec(&metadata).unwrap())
56+
.map_err(CommitmentError::MetadataDb)
57+
}
58+
}

0 commit comments

Comments
 (0)