
Commit 7dc6ffd

Merge pull request #786 from subspace/parity-db-migration
Parity db migration
2 parents: 037b522 + fa02d9a


9 files changed: +429 −265 lines changed


Cargo.lock

Lines changed: 4 additions & 14 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -86,3 +86,5 @@ codegen-units = 1
 # TODO: Remove once chacha20poly1305 0.10 appears in libp2p's dependencies
 chacha20poly1305 = { git = "https://github.com/RustCrypto/AEADs", rev = "06dbfb5571687fd1bbe9d3c9b2193a1ba17f8e99" }
 libp2p = { git = "https://github.com/subspace/rust-libp2p", branch = "subspace-v3" }
+# TODO: Remove once parity-db 0.3.17 will be released (see https://github.com/paritytech/parity-db/issues/89)
+parity-db = { git = "https://github.com/paritytech/parity-db/", rev = "d324a26daabda11a2307de607d0a614a567b44ba" }

crates/subspace-farmer/src/archiving.rs

Lines changed: 0 additions & 2 deletions
@@ -20,8 +20,6 @@ pub enum ArchivingError {
     GetBlockError(u32),
     #[error("jsonrpsee error: {0}")]
     RpcError(Box<dyn std::error::Error + Send + Sync>),
-    #[error("Last block retrieval from plot, rocksdb error: {0}")]
-    LastBlock(rocksdb::Error),
     #[error("Error joining task: {0}")]
     JoinTask(tokio::task::JoinError),
     #[error("Archiver instantiation error: {0}")]

crates/subspace-farmer/src/commitments.rs

Lines changed: 70 additions & 44 deletions
@@ -7,9 +7,9 @@ use crate::plot::{PieceOffset, Plot};
 use arc_swap::ArcSwapOption;
 use databases::{CommitmentDatabases, CreateDbEntryResult, DbEntry};
 use event_listener_primitives::{Bag, HandlerId};
+use parity_db::Db;
 use parking_lot::Mutex;
 use rayon::prelude::*;
-use rocksdb::{WriteBatch, DB};
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
@@ -28,11 +28,13 @@ const TAGS_WRITE_BATCH_SIZE: usize = 16 * 1024 * 1024 / (TAG_SIZE + PIECE_OFFSET
 #[derive(Debug, Error)]
 pub enum CommitmentError {
     #[error("Metadata DB error: {0}")]
-    MetadataDb(rocksdb::Error),
+    MetadataDb(parity_db::Error),
     #[error("Commitment DB error: {0}")]
-    CommitmentDb(rocksdb::Error),
+    CommitmentDb(parity_db::Error),
     #[error("Plot error: {0}")]
     Plot(io::Error),
+    #[error("Migration error: {0}")]
+    Migrate(io::Error),
 }

 #[derive(Debug, Copy, Clone)]
@@ -130,9 +132,23 @@ impl Commitments {
         self.inner.current.swap(current);
         self.inner.next.swap(next);

-        let db_path = self.inner.base_directory.join(hex::encode(salt));
+        let options = parity_db::Options {
+            path: self.inner.base_directory.join(hex::encode(salt)),
+            columns: vec![parity_db::ColumnOptions {
+                preimage: false,
+                btree_index: true,
+                uniform: false,
+                ref_counted: false,
+                compression: parity_db::CompressionType::NoCompression,
+                compression_threshold: 4096,
+            }],
+            sync_wal: true,
+            sync_data: true,
+            stats: false,
+            salt: None,
+        };
         db_entry.lock().replace(Arc::new(
-            DB::open_default(db_path).map_err(CommitmentError::CommitmentDb)?,
+            Db::open_or_create(&options).map_err(CommitmentError::CommitmentDb)?,
         ));
     }

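Each per-salt commitment database is now opened as a parity-db instance with a single BTree-indexed column (column 0). For reference, a minimal standalone sketch of the same open pattern, assuming the parity-db API at the pinned revision; `open_commitments_db` is an illustrative helper, `base_directory` and `salt` are placeholders, and it reuses the `hex` crate already used in this file:

```rust
use parity_db::{ColumnOptions, CompressionType, Db, Options};
use std::path::Path;

/// Open (or create) a commitment database with one BTree-indexed column,
/// using the same options as the hunk above. `base_directory` and `salt`
/// are illustrative placeholders.
fn open_commitments_db(base_directory: &Path, salt: [u8; 8]) -> parity_db::Result<Db> {
    let options = Options {
        path: base_directory.join(hex::encode(salt)),
        columns: vec![ColumnOptions {
            preimage: false,
            // A BTree index keeps keys ordered on disk, which the tag range
            // scans further down in this file rely on.
            btree_index: true,
            uniform: false,
            ref_counted: false,
            compression: CompressionType::NoCompression,
            compression_threshold: 4096,
        }],
        sync_wal: true,
        sync_data: true,
        stats: false,
        salt: None,
    };
    Db::open_or_create(&options)
}
```
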
@@ -173,11 +189,12 @@ impl Commitments {
         if tags_with_offset.len() == tags_with_offset.capacity() {
             tags_with_offset.sort_by(|(tag_a, _), (tag_b, _)| tag_a.cmp(tag_b));

-            let mut batch = WriteBatch::default();
-            for (tag, offset) in &tags_with_offset {
-                batch.put(tag, offset);
-            }
-            db.write(batch).map_err(CommitmentError::CommitmentDb)?;
+            db.commit(
+                tags_with_offset
+                    .iter()
+                    .map(|(tag, offset)| (0, tag, Some(offset.to_vec()))),
+            )
+            .map_err(CommitmentError::CommitmentDb)?;

             tags_with_offset.clear();
         }
@@ -195,11 +212,12 @@ impl Commitments {
         if let Some(db) = db_guard.as_ref() {
             tags_with_offset.sort_by(|(tag_a, _), (tag_b, _)| tag_a.cmp(tag_b));

-            let mut batch = WriteBatch::default();
-            for (tag, offset) in &tags_with_offset {
-                batch.put(tag, offset);
-            }
-            db.write(batch).map_err(CommitmentError::CommitmentDb)?;
+            db.commit(
+                tags_with_offset
+                    .iter()
+                    .map(|(tag, offset)| (0, tag, Some(offset.to_vec()))),
+            )
+            .map_err(CommitmentError::CommitmentDb)?;
         }
     }

@@ -240,12 +258,12 @@ impl Commitments {
         let db_guard = db_entry.lock();

         if let Some(db) = db_guard.as_ref() {
-            let mut batch = WriteBatch::default();
-            for piece in pieces {
-                let tag = create_tag(piece, salt);
-                batch.delete(tag);
-            }
-            db.write(batch).map_err(CommitmentError::CommitmentDb)?;
+            db.commit(
+                pieces
+                    .iter()
+                    .map(|piece| (0, create_tag(piece, salt), None)),
+            )
+            .map_err(CommitmentError::CommitmentDb)?;
         }
     }

@@ -276,11 +294,12 @@ impl Commitments {

             tags_with_offset.sort_by(|(tag_a, _), (tag_b, _)| tag_a.cmp(tag_b));

-            let mut batch = WriteBatch::default();
-            for (tag, piece_offset) in tags_with_offset {
-                batch.put(tag, piece_offset.to_le_bytes());
-            }
-            db.write(batch).map_err(CommitmentError::CommitmentDb)?;
+            db.commit(
+                tags_with_offset
+                    .into_iter()
+                    .map(|(tag, offset)| (0, tag, Some(offset.to_le_bytes().to_vec()))),
+            )
+            .map_err(CommitmentError::CommitmentDb)?;
         };
     }

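The preceding hunks replace rocksdb's `WriteBatch` with parity-db's `commit`, which applies an atomic transaction given as an iterator of `(column, key, Option<value>)` tuples: `Some` inserts or overwrites, `None` deletes. A condensed sketch of both cases against column 0, assuming the API at the pinned revision; `write_tags` and `delete_tags` are illustrative helpers and the `Tag`/`PieceOffset` aliases stand in for the types used in this file:

```rust
use parity_db::Db;

// Stand-ins for the aliases used in this file.
type Tag = [u8; 8];
type PieceOffset = u64;

/// Write a batch of tag -> piece-offset mappings atomically. Offsets are
/// stored as little-endian bytes, matching the hunks above.
fn write_tags(db: &Db, tags_with_offset: &[(Tag, PieceOffset)]) -> parity_db::Result<()> {
    db.commit(
        tags_with_offset
            .iter()
            .map(|(tag, offset)| (0, tag, Some(offset.to_le_bytes().to_vec()))),
    )
}

/// Delete a batch of tags atomically: a `None` value removes the key.
fn delete_tags(db: &Db, tags: &[Tag]) -> parity_db::Result<()> {
    db.commit(tags.iter().map(|tag| (0, tag, None)))
}
```
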
@@ -315,12 +334,17 @@ impl Commitments {
                 return Vec::new();
             }
         };
-        let iter = db.raw_iterator();
+        let iter = match db.iter(0) {
+            Ok(iter) => iter,
+            Err(_) => {
+                return Vec::new();
+            }
+        };

         // Take the best out of 10 solutions
         let mut solutions = SolutionIterator::new(iter, target, range)
-            .take(limit)
-            .collect::<Vec<_>>();
+            .map(|iter| iter.take(limit).collect::<Vec<_>>())
+            .unwrap_or_default();
         let target = u64::from_be_bytes(target);
         solutions.sort_by_key(|(tag, _)| {
             let tag = u64::from_be_bytes(*tag);
@@ -374,7 +398,7 @@ enum SolutionIteratorState {
 }

 pub(crate) struct SolutionIterator<'a> {
-    iter: rocksdb::DBRawIterator<'a>,
+    iter: parity_db::BTreeIterator<'a>,
     state: SolutionIteratorState,
     /// Lower bound of solution range
     lower: u64,
@@ -383,7 +407,11 @@ pub(crate) struct SolutionIterator<'a> {
 }

 impl<'a> SolutionIterator<'a> {
-    pub fn new(mut iter: rocksdb::DBRawIterator<'a>, target: Tag, range: u64) -> Self {
+    pub fn new(
+        mut iter: parity_db::BTreeIterator<'a>,
+        target: Tag,
+        range: u64,
+    ) -> parity_db::Result<Self> {
         let (lower, is_lower_overflowed) = u64::from_be_bytes(target).overflowing_sub(range / 2);
         let (upper, is_upper_overflowed) = u64::from_be_bytes(target).overflowing_add(range / 2);

@@ -394,29 +422,27 @@ impl<'a> SolutionIterator<'a> {
         );

         let state = if is_lower_overflowed || is_upper_overflowed {
-            iter.seek_to_first();
+            iter.seek_to_first()?;
             SolutionIteratorState::OverflowStart
         } else {
-            iter.seek(lower.to_be_bytes());
+            iter.seek(&lower.to_be_bytes())?;
             SolutionIteratorState::NoOverflow
         };
-        Self {
+        Ok(Self {
             iter,
             state,
             lower,
             upper,
-        }
+        })
     }

     fn next_entry(&mut self) -> Option<(Tag, PieceOffset)> {
-        self.iter
-            .key()
-            .map(|tag| tag.try_into().unwrap())
-            .map(|tag| {
-                let offset = u64::from_le_bytes(self.iter.value().unwrap().try_into().unwrap());
-                self.iter.next();
-                (tag, offset)
-            })
+        self.iter.next().ok().flatten().map(|(tag, offset)| {
+            (
+                tag.try_into().unwrap(),
+                u64::from_le_bytes(offset.try_into().unwrap()),
+            )
+        })
     }
 }

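The hunks above swap rocksdb's raw iterator for parity-db's `BTreeIterator`, whose `seek`, `seek_to_first`, and `next` calls are all fallible, which is why `SolutionIterator::new` now returns `parity_db::Result<Self>`. A small sketch of the underlying scan pattern over column 0, assuming the API at the pinned revision; `scan_from` is an illustrative helper, not part of the codebase:

```rust
use parity_db::Db;

/// Collect up to `limit` (tag, piece offset) pairs whose 8-byte big-endian tag
/// is >= `start`, walking the ordered BTree column. Every iterator step can
/// fail, so errors surface as `parity_db::Error` instead of being swallowed.
fn scan_from(db: &Db, start: [u8; 8], limit: usize) -> parity_db::Result<Vec<([u8; 8], u64)>> {
    let mut iter = db.iter(0)?;
    iter.seek(&start)?;

    let mut entries = Vec::with_capacity(limit);
    while entries.len() < limit {
        match iter.next()? {
            Some((key, value)) => {
                // Keys are 8-byte tags, values are little-endian u64 offsets,
                // matching the layout written by the commit hunks above.
                let tag: [u8; 8] = key.as_slice().try_into().expect("8-byte tag");
                let offset =
                    u64::from_le_bytes(value.as_slice().try_into().expect("8-byte offset"));
                entries.push((tag, offset));
            }
            None => break,
        }
    }
    Ok(entries)
}
```
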
@@ -433,7 +459,7 @@ impl<'a> Iterator for SolutionIterator<'a> {
                 .filter(|(tag, _)| u64::from_be_bytes(*tag) <= self.upper)
                 .or_else(|| {
                     self.state = SolutionIteratorState::OverflowEnd;
-                    self.iter.seek(self.lower.to_be_bytes());
+                    self.iter.seek(&self.lower.to_be_bytes()).ok()?;
                     self.next()
                 }),
             SolutionIteratorState::OverflowEnd => self.next_entry(),

crates/subspace-farmer/src/commitments/databases.rs

Lines changed: 24 additions & 5 deletions
@@ -1,8 +1,8 @@
 use crate::commitments::metadata::{CommitmentMetadata, CommitmentStatus};
 use crate::commitments::CommitmentError;
 use lru::LruCache;
+use parity_db::{ColumnOptions, CompressionType, Db, Options};
 use parking_lot::Mutex;
-use rocksdb::{Options, DB};
 use std::fmt;
 use std::ops::Deref;
 use std::path::PathBuf;
@@ -20,7 +20,7 @@ pub(super) struct CreateDbEntryResult {

 pub(super) struct DbEntry {
     salt: Salt,
-    db: Mutex<Option<Arc<DB>>>,
+    db: Mutex<Option<Arc<Db>>>,
 }

 impl fmt::Debug for DbEntry {
@@ -30,7 +30,7 @@ impl fmt::Debug for DbEntry {
 }

 impl Deref for DbEntry {
-    type Target = Mutex<Option<Arc<DB>>>;
+    type Target = Mutex<Option<Arc<Db>>>;

     fn deref(&self) -> &Self::Target {
         &self.db
@@ -52,6 +52,11 @@ pub(super) struct CommitmentDatabases {

 impl CommitmentDatabases {
     pub(super) fn new(base_directory: PathBuf) -> Result<Self, CommitmentError> {
+        if rocksdb::DB::open_default(base_directory.join("metadata")).is_ok() {
+            std::fs::remove_dir_all(&base_directory).map_err(CommitmentError::Migrate)?;
+            std::fs::create_dir(&base_directory).map_err(CommitmentError::Migrate)?;
+        }
+
         let mut metadata = CommitmentMetadata::new(base_directory.join("metadata"))?;
         let mut databases = LruCache::new(COMMITMENTS_CACHE_SIZE);

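The hunk above handles migration from the old rocksdb-backed commitments without converting any data in place: if the metadata directory still opens as a rocksdb database, the whole commitments directory is wiped and recreated, and commitments are rebuilt from scratch under parity-db. A condensed sketch of that check, with `base_directory` as a placeholder and `wipe_old_rocksdb_commitments` as an illustrative helper:

```rust
use std::path::Path;

/// If `base_directory` still holds the old rocksdb-backed layout (detected by
/// successfully opening its "metadata" subdirectory as a rocksdb database),
/// wipe the directory so commitments are rebuilt under parity-db.
fn wipe_old_rocksdb_commitments(base_directory: &Path) -> std::io::Result<()> {
    if rocksdb::DB::open_default(base_directory.join("metadata")).is_ok() {
        std::fs::remove_dir_all(base_directory)?;
        std::fs::create_dir(base_directory)?;
    }
    Ok(())
}
```
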
@@ -74,8 +79,22 @@ impl CommitmentDatabases {

         // Open databases that were fully created during previous run
         for salt in metadata.keys() {
-            let db = DB::open(&Options::default(), base_directory.join(hex::encode(salt)))
-                .map_err(CommitmentError::CommitmentDb)?;
+            let options = Options {
+                path: base_directory.join(hex::encode(salt)),
+                columns: vec![ColumnOptions {
+                    preimage: false,
+                    btree_index: true,
+                    uniform: false,
+                    ref_counted: false,
+                    compression: CompressionType::NoCompression,
+                    compression_threshold: 4096,
+                }],
+                sync_wal: true,
+                sync_data: true,
+                stats: false,
+                salt: None,
+            };
+            let db = Db::open_or_create(&options).map_err(CommitmentError::CommitmentDb)?;
             databases.put(
                 *salt,
                 Arc::new(DbEntry {
