Complete rewrite of `multithreading::ThreadPool`, removing:

- The limitation of atomics
- Multiple mutexes
Also added message passing.
This commit is contained in:
Sven Vogel 2023-06-04 22:31:00 +02:00
parent 45fe1afcd9
commit 125a3964a7
2 changed files with 70 additions and 392 deletions

View File

@ -38,7 +38,7 @@ fn dot(a: &[f64], b: &[f64]) -> f64 {
/// finished the partial dot products will be summed to create the final result. /// finished the partial dot products will be summed to create the final result.
fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) { fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
let mut pool = let mut pool =
ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(threads).unwrap(), true); ThreadPool::with_limit(threads);
// number of elements in each vector for each thread // number of elements in each vector for each thread
let steps = a.len() / threads; let steps = a.len() / threads;
@ -57,14 +57,9 @@ fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
}); });
} }
black_box( pool.join_all();
// wait for the threads to finish
pool.join_all() black_box(pool.get_results().iter().sum::<f64>());
// iterate over the results and sum the parital dot products together
.into_iter()
.map(|r| r.unwrap())
.reduce(|a, b| a + b),
);
} }
/// Compute a simple hash value for the given index value. /// Compute a simple hash value for the given index value.
@ -114,8 +109,6 @@ pub fn bench_threadpool(c: &mut Criterion) {
fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) { fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
// automatically choose the number of threads // automatically choose the number of threads
let mut pool = ThreadPool::new(); let mut pool = ThreadPool::new();
// drop the handles used by each thread after its done
pool.drop_finished_handles();
// number of elements in each vector for each thread // number of elements in each vector for each thread
let steps = a.len() / threads; let steps = a.len() / threads;
@ -134,14 +127,9 @@ fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
}); });
} }
black_box( pool.join_all();
// wait for the threads to finish
pool.join_all() black_box(pool.get_results().iter().sum::<f64>());
// iterate over the results and sum the parital dot products together
.into_iter()
.map(|r| r.unwrap())
.reduce(|a, b| a + b),
);
} }
/// Benchmark the effects of over and underusing a thread pools thread capacity. /// Benchmark the effects of over and underusing a thread pools thread capacity.

View File

@ -1,399 +1,89 @@
//! This module provides the functionality to create a thread pool of fixed capacity. use std::{thread::{JoinHandle, self}, sync::{mpsc::{Receiver, channel, Sender}, Mutex, Arc}, num::NonZeroUsize, collections::VecDeque};
//! This means that the pool can be used to dispatch functions or closures that will be executed
//! some time in the future each on its own thread. When dispatching jobs, the pool will test whether
//! threads are available. If so the pool will directly launch a new thread to run the supplied function.
//! In case no threads are available the job will be stalled for execution until a thread is free to run the first
//! stalled job.
//!
//! The pool will also keep track of all the handles that [`std::thread::spawn`] returns. Hence after executing a job
//! the pool still queries the result of the function which can be retrieved any time after the submission.
//! After retrieving the result of the function the handle is discarded and cannot be accessed again through the thread pool.
//!
//! # Threads
//! The maximum number of threads to be used can be specified when creating a new thread pool.
//! Alternatively the thread pool can be advised to automatically determine the recommended number of threads to use.
//! Note that this has its limitations due to possible side effects of sandboxing, containerization or vms.
//! For further information see: [`thread::available_parallelism`]
//!
//! # Memory consumption over time
//! The pool will store the handle for every thread launched constantly increasing the memory consumption.
//! It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to
//! `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption.
//! Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discards all thread
//! handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
//!
//! # Portability
//! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
//! This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
//! Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
//! (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
//! Additionally this implementation relies on using the `load` and `store` operations
//! instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls
//! to `unwrap` or `expect` from [`std::sync::MutexGuard`].
use std::{ const DEFAULT_THREAD_POOL_SIZE: usize = 1;
any::Any,
collections::VecDeque,
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
},
thread::{self, JoinHandle},
};
/// Maximum number of thread to be used by the thread pool in case all methods pub trait Job<T>: Send + 'static + FnOnce() -> T
/// of determining a recommend number failed where T: Send
#[allow(unused)]
pub const FALLBACK_THREADS: usize = 1;
/// Returns the number of threads to be used by the thread pool by default.
/// This function tries to fetch a recommended number by calling [`thread::available_parallelism`].
/// In case this fails [`FALLBACK_THREADS`] will be returned
fn get_default_thread_count() -> usize {
// number of threads to fallback to
let fallback_threads =
NonZeroUsize::new(FALLBACK_THREADS).expect("fallback_threads must be nonzero");
// determine the maximum recommend number of threads to use
// most of the time this is gonna be the number of cpus
thread::available_parallelism()
.unwrap_or(fallback_threads)
.get()
}
/// This struct manages a pool of threads with a fixed maximum number.
/// Any time a closure is passed to `enqueue` the pool checks whether it can
/// directly launch a new thread to execute the closure. If the maximum number
/// of threads is reached the closure is staged and will get executed by next
/// thread to be available.
/// The pool will also keep track of every `JoinHandle` created by running every closure on
/// its own thread. The results of the closures can be obtained by either calling `join_all` or `get_finished`.
/// # Example
/// ```rust
/// use imsearch::multithreading::ThreadPool;
/// let mut pool = ThreadPool::new();
///
/// // launch some work in parallel
/// for i in 0..10 {
/// pool.enqueue(move || {
/// println!("I am multithreaded and have id: {i}");
/// });
/// }
/// // wait for threads to finish
/// pool.join_all();
/// ```
/// # Portability
/// This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
/// This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
/// Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
/// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
/// Additionally this implementation relies on using the `load` and `store` operations
/// instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls
/// to `unwrap` or `expect` from [`std::sync::MutexGuard`].
///
/// # Memory consumption over time
/// The pool will store the handle for every thread launched constantly increasing the memory consumption.
/// It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to
/// `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption.
/// Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discards all thread
/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
#[allow(dead_code)]
#[derive(Debug)]
pub struct ThreadPool<F, T>
where
F: Send + FnOnce() -> T,
{ {
/// maximum number of threads to launch at once
max_thread_count: usize,
/// handles for launched threads
handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
/// function to be executed when threads are ready
queue: Arc<Mutex<VecDeque<F>>>,
/// number of currently running threads
/// new implementation relies on atomic primitives to avoid locking and possible
/// guard errors. Note that atomic primitives are not available on all platforms "can generally be relied upon existing"
/// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
/// Also this implementation relies on using the `load` and `store` operations
/// instead of using more comfortable one like `fetch_add`
threads: Arc<AtomicUsize>,
/// wether to keep the thread handles after the function returned
drop_handles: Arc<AtomicBool>,
} }
impl<F, T> Default for ThreadPool<F, T> impl<U, T> Job<T> for U
where where U: Send + 'static + FnOnce() -> T, T: Send + 'static
F: Send + FnOnce() -> T, {
}
#[derive(Debug)]
pub struct ThreadPool<T, F>
where T: Send, F: Job<T>
{
queue: Arc<Mutex<VecDeque<F>>>,
handles: Vec<JoinHandle<()>>,
receiver: Receiver<T>,
sender: Sender<T>,
limit: NonZeroUsize,
}
impl<T, F> Default for ThreadPool<T, F>
where T: Send + 'static, F: Job<T>
{ {
fn default() -> Self { fn default() -> Self {
let (sender, receiver) = channel::<T>();
let default = NonZeroUsize::new(DEFAULT_THREAD_POOL_SIZE).expect("Thread limit must be non-zero");
let limit = thread::available_parallelism().unwrap_or(default);
Self { Self {
max_thread_count: get_default_thread_count(), queue: Arc::new(Mutex::new(VecDeque::new())),
handles: Default::default(), handles: Vec::new(),
queue: Default::default(), receiver,
// will be initialized to 0 sender,
threads: Arc::new(AtomicUsize::new(0)), limit,
// do not drop handles by default
drop_handles: Arc::new(AtomicBool::new(false)),
} }
} }
} }
#[allow(dead_code)] impl<T, F> ThreadPool<T, F>
impl<F, T> ThreadPool<F, T> where T: Send + 'static, F: Job<T>
where
F: Send + FnOnce() -> T + 'static,
T: Send + 'static,
{ {
/// Create a new empty thread pool with the maximum number of threads set be the recommended amount of threads
/// supplied by [`std::thread::available_parallelism`] or in case the function fails [`FALLBACK_THREADS`].
/// # Limitations
/// This function may assume the wrong number of threads due to the nature of [`std::thread::available_parallelism`].
/// That can happen if the program runs inside of a container or vm with poorly configured parallelism.
pub fn new() -> Self { pub fn new() -> Self {
Default::default()
}
pub fn with_limit(max_threads: usize) -> Self {
Self { Self {
max_thread_count: get_default_thread_count(), limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"),
..Default::default() ..Default::default()
} }
} }
/// Create a new empty thread pool with the maximum number of threads set be the specified number pub fn enqueue(&mut self, func: F) {
/// # Overusage if self.handles.len() < self.limit.get() {
/// supplying a number of threads to great may negatively impact performance as the system may not // we can still launch threads to run in parallel
/// be able to full fill the required needs
pub fn with_threads(max_thread_count: NonZeroUsize) -> Self {
Self {
max_thread_count: max_thread_count.get(),
..Default::default()
}
}
/// Create a new empty thread pool with the maximum number of threads set be the specified number // clone the sender
/// and also sets the flag to drop the handles of finished threads instead of storing them until let tx = self.sender.clone();
/// eihter `join_all` or `get_finished` is called. let queue = self.queue.clone();
/// # Overusage
/// supplying a number of threads to great may negatively impact performance as the system may not self.handles.push(thread::spawn(move || {
/// be able to full fill the required needs while let Some(job) = queue.lock().unwrap().pop_front() {
/// # Memory usage tx.send(job()).expect("cannot send result");
/// if `drop_handles` is set to `false` the pool will continue to store the handles of
/// launched threads. This causes memory consumption to rise over time as more and more
/// threads are launched.
pub fn with_threads_and_drop_handles(
max_thread_count: NonZeroUsize,
drop_handles: bool,
) -> Self {
Self {
max_thread_count: max_thread_count.get(),
drop_handles: Arc::new(AtomicBool::new(drop_handles)),
..Default::default()
} }
}
/// Pass a new closure to be executed as soon as a thread is available.
/// This function will execute the supplied closure immediately when the number of running threads
/// is lower than the maximum number of threads. Otherwise the closure will be executed at some undetermined time
/// in the future unless program doesn't die before.
/// If `join_all` is called and the closure hasn't been executed yet, `join_all` will wait for all stalled
/// closures be executed.
pub fn enqueue(&mut self, closure: F) {
// read used thread counter and apply all store operations with Ordering::Release
let used_threads = self.threads.load(Ordering::Acquire);
// test if we can launch a new thread
if used_threads < self.max_thread_count {
// we can create a new thread, increment the thread count
self.threads
.store(used_threads.saturating_add(1), Ordering::Release);
// run new thread
execute(
self.queue.clone(),
self.handles.clone(),
self.threads.clone(),
self.drop_handles.clone(),
closure,
);
} else {
// all threads being used
// enqueue closure to be launched when a thread is ready
self.queue.lock().unwrap().push_back(closure);
}
}
/// Removes all closures stalled for execution.
/// All closures still waiting to be executed will be dropped by the pool and
/// won't get executed. Useful if an old set of closures hasn't run yet but are outdated
/// and resources are required immediately for updated closures.
pub fn discard_stalled(&mut self) {
self.queue.lock().unwrap().clear();
}
/// Waits for all currently running threads and all stalled closures to be executed.
/// If any closure hasn't been executed yet, `join_all` will wait until the queue holding all
/// unexecuted closures is empty. It returns the result every `join` of all threads yields as a vector.
/// If the vector is of length zero, no threads were joined and the thread pool didn't do anything.
/// All handles of threads will be removed after this call.
pub fn join_all(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
let mut results = Vec::new();
loop {
// lock the handles, pop the last one off and unlock handles again
// to allow running threads to process
let handle = self.handles.lock().unwrap().pop();
// if we still have a handle join it else no handles are left we abort the loop
if let Some(handle) = handle {
results.push(handle.join());
continue;
}
break;
}
results
}
/// Returns the results of every thread that has already finished until now.
/// All other threads currently running won't be waited for nor for any closure stalled for execution in the future.
/// /// If the vector is of length zero, no threads were joined and the thread pool either doesn't do anything or is busy.
/// All handles of finished threads will be removed after this call.
pub fn get_finished(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
let mut results = Vec::new();
let mut handles = self.handles.lock().unwrap();
// loop through the handles and remove all finished handles
// join on the finished handles which will be quick as they are finished!
let mut idx = 0;
while idx < handles.len() {
if handles[idx].is_finished() {
// thread is finished, yield result
results.push(handles.remove(idx).join());
} else {
// thread isn't done, continue to the next one
idx += 1;
}
}
results
}
/// set the flag to indicate that thread handles will be dropped after the thread is finished
/// executing. All threads that have finished until now but haven't been removed will get dropped
/// after the next thread finishes.
pub fn drop_finished_handles(&self) {
self.drop_handles.store(false, Ordering::Release);
}
/// set the flag to indicate that thread handles will be kept after the thread is finished
/// executing until either `join_all` or `get_finished` is called.
/// Only new thread handles created after this call be kept.
pub fn keep_future_handles(&self) {
self.drop_handles.store(true, Ordering::Release);
}
}
/// Removes all thread handles which have finished only if the can be locked at
/// the current time. This function will not block execution when the lock cannot be acquired.
fn try_prune<T>(handles: Arc<Mutex<Vec<JoinHandle<T>>>>) {
if let Ok(mut handles) = handles.try_lock() {
// keep unfinished elements
handles.retain(|handle| !handle.is_finished());
}
}
/// Execute the supplied closure on a new thread
/// and store the threads handle into `handles`. When the thread
/// finished executing the closure it will look for any closures left in `queue` and
/// recursively execute it on a new thread. This method updates threads` in order to
/// keep track of the number of active threads.
fn execute<F, T>(
queue: Arc<Mutex<VecDeque<F>>>,
handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
threads: Arc<AtomicUsize>,
drop: Arc<AtomicBool>,
closure: F,
) where
T: Send + 'static,
F: Send + FnOnce() -> T + 'static,
{
let handles_copy = handles.clone();
handles.lock().unwrap().push(thread::spawn(move || {
// run closure (actual work)
let result = closure();
// take the next closure stalled for execution
let next = queue.lock().unwrap().pop_front();
if let Some(next_closure) = next {
// if we have sth. to execute, spawn a new thread
execute(
queue,
handles_copy.clone(),
threads,
drop.clone(),
next_closure,
);
} else {
// nothing to execute this thread will run out without any work to do
// decrement the amount of used threads
threads.store(
threads.load(Ordering::Acquire).saturating_sub(1),
Ordering::Release,
)
}
// try to drop all fnished thread handles if necessary
// this is a non blocking operation
if drop.load(Ordering::Acquire) {
try_prune(handles_copy);
}
result
})); }));
}
#[cfg(test)] } else {
mod tests { self.queue.lock().unwrap().push_back(func);
use std::time::Duration;
use super::*;
#[test]
fn test_thread_pool() {
// auto determine the amount of threads to use
let mut pool = ThreadPool::new();
// launch 4 jobs to run on our pool
for i in 0..4 {
pool.enqueue(move || (0..=i).sum::<usize>());
} }
// wait for the threads to finish and sum their results self.handles.retain(|h| !h.is_finished());
let sum = pool
.join_all()
.into_iter()
.map(|r| r.unwrap())
.sum::<usize>();
assert_eq!(sum, 10);
} }
#[test] pub fn join_all(&mut self) {
fn test_drop_stalled() { while let Some(handle) = self.handles.pop() {
// auto determine the amount of threads to use handle.join().unwrap();
let mut pool = ThreadPool::with_threads(NonZeroUsize::new(1).unwrap()); }
// launch 2 jobs: 1 will immediately return, the other one will sleep for 20 seconds
for i in 0..1 {
pool.enqueue(move || {
thread::sleep(Duration::from_secs(i * 20));
i
});
} }
// wait 10 secs pub fn get_results(&mut self) -> Vec<T> {
thread::sleep(Duration::from_secs(2)); self.receiver.try_iter().collect()
// discard job that should still run
pool.discard_stalled();
// wait for the threads to finish and sum their results
let sum = pool.join_all().into_iter().map(|r| r.unwrap()).sum::<u64>();
assert_eq!(sum, 0);
} }
} }