Skip to content

Commit aea2cd9

Browse files
authored
Merge pull request #977 from subspace/fix-block-production
Fix block production
2 parents 16b8a79 + d71b5b6 commit aea2cd9

File tree

7 files changed

+98
-45
lines changed

7 files changed

+98
-45
lines changed

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/subspace-core-primitives/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ parity-scale-codec = { version = "3.2.1", default-features = false, features = [
3131
rand = { version = "0.8.5", features = ["min_const_gen"], optional = true }
3232
rand_chacha = { version = "0.3.1", default-features = false }
3333
rand_core = { version = "0.6.4", default-features = false, features = ["alloc"] }
34+
rayon = { version = "1.6.0", optional = true }
3435
scale-info = { version = "2.3.0", default-features = false, features = ["derive"] }
3536
serde = { version = "1.0.147", optional = true, features = ["derive"] }
3637
serde_arrays = "0.1.0"
@@ -41,7 +42,13 @@ uint = { version = "0.9.4", default-features = false }
4142
criterion = "0.4.0"
4243

4344
[features]
44-
default = ["std"]
45+
default = [
46+
"parallel-decoding",
47+
"std",
48+
]
49+
# Parallel decoding will use all CPUs available, but will allocate a memory of a size of a sector instead of square root
50+
# of that
51+
parallel-decoding = ["rayon"]
4552
std = [
4653
"ark-bls12-381/std",
4754
"ark-ff/std",

crates/subspace-core-primitives/src/sector_codec.rs

Lines changed: 56 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ mod tests;
66
use crate::Scalar;
77
use alloc::vec::Vec;
88
use ark_bls12_381::Fr;
9-
use ark_poly::univariate::DensePolynomial;
10-
use ark_poly::{EvaluationDomain, GeneralEvaluationDomain, Polynomial, UVPolynomial};
11-
use core::mem;
9+
use ark_poly::{EvaluationDomain, GeneralEvaluationDomain};
1210
use num_integer::Roots;
11+
#[cfg(feature = "parallel-decoding")]
12+
use rayon::prelude::*;
1313

1414
#[derive(Debug, Copy, Clone)]
1515
#[cfg_attr(feature = "thiserror", derive(thiserror::Error))]
@@ -127,34 +127,66 @@ impl SectorCodec {
127127
return Err(SectorCodecError::FailedToInstantiateDomain);
128128
};
129129

130-
let mut row = Vec::with_capacity(self.sector_side_size_in_scalars);
130+
#[cfg(not(feature = "parallel-decoding"))]
131+
{
132+
let mut row = Vec::with_capacity(self.sector_side_size_in_scalars);
133+
134+
for row_index in 0..self.sector_side_size_in_scalars {
135+
row.extend(
136+
sector
137+
.iter()
138+
.skip(row_index)
139+
.step_by(self.sector_side_size_in_scalars)
140+
.map(|scalar| scalar.0),
141+
);
142+
143+
domain.coset_ifft_in_place(&mut row);
144+
domain.fft_in_place(&mut row);
131145

132-
for row_index in 0..self.sector_side_size_in_scalars {
133-
row.extend(
134146
sector
135-
.iter()
147+
.iter_mut()
136148
.skip(row_index)
137149
.step_by(self.sector_side_size_in_scalars)
138-
.map(|scalar| scalar.0),
139-
);
140-
141-
domain.coset_ifft_in_place(&mut row);
142-
143-
let polynomial = DensePolynomial::from_coefficients_vec(mem::take(&mut row));
150+
.zip(row.iter())
151+
.for_each(|(output, input)| *output = Scalar(*input));
144152

145-
sector
146-
.iter_mut()
147-
.skip(row_index)
148-
.step_by(self.sector_side_size_in_scalars)
149-
.zip(domain.elements())
150-
.for_each(|(output, column)| {
151-
*output = Scalar(polynomial.evaluate(&column));
152-
});
153+
// Clear for next iteration of the loop
154+
row.clear();
155+
}
156+
}
157+
#[cfg(feature = "parallel-decoding")]
158+
{
159+
// Transform sector grid from columns to rows
160+
let mut rows = vec![
161+
vec![Fr::default(); self.sector_side_size_in_scalars];
162+
self.sector_side_size_in_scalars
163+
];
164+
for (row_index, row) in rows.iter_mut().enumerate() {
165+
sector
166+
.iter()
167+
.skip(row_index)
168+
.step_by(self.sector_side_size_in_scalars)
169+
.zip(row)
170+
.for_each(|(input, output)| *output = input.0);
171+
}
172+
173+
// Decode rows in parallel
174+
rows.par_iter_mut().for_each(|row| {
175+
domain.coset_ifft_in_place(row);
176+
domain.fft_in_place(row);
177+
});
153178

154-
// Reuse allocation and clear for next iteration of the loop
155-
row = polynomial.coeffs;
156-
row.clear();
179+
// Store result back into inout sector
180+
for (row_index, row) in rows.into_iter().enumerate() {
181+
sector
182+
.iter_mut()
183+
.skip(row_index)
184+
.step_by(self.sector_side_size_in_scalars)
185+
.zip(row)
186+
.for_each(|(output, input)| *output = Scalar(input));
187+
}
157188
}
189+
158190
Ok(())
159191
}
160192
}

crates/subspace-farmer-components/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ tracing = "0.1.37"
3434
criterion = "0.4.0"
3535
futures = "0.3.25"
3636
memmap2 = "0.5.8"
37-
rayon = "1.5.3"
37+
rayon = "1.6.0"
3838
subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" }
3939

4040
[[bench]]

crates/subspace-farmer/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,4 @@ zeroize = "1.5.7"
5858
jemallocator = "0.5.0"
5959

6060
[dev-dependencies]
61-
rayon = "1.5.3"
61+
rayon = "1.6.0"

crates/subspace-farmer/src/single_disk_plot.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
4646
use thiserror::Error;
4747
use tokio::runtime::Handle;
4848
use tokio::sync::broadcast;
49-
use tracing::{debug, error, info, info_span, trace, Instrument, Span};
49+
use tracing::{debug, error, info, info_span, trace, warn, Instrument, Span};
5050
use ulid::Ulid;
5151

5252
// Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to
@@ -715,23 +715,29 @@ impl SingleDiskPlot {
715715
Err(error) => Err(PlottingError::LowLevel(error))?,
716716
};
717717

718-
let mut metadata_header = metadata_header.lock();
719-
metadata_header.sector_count += 1;
720-
metadata_header_mmap
721-
.copy_from_slice(metadata_header.encode().as_slice());
718+
{
719+
let mut metadata_header = metadata_header.lock();
720+
metadata_header.sector_count += 1;
721+
metadata_header_mmap
722+
.copy_from_slice(metadata_header.encode().as_slice());
723+
}
722724

723725
handlers.sector_plotted.call_simple(&plotted_sector);
724726

725727
// TODO: Migrate this over to using `on_sector_plotted` instead
726728
// Publish pieces-by-sector if we use DSN
727-
let publishing_result = handle.block_on(
728-
piece_publisher.publish_pieces(plotted_sector.piece_indexes),
729-
);
730-
731-
// cancelled
732-
if publishing_result.is_err() {
733-
return;
734-
}
729+
tokio::spawn({
730+
let piece_publisher = piece_publisher.clone();
731+
732+
async move {
733+
if let Err(error) = piece_publisher
734+
.publish_pieces(plotted_sector.piece_indexes)
735+
.await
736+
{
737+
warn!(%sector_index, %error, "Failed to publish pieces to DSN");
738+
}
739+
}
740+
});
735741
}
736742
};
737743

@@ -915,6 +921,13 @@ impl SingleDiskPlot {
915921
if solutions.len() >= SOLUTIONS_LIMIT {
916922
break;
917923
}
924+
// TODO: It is known that decoding is slow now and we'll only be
925+
// able to decode a single sector within time slot reliably, in the
926+
// future we may want allow more than one sector to be valid within
927+
// the same disk plot.
928+
if !solutions.is_empty() {
929+
break;
930+
}
918931
}
919932

920933
let response = SolutionResponse {

crates/subspace-farmer/src/single_disk_plot/piece_publisher.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use tracing::{debug, error, trace};
1414
const PUBLISH_PIECE_BY_SECTOR_WAITING_DURATION_IN_SECS: u64 = 1;
1515

1616
// Piece-by-sector DSN publishing helper.
17+
#[derive(Clone)]
1718
pub(crate) struct PieceSectorPublisher {
1819
dsn_node: Node,
1920
cancelled: Arc<AtomicBool>,

0 commit comments

Comments
 (0)