This library allows you to distribute computation over indexed sources
(slices, ranges, Vec, etc.) among
multiple threads. It aims to uphold the highest standards of documentation,
testing, and safety; see the FAQ below.
It is designed to be as lightweight as possible, following the principles outlined in the blog post Optimization adventures: making a parallel Rust workload 10x faster with (or without) Rayon. Benchmarks on a real-world use case can be seen here.
use paralight::prelude::*;
// Create a thread pool with the given parameters.
let mut thread_pool = ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
range_strategy: RangeStrategy::WorkStealing,
cpu_pinning: CpuPinningPolicy::No,
}
.build();
// Compute the sum of a slice.
let input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let sum = input
.par_iter()
.with_thread_pool(&mut thread_pool)
.sum::<i32>();
assert_eq!(sum, 5 * 11);
// Add slices together.
let mut output = [0; 10];
let left = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let right = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
(output.par_iter_mut(), left.par_iter(), right.par_iter())
.zip_eq()
.with_thread_pool(&mut thread_pool)
.for_each(|(out, &a, &b)| *out = a + b);
assert_eq!(output, [12, 14, 16, 18, 20, 22, 24, 26, 28, 30]);

Paralight supports various indexed sources out-of-the-box (slices,
ranges, etc.), and can be extended to other types via the
ParallelSource trait, together with the conversion
traits (IntoParallelSource,
IntoParallelRefSource and
IntoParallelRefMutSource).
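As a rough sketch (assuming the usual mapping of these conversion traits to the par_iter(), par_iter_mut() and into_par_iter() methods used throughout this document's examples), here is how each conversion is typically invoked:

```rust
use paralight::prelude::*;

// A thread pool configured as in the earlier examples.
let mut thread_pool = ThreadPoolBuilder {
    num_threads: ThreadCount::AvailableParallelism,
    range_strategy: RangeStrategy::WorkStealing,
    cpu_pinning: CpuPinningPolicy::No,
}
.build();

let mut values = vec![1, 2, 3, 4];

// By reference (IntoParallelRefSource): items are &i32.
let sum = values
    .par_iter()
    .with_thread_pool(&mut thread_pool)
    .sum::<i32>();
assert_eq!(sum, 10);

// By mutable reference (IntoParallelRefMutSource): items are &mut i32.
values
    .par_iter_mut()
    .with_thread_pool(&mut thread_pool)
    .for_each(|x| *x *= 2);
assert_eq!(values, [2, 4, 6, 8]);

// By value (IntoParallelSource): the Vec is consumed and items are i32.
let sum = values
    .into_par_iter()
    .with_thread_pool(&mut thread_pool)
    .sum::<i32>();
assert_eq!(sum, 20);
```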
The ThreadPoolBuilder provides an explicit way
to configure your thread pool, giving you fine-grained control over performance
for your workload. There is no default, which is deliberate because the suitable
parameters depend on your workload.
Paralight allows you to specify the number of worker threads to spawn in a
thread pool with the ThreadCount enum:
- `AvailableParallelism` uses the number of threads returned by the standard library's `available_parallelism()` function,
- `Count(_)` uses the specified number of threads, which must be non-zero.
For convenience, ThreadCount implements the
TryFrom<usize> trait to create a
Count(_) instance, validating that the given
number of threads is not zero.
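For example, a minimal sketch of this conversion:

```rust
use paralight::prelude::*;

// Sketch: the conversion accepts any non-zero count and rejects zero.
assert!(ThreadCount::try_from(4_usize).is_ok());
assert!(ThreadCount::try_from(0_usize).is_err());
```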
Recommendation: It depends. While
AvailableParallelism may be a
good default, it usually returns twice the number of CPU cores (at least on
Intel) to account for
hyper-threading. Whether this
is optimal or not depends on your workload, for example whether it's compute
bound or memory bound, whether a single thread can saturate the resources of one
core or not, etc. Generally, the long list of caveats mentioned in the
documentation of available_parallelism()
applies.
On some workloads, hyper-threading doesn't provide a performance boost over
using only one thread per core, because two hyper-threads would compete for
resources on the core they share (e.g. memory caches). In this case, using half
of what available_parallelism() returns
can reduce contention and perform better.
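As a hedged sketch, here is one way to size the pool to roughly one thread per physical core; the halving heuristic assumes two hyper-threads per core, which isn't universal, so benchmark on your own hardware:

```rust
use paralight::prelude::*;
use std::thread::available_parallelism;

// Sketch: assume two hyper-threads per physical core and use half of what
// available_parallelism() reports, clamped to at least one thread.
let threads = available_parallelism().map_or(1, |n| (n.get() / 2).max(1));
let mut thread_pool = ThreadPoolBuilder {
    num_threads: ThreadCount::try_from(threads).expect("thread count is non-zero"),
    range_strategy: RangeStrategy::WorkStealing,
    cpu_pinning: CpuPinningPolicy::No,
}
.build();

let sum = (0..1_000_usize)
    .into_par_iter()
    .with_thread_pool(&mut thread_pool)
    .sum::<usize>();
assert_eq!(sum, 499_500);
```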
If your program is not running alone on your machine but is competing with other programs, using too many threads can also be detrimental to the overall performance of your system.
Paralight offers two strategies in the RangeStrategy
enum to distribute computation among threads:
- `Fixed` splits the input evenly and hands out a fixed sequential range of items to each worker thread,
- `WorkStealing` starts with the fixed distribution, but lets each worker thread steal items from others once it is done processing its items.
Recommendation: If your pipeline is performing roughly the same amount of
work for each item, you should probably use the
Fixed strategy, to avoid paying the
synchronization cost of work-stealing. This is especially true if the amount of
work per item is small (e.g. some simple arithmetic operations). If the amount
of work per item is highly variable and/or large, you should probably use the
WorkStealing strategy (e.g. parsing
strings, processing files).
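For example, a minimal sketch of a pool tuned for a uniform, cheap-per-item workload (the sum-of-squares pipeline is purely illustrative):

```rust
use paralight::prelude::*;

// Sketch: Fixed avoids work-stealing synchronization, which suits items of
// roughly equal (and small) cost such as this arithmetic pipeline.
let mut thread_pool = ThreadPoolBuilder {
    num_threads: ThreadCount::AvailableParallelism,
    range_strategy: RangeStrategy::Fixed,
    cpu_pinning: CpuPinningPolicy::No,
}
.build();

let sum_of_squares = (0..1_000_usize)
    .into_par_iter()
    .with_thread_pool(&mut thread_pool)
    .map(|x| x * x)
    .sum::<usize>();
assert_eq!(sum_of_squares, 332_833_500);
```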
Note: In work-stealing mode, each thread processes an arbitrary subset of
items in arbitrary order, meaning that a reduction operation must be both
commutative and
associative to yield a
deterministic result (in contrast to the standard library's
Iterator trait that processes items in sequential
order). Fortunately, a lot of common operations are commutative and associative,
but be mindful of this.
use paralight::prelude::*;
let mut thread_pool = ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
range_strategy: RangeStrategy::WorkStealing,
cpu_pinning: CpuPinningPolicy::No,
}
.build();
let s = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
.par_iter()
.with_thread_pool(&mut thread_pool)
.map(|c: &char| c.to_string())
.reduce(String::new, |mut a: String, b: String| {
a.push_str(&b);
a
});
// ⚠️ There is no guarantee that this check passes. In practice, `s` contains any permutation
// of the input, such as "fgdebachij".
assert_eq!(s, "abcdefghij");
// This makes sure the example panics anyway if the permutation is (by luck) the identity.
panic!("Congratulations, you won the lottery and the assertion passed this time!");Paralight allows pinning each worker thread to one CPU, on platforms that
support it. For now, this is implemented for platforms whose
target_os
is among android, dragonfly, freebsd, linux (platforms that support
libc::sched_setaffinity() via the nix crate)
and windows (using
SetThreadAffinityMask()
via the windows-sys crate).
Paralight offers three policies in the
CpuPinningPolicy enum:
- `No` doesn't pin worker threads to CPUs,
- `IfSupported` attempts to pin each worker thread to a distinct CPU on supported platforms, but proceeds without pinning if running on an unsupported platform or if the pinning function fails,
- `Always` pins each worker thread to a distinct CPU, panicking if the platform isn't supported or if the pinning function returns an error.
Recommendation: Whether CPU pinning is useful or detrimental depends on your
workload. If you're processing the same data over and over again (e.g. calling
par_iter() multiple times on the same
data), CPU pinning can help ensure that each subset of the data is always
processed on the same CPU core and stays fresh in the lower-level per-core
caches, speeding up memory accesses. This however depends on the amount of data:
if it's too large, it may not fit in per-core caches anyway.
If your program is not running alone on your machine but is competing with other
programs, CPU pinning may be detrimental, as a worker thread will be blocked
whenever its required core is used by another program, even if another core is
free and other worker threads are done (especially with the
Fixed strategy). This of course depends on
how the scheduler works on your OS.
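A minimal sketch of opting into best-effort pinning; whether this helps is workload-dependent, as discussed above:

```rust
use paralight::prelude::*;

// Sketch: request CPU pinning where the platform supports it, and proceed
// without pinning elsewhere.
let mut thread_pool = ThreadPoolBuilder {
    num_threads: ThreadCount::AvailableParallelism,
    range_strategy: RangeStrategy::WorkStealing,
    cpu_pinning: CpuPinningPolicy::IfSupported,
}
.build();

let sum = [1, 2, 3]
    .par_iter()
    .with_thread_pool(&mut thread_pool)
    .sum::<i32>();
assert_eq!(sum, 6);
```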
To create parallel pipelines, be mindful that the
with_thread_pool() function takes
a ThreadPool by mutable reference &mut.
This is a deliberate design choice because only one pipeline can be run at a
time on a given Paralight thread pool (for more flexible options, see
"Bringing your own thread pool" below).
To release the resources (i.e. the worker threads) created by a
ThreadPool, simply drop() it.
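For example (a trivial sketch):

```rust
use paralight::prelude::*;

let thread_pool = ThreadPoolBuilder {
    num_threads: ThreadCount::AvailableParallelism,
    range_strategy: RangeStrategy::WorkStealing,
    cpu_pinning: CpuPinningPolicy::No,
}
.build();
// ... run some pipelines ...

// Dropping the pool shuts down its worker threads.
drop(thread_pool);
```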
If you want to create a global thread pool, you will have to wrap it in a
Mutex (or other suitable synchronization primitive) and
manually lock it to obtain a suitable &mut ThreadPool. You can for example
combine a mutex with the LazyLock pattern.
use paralight::prelude::*;
use std::ops::DerefMut;
use std::sync::{LazyLock, Mutex};
// A static thread pool protected by a mutex.
static THREAD_POOL: LazyLock<Mutex<ThreadPool>> = LazyLock::new(|| {
Mutex::new(
ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
range_strategy: RangeStrategy::WorkStealing,
cpu_pinning: CpuPinningPolicy::No,
}
.build(),
)
});
let items = (0..100).collect::<Vec<_>>();
let sum = items
.par_iter()
.with_thread_pool(THREAD_POOL.lock().unwrap().deref_mut())
.sum::<i32>();
assert_eq!(sum, 99 * 50);

However, if you wrap a thread pool in a mutex like this, be mindful of potential
panics or deadlocks if you try to run several nested parallel iterators on the
same thread pool!
This limitation isn't specific to Paralight, though: it happens for any usage of
a Mutex that you try to lock recursively while it is already acquired.
This pitfall is the reason why Paralight doesn't provide an implicit global thread pool.
# use paralight::prelude::*;
# use std::ops::DerefMut;
# use std::sync::{LazyLock, Mutex};
#
# static THREAD_POOL: LazyLock<Mutex<ThreadPool>> = LazyLock::new(|| {
# Mutex::new(
# ThreadPoolBuilder {
# num_threads: ThreadCount::AvailableParallelism,
# range_strategy: RangeStrategy::WorkStealing,
# cpu_pinning: CpuPinningPolicy::No,
# }
# .build(),
# )
# });
let matrix = (0..100)
.map(|i| (0..100).map(|j| i + j).collect::<Vec<_>>())
.collect::<Vec<_>>();
let sum = matrix
.par_iter()
// Lock the mutex on the outer loop (over the rows).
.with_thread_pool(THREAD_POOL.lock().unwrap().deref_mut())
.map(|row| {
row.par_iter()
// ⚠️ Trying to lock the mutex again here will panic or deadlock!
.with_thread_pool(THREAD_POOL.lock().unwrap().deref_mut())
.sum::<i32>()
})
.sum::<i32>();
// ⚠️ This statement is never reached due to the panic/deadlock!
assert_eq!(sum, 990_000);

As an alternative to the provided ThreadPool
implementation, you can use Paralight with any thread pool that implements the
GenericThreadPool interface, via the
with_thread_pool() adaptor.
Note that the GenericThreadPool trait is marked as
unsafe due to the requirements that your implementation must uphold.
If you don't need the default ThreadPool
implementation, you can disable the default-thread-pool feature of Paralight
and still benefit from the full iterator API.
For convenience, the RayonThreadPool wrapper
around Rayon is available under the rayon feature,
and implements GenericThreadPool.
However, note that this backend isn't tested with Miri or ThreadSanitizer, because Rayon itself isn't compatible with them.
This wrapper allows you to control how many tasks are run on Rayon's thread pool. It is recommended to match the number of threads in the pool (as well as the available parallelism), so that there is one Paralight task per thread (and per available CPU core). In that case, performance should be similar to vanilla Paralight. This is of course only a guideline: as usual, benchmark and profile your code to find the optimal configuration.
# // TODO: Enable Miri once supported by Rayon and its dependencies:
# // https://github.com/crossbeam-rs/crossbeam/issues/1181.
# #[cfg(all(feature = "rayon", not(miri)))]
# {
use paralight::prelude::*;
let thread_pool = RayonThreadPool::new_global(
ThreadCount::try_from(rayon_core::current_num_threads())
.expect("Paralight cannot operate with 0 threads"),
RangeStrategy::WorkStealing,
);
let input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let sum = input.par_iter().with_thread_pool(&thread_pool).sum::<i32>();
assert_eq!(sum, 5 * 11);
# }

With the WorkStealing strategy, inputs
with more than u32::MAX elements are currently not supported.
use paralight::prelude::*;
let mut thread_pool = ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
range_strategy: RangeStrategy::WorkStealing,
cpu_pinning: CpuPinningPolicy::No,
}
.build();
// ⚠️ This computation is not supported: with the WorkStealing strategy, the
// input exceeds u32::MAX elements.
let _sum = (0..5_000_000_000_usize)
.into_par_iter()
.with_thread_pool(&mut thread_pool)
.sum::<usize>();

Two optional features are available if you want to debug performance.
- `log`, based on the log crate, prints basic information about inter-thread synchronization: thread creation/shutdown, when each thread starts/finishes a computation, etc.
- `log_parallelism` prints detailed traces about which items are processed by which thread, and work-stealing statistics (e.g. how many times work was stolen among threads).
Note that in any case neither the input items nor the resulting computation are
logged. Only the indices of the items in the input may be present in the logs.
If you're concerned that these indices leak too much information about your
data, you need to make sure that you depend on Paralight with the log and
log_parallelism features disabled.
Some experimental APIs are available under the nightly Cargo feature, for
users who compile with a
nightly
Rust toolchain. As the underlying implementation is based on
experimental features of the Rust
language, these APIs are provided without guarantee and may break at any time
when a new nightly toolchain is released.
All public APIs of Paralight are documented, which is enforced by the
forbid(missing_docs) lint. The aim is to have at least one example per API,
naturally
tested via rustdoc.
Paralight is thoroughly tested, with code coverage as close to 100% as possible.
The testing strategy combines
rustdoc examples,
top-level stress tests and unit tests on the most critical components.
Paralight aims to use as little unsafe code as possible. As a first measure,
the following lints are enabled throughout the code base, to make sure each use
of an unsafe API is explained and each new unsafe function documents its
pre- and post-conditions.
#![forbid(
missing_docs,
unsafe_op_in_unsafe_fn,
clippy::missing_safety_doc,
clippy::multiple_unsafe_ops_per_block,
clippy::undocumented_unsafe_blocks,
)]
# //! Here are some docs
Additionally, extensive testing is conducted with ThreadSanitizer and Miri. Vanilla Paralight is 100% compatible with them, including Miri's detection of memory leaks.
This multi-layered approach to safety is crucial given how complex it is to mix memory safety with multi-threading, and it has indeed caught a bug during development.
Note: The Rayon thread pool integration is unfortunately not tested under Miri, as Rayon in general triggers Miri errors. Likewise, Rayon local thread pools aren't compatible with ThreadSanitizer.
As mentioned, Paralight uses as little unsafe code as possible. Here is the
list of places where unsafe is needed.
- Lifetime-erasure in threads/thread_pool/util.rs. The goal is essentially to share an `Arc<Mutex<&'a T>>` between the main thread and the worker threads, where `T` contains a description of a parallel pipeline. The difficulty is that the lifetime `'a` is only valid for a limited scope (for example, a parallel iterator may capture local variables by reference). Even though synchronization is in place to make sure the worker threads only access this pipeline during `'a`, there is no way to write a safe Rust type for `Arc<Mutex<&'a T>>` where the lifetime `'a` changes over time (the same mutex is reused for successive pipelines sent to Paralight). Glossing over the details, a type akin to `Arc<Mutex<&'static T>>` is used instead (i.e. the lifetime is marked `'static`), with `unsafe` code to rescope the `'static` lifetime to a local `'a` as needed. `Send` and `Sync` implementations are also provided (when sound) on this wrapper type.
- The `SliceParallelSource` API in iter/source/slice.rs uses `slice::get_unchecked()`, as it guides the compiler to better optimize the code. In particular, missed vectorized loops were observed without it.
- The `MutSliceParallelSource` API in iter/source/slice.rs. The goal is to provide parallel iterators that produce mutable references `&mut T` to items of a mutable slice `&mut [T]`. Sharing a `&mut [T]` with all the threads wouldn't work, as it would violate Rust's aliasing rules. Using the `slice::split_at_mut()` API would not work either, as it isn't known in advance where a split will occur nor which worker thread will consume which item (due to work stealing). An approach that decomposes the slice into a pointer-length pair is used instead, making it possible to share the raw pointer with all the worker threads.
- Similarly, the `VecParallelSource` API in iter/source/vec.rs provides parallel iterators that consume a `Vec<T>`. This is achieved by decomposing the vector into a pointer-length-capacity triple, and sharing the base pointer with all the worker threads so that they can consume items of type `T` (via `std::ptr::read()`). Additionally, the original `Vec<T>` allocation is released when the iterator is dropped (to avoid memory leaks), which involves reconstructing it from the pointer-allocation pair (via `Vec::from_raw_parts()`).
- Likewise, the `ArrayParallelSource` API in iter/source/array.rs provides parallel iterators that consume a `[T; N]`. The situation is similar to `Vec<T>`, except that the items aren't allocated on the heap behind a pointer, but directly in the array. This involves a more careful combination of wrapper types. Note that a prior implementation was quickly reverted due to being unsound, highlighting once again the importance of code coverage and the effectiveness of Miri.
- Windows API calls are used to set thread affinity in threads/thread_pool/mod.rs when requested.
- Lastly, the definition of the `SourceDescriptor` trait in iter/source/mod.rs has `unsafe` methods, because it requires the caller to pass each index once and only once. Indeed, the safety of the previously mentioned iterator sources (mutable slice, vector, array) assumes a correct calling pattern. The `SourceDescriptor` trait is public (so that dependents of Paralight can define their own sources of items), so these `unsafe` functions leak into the public API. Internally, this causes `unsafe` blocks each time the trait is implemented, and the associated safety comments are a good opportunity to check correctness.
- Symmetrically, `GenericThreadPool` is an `unsafe` trait, implemented for `&mut ThreadPool` in threads/thread_pool/mod.rs and for `&RayonThreadPool` in threads/rayon.rs. This in turn relies on the correctness of the (safe) work-stealing implementation in core/range.rs, which is still missing a formal proof.
And that's all the unsafe code there is!
This is not an officially supported Google product.
See CONTRIBUTING.md for details.
Note that Paralight is still an early stage project, with many design changes as new features are added. Therefore it is highly recommended to file a GitHub issue for discussion before submitting a pull request, unless you're making a trivial change or bug fix.
This software is distributed under the terms of both the MIT license and the Apache License (Version 2.0).
See LICENSE for details.