From 125a3964a76fda551110df9301c5a415adedaad3 Mon Sep 17 00:00:00 2001
From: teridax
Date: Sun, 4 Jun 2023 22:31:00 +0200
Subject: [PATCH] complete rewrite of `multithreading::ThreadPool`

This rewrite removes:
- the limitation of atomics
- multiple mutexes

and adds message passing instead.
---
 benches/multithreading.rs |  26 +--
 src/multithreading/mod.rs | 436 ++++++--------
 2 files changed, 70 insertions(+), 392 deletions(-)
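
Usage of the new message-passing API, as exercised by the benchmarks below
(a minimal sketch; the crate name `imsearch` is taken from the doc example
removed in this patch):

    use imsearch::multithreading::ThreadPool;

    fn main() {
        // pool with at most 4 worker threads
        let mut pool = ThreadPool::with_limit(4);

        // enqueue jobs: any Send + 'static closure returning a Send value
        for i in 0..10usize {
            pool.enqueue(move || i * i);
        }

        // wait for all workers to finish, then drain the result channel
        pool.join_all();
        let sum: usize = pool.get_results().iter().sum();
        assert_eq!(sum, 285);
    }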
diff --git a/benches/multithreading.rs b/benches/multithreading.rs
index 84a6014..b7fdb2f 100644
--- a/benches/multithreading.rs
+++ b/benches/multithreading.rs
@@ -38,7 +38,7 @@ fn dot(a: &[f64], b: &[f64]) -> f64 {
 /// finished the partial dot products will be summed to create the final result.
 fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
     let mut pool =
-        ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(threads).unwrap(), true);
+        ThreadPool::with_limit(threads);
 
     // number of elements in each vector for each thread
     let steps = a.len() / threads;
@@ -56,15 +56,10 @@ fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
             dot(a, b)
         });
     }
+
+    pool.join_all();
 
-    black_box(
-        // wait for the threads to finish
-        pool.join_all()
-            // iterate over the results and sum the parital dot products together
-            .into_iter()
-            .map(|r| r.unwrap())
-            .reduce(|a, b| a + b),
-    );
+    black_box(pool.get_results().iter().sum::<f64>());
 }
 
 /// Compute a simple hash value for the given index value.
@@ -114,8 +109,6 @@ pub fn bench_threadpool(c: &mut Criterion) {
 fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
     // automatically choose the number of threads
     let mut pool = ThreadPool::new();
-    // drop the handles used by each thread after its done
-    pool.drop_finished_handles();
 
     // number of elements in each vector for each thread
     let steps = a.len() / threads;
@@ -134,14 +127,9 @@ fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
         });
     }
 
-    black_box(
-        // wait for the threads to finish
-        pool.join_all()
-            // iterate over the results and sum the parital dot products together
-            .into_iter()
-            .map(|r| r.unwrap())
-            .reduce(|a, b| a + b),
-    );
+    pool.join_all();
+
+    black_box(pool.get_results().iter().sum::<f64>());
 }
 
 /// Benchmark the effects of over and underusing a thread pools thread capacity.
diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs
index 00ddc74..e7cf0fe 100644
--- a/src/multithreading/mod.rs
+++ b/src/multithreading/mod.rs
@@ -1,399 +1,89 @@
-//! This module provides the functionality to create a thread pool of fixed capacity.
-//! This means that the pool can be used to dispatch functions or closures that will be executed
-//! some time in the future each on its own thread. When dispatching jobs, the pool will test whether
-//! threads are available. If so the pool will directly launch a new thread to run the supplied function.
-//! In case no threads are available the job will be stalled for execution until a thread is free to run the first
-//! stalled job.
-//!
-//! The pool will also keep track of all the handles that [`std::thread::spawn`] returns. Hence after executing a job
-//! the pool still queries the result of the function which can be retrieved any time after the submission.
-//! After retrieving the result of the function the handle is discarded and cannot be accessed again through the thread pool.
-//!
-//! # Threads
-//! The maximum number of threads to be used can be specified when creating a new thread pool.
-//! Alternatively the thread pool can be advised to automatically determine the recommend amount of threads to use.
-//! Note that this has its limitations due to possible side effects of sandboxing, containerization or vms.
-//! For further information see: [`thread::available_parallelism`]
-//!
-//! # Memory consumption over time
-//! The pool will store the handle for every thread launched constantly increasing the memory consumption.
-//! It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to
-//! `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption.
-//! Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread
-//! handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
-//!
-//! # Portability
-//! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
-//! This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
-//! Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
-//! (see: <https://doc.rust-lang.org/std/sync/atomic/>).
-//! Additionally this implementation relies on using the `load` and `store` operations
-//! instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls
-//! to `unwrap` or `expected` from [`std::sync::MutexGuard`].
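+//! This module provides a thread pool of fixed capacity built on message passing.
+//! Enqueued jobs are pushed onto a shared queue and executed by a bounded number
+//! of worker threads; every result is sent back over a channel and can be
+//! collected through `get_results` after a call to `join_all`.
+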
+use std::{thread::{JoinHandle, self}, sync::{mpsc::{Receiver, channel, Sender}, Mutex, Arc}, num::NonZeroUsize, collections::VecDeque};
 
-use std::{
-    any::Any,
-    collections::VecDeque,
-    num::NonZeroUsize,
-    sync::{
-        atomic::{AtomicBool, AtomicUsize, Ordering},
-        Arc, Mutex,
-    },
-    thread::{self, JoinHandle},
-};
+const DEFAULT_THREAD_POOL_SIZE: usize = 1;
 
-/// Maximum number of thread to be used by the thread pool in case all methods
-/// of determining a recommend number failed
-#[allow(unused)]
-pub const FALLBACK_THREADS: usize = 1;
-
-/// Returns the number of threads to be used by the thread pool by default.
-/// This function tries to fetch a recommended number by calling [`thread::available_parallelism`].
-/// In case this fails [`FALLBACK_THREADS`] will be returned
-fn get_default_thread_count() -> usize {
-    // number of threads to fallback to
-    let fallback_threads =
-        NonZeroUsize::new(FALLBACK_THREADS).expect("fallback_threads must be nonzero");
-    // determine the maximum recommend number of threads to use
-    // most of the time this is gonna be the number of cpus
-    thread::available_parallelism()
-        .unwrap_or(fallback_threads)
-        .get()
-}
-
-/// This struct manages a pool of threads with a fixed maximum number.
-/// Any time a closure is passed to `enqueue` the pool checks whether it can
-/// directly launch a new thread to execute the closure. If the maximum number
-/// of threads is reached the closure is staged and will get executed by next
-/// thread to be available.
-/// The pool will also keep track of every `JoinHandle` created by running every closure on
-/// its on thread. The closures can be obtained by either calling `join_all` or `get_finished`.
-///
-/// # Example
-/// ```rust
-/// use imsearch::multithreading::ThreadPool;
-/// let mut pool = ThreadPool::new();
-///
-/// // launch some work in parallel
-/// for i in 0..10 {
-///     pool.enqueue(move || {
-///         println!("I am multithreaded and have id: {i}");
-///     });
-/// }
-/// // wait for threads to finish
-/// pool.join_all();
-/// ```
-///
-/// # Portability
-/// This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
-/// This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
-/// Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
-/// (see: <https://doc.rust-lang.org/std/sync/atomic/>).
-/// Additionally this implementation relies on using the `load` and `store` operations
-/// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls
-/// to `unwrap` or `expected` from [`std::sync::MutexGuard`].
-///
-/// # Memory consumption over time
-/// The pool will store the handle for every thread launched constantly increasing the memory consumption.
-/// It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to
-/// `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption.
-/// Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread
-/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
-#[allow(dead_code)]
-#[derive(Debug)]
-pub struct ThreadPool<F, T>
-where
-    F: Send + FnOnce() -> T,
+pub trait Job<T>: Send + 'static + FnOnce() -> T
+where T: Send
 {
-    /// maximum number of threads to launch at once
-    max_thread_count: usize,
-    /// handles for launched threads
-    handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
-    /// function to be executed when threads are ready
-    queue: Arc<Mutex<VecDeque<F>>>,
-    /// number of currently running threads
-    /// new implementation relies on atomic primitives to avoid locking and possible
-    /// guard errors. Note that atomic primitives are not available on all platforms "can generally be relied upon existing"
-    /// (see: <https://doc.rust-lang.org/std/sync/atomic/>).
-    /// Also this implementation relies on using the `load` and `store` operations
-    /// instead of using more comfortable one like `fetch_add`
-    threads: Arc<AtomicUsize>,
-    /// wether to keep the thread handles after the function returned
-    drop_handles: Arc<AtomicBool>,
 }
 
-impl<F, T> Default for ThreadPool<F, T>
-where
-    F: Send + FnOnce() -> T,
+impl<U, T> Job<T> for U
+    where U: Send + 'static + FnOnce() -> T, T: Send + 'static
+{
+}
+
+#[derive(Debug)]
+pub struct ThreadPool<T, F>
+    where T: Send, F: Job<T>
+{
+    queue: Arc<Mutex<VecDeque<F>>>,
+    handles: Vec<JoinHandle<()>>,
+    receiver: Receiver<T>,
+    sender: Sender<T>,
+    limit: NonZeroUsize,
+}
+
+impl<T, F> Default for ThreadPool<T, F>
+where T: Send + 'static, F: Job<T>
 {
     fn default() -> Self {
+        let (sender, receiver) = channel::<T>();
+
+        let default = NonZeroUsize::new(DEFAULT_THREAD_POOL_SIZE).expect("Thread limit must be non-zero");
+        let limit = thread::available_parallelism().unwrap_or(default);
+
         Self {
-            max_thread_count: get_default_thread_count(),
-            handles: Default::default(),
-            queue: Default::default(),
-            // will be initialized to 0
-            threads: Arc::new(AtomicUsize::new(0)),
-            // do not drop handles by default
-            drop_handles: Arc::new(AtomicBool::new(false)),
+            queue: Arc::new(Mutex::new(VecDeque::new())),
+            handles: Vec::new(),
+            receiver,
+            sender,
+            limit,
         }
     }
 }
 
-#[allow(dead_code)]
-impl<F, T> ThreadPool<F, T>
-where
-    F: Send + FnOnce() -> T + 'static,
-    T: Send + 'static,
+impl<T, F> ThreadPool<T, F>
+where T: Send + 'static, F: Job<T>
 {
-    /// Create a new empty thread pool with the maximum number of threads set be the recommended amount of threads
-    /// supplied by [`std::thread::available_parallelism`] or in case the function fails [`FALLBACK_THREADS`].
-    /// # Limitations
-    /// This function may assume the wrong number of threads due to the nature of [`std::thread::available_parallelism`].
-    /// That can happen if the program runs inside of a container or vm with poorly configured parallelism.
     pub fn new() -> Self {
+        Default::default()
+    }
+
+    pub fn with_limit(max_threads: usize) -> Self {
         Self {
-            max_thread_count: get_default_thread_count(),
+            limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"),
             ..Default::default()
         }
     }
 
-    /// Create a new empty thread pool with the maximum number of threads set be the specified number
-    /// # Overusage
-    /// supplying a number of threads to great may negatively impact performance as the system may not
-    /// be able to full fill the required needs
-    pub fn with_threads(max_thread_count: NonZeroUsize) -> Self {
-        Self {
-            max_thread_count: max_thread_count.get(),
-            ..Default::default()
-        }
-    }
-
-    /// Create a new empty thread pool with the maximum number of threads set be the specified number
-    /// and also sets the flag to drop the handles of finished threads instead of storing them until
-    /// eihter `join_all` or `get_finished` is called.
-    /// # Overusage
-    /// supplying a number of threads to great may negatively impact performance as the system may not
-    /// be able to full fill the required needs
-    /// # Memory usage
-    /// if `drop_handles` is set to `false` the pool will continue to store the handles of
-    /// launched threads. This causes memory consumption to rise over time as more and more
-    /// threads are launched.
-    pub fn with_threads_and_drop_handles(
-        max_thread_count: NonZeroUsize,
-        drop_handles: bool,
-    ) -> Self {
-        Self {
-            max_thread_count: max_thread_count.get(),
-            drop_handles: Arc::new(AtomicBool::new(drop_handles)),
-            ..Default::default()
-        }
-    }
-
-    /// Pass a new closure to be executed as soon as a thread is available.
-    /// This function will execute the supplied closure immediately when the number of running threads
-    /// is lower than the maximum number of threads. Otherwise the closure will be executed at some undetermined time
-    /// in the future unless program doesn't die before.
-    /// If `join_all` is called and the closure hasn't been executed yet, `join_all` will wait for all stalled
-    /// closures be executed.
-    pub fn enqueue(&mut self, closure: F) {
-        // read used thread counter and apply all store operations with Ordering::Release
-        let used_threads = self.threads.load(Ordering::Acquire);
-        // test if we can launch a new thread
-        if used_threads < self.max_thread_count {
-            // we can create a new thread, increment the thread count
-            self.threads
-                .store(used_threads.saturating_add(1), Ordering::Release);
-            // run new thread
-            execute(
-                self.queue.clone(),
-                self.handles.clone(),
-                self.threads.clone(),
-                self.drop_handles.clone(),
-                closure,
-            );
-        } else {
-            // all threads being used
-            // enqueue closure to be launched when a thread is ready
-            self.queue.lock().unwrap().push_back(closure);
-        }
-    }
+    pub fn enqueue(&mut self, func: F) {
+        // remove handles of workers that have already exited
+        self.handles.retain(|h| !h.is_finished());
+
+        // queue the job so a worker can pick it up
+        self.queue.lock().unwrap().push_back(func);
+
+        if self.handles.len() < self.limit.get() {
+            // we can still launch threads to run in parallel
+
+            // clone the sender and the queue for the new worker
+            let tx = self.sender.clone();
+            let queue = self.queue.clone();
+
+            self.handles.push(thread::spawn(move || {
+                loop {
+                    // take the next job, dropping the lock before running it
+                    // so other workers can make progress concurrently
+                    let job = queue.lock().unwrap().pop_front();
+                    match job {
+                        Some(job) => tx.send(job()).expect("cannot send result"),
+                        None => break,
+                    }
+                }
+            }));
+        }
+    }
 
-    /// Removes all closures stalled for execution.
-    /// All closures still waiting to be executed will be dropped by the pool and
-    /// won't get executed. Useful if an old set of closures hasn't run yet but are outdated
-    /// and resources are required immediately for updated closures.
-    pub fn discard_stalled(&mut self) {
-        self.queue.lock().unwrap().clear();
-    }
+    pub fn join_all(&mut self) {
+        // wait for every worker to drain the queue and exit
+        while let Some(handle) = self.handles.pop() {
+            handle.join().unwrap();
+        }
+    }
 
-    /// Waits for all currently running threads and all stalled closures to be executed.
-    /// If any closure hasn't been executed yet, `join_all` will wait until the queue holding all
-    /// unexecuted closures is empty. It returns the result every `join` of all threads yields as a vector.
-    /// If the vector is of length zero, no threads were joined and the thread pool didn't do anything.
-    /// All handles of threads will be removed after this call.
-    pub fn join_all(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
-        let mut results = Vec::new();
-        loop {
-            // lock the handles, pop the last one off and unlock handles again
-            // to allow running threads to process
-            let handle = self.handles.lock().unwrap().pop();
-
-            // if we still have a handle join it else no handles are left we abort the loop
-            if let Some(handle) = handle {
-                results.push(handle.join());
-                continue;
-            }
-            break;
-        }
-
-        results
-    }
+    pub fn get_results(&mut self) -> Vec<T> {
+        // collect every result that has been sent back so far without blocking
+        self.receiver.try_iter().collect()
+    }
 
-    /// Returns the results of every thread that has already finished until now.
-    /// All other threads currently running won't be waited for nor for any closure stalled for execution in the future.
-    /// If the vector is of length zero, no threads were joined and the thread pool either doesn't do anything or is busy.
-    /// All handles of finished threads will be removed after this call.
-    pub fn get_finished(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
-        let mut results = Vec::new();
-
-        let mut handles = self.handles.lock().unwrap();
-
-        // loop through the handles and remove all finished handles
-        // join on the finished handles which will be quick as they are finished!
-        let mut idx = 0;
-        while idx < handles.len() {
-            if handles[idx].is_finished() {
-                // thread is finished, yield result
-                results.push(handles.remove(idx).join());
-            } else {
-                // thread isn't done, continue to the next one
-                idx += 1;
-            }
-        }
-
-        results
-    }
-
-    /// set the flag to indicate that thread handles will be dropped after the thread is finished
-    /// executing. All threads that have finished until now but haven't been removed will get dropped
-    /// after the next thread finishes.
-    pub fn drop_finished_handles(&self) {
-        self.drop_handles.store(false, Ordering::Release);
-    }
-
-    /// set the flag to indicate that thread handles will be kept after the thread is finished
-    /// executing until either `join_all` or `get_finished` is called.
-    /// Only new thread handles created after this call be kept.
-    pub fn keep_future_handles(&self) {
-        self.drop_handles.store(true, Ordering::Release);
-    }
-}
-
-/// Removes all thread handles which have finished only if the can be locked at
-/// the current time. This function will not block execution when the lock cannot be acquired.
-fn try_prune<T>(handles: Arc<Mutex<Vec<JoinHandle<T>>>>) {
-    if let Ok(mut handles) = handles.try_lock() {
-        // keep unfinished elements
-        handles.retain(|handle| !handle.is_finished());
-    }
-}
-
-/// Execute the supplied closure on a new thread
-/// and store the threads handle into `handles`. When the thread
-/// finished executing the closure it will look for any closures left in `queue` and
-/// recursively execute it on a new thread. This method updates `threads` in order to
-/// keep track of the number of active threads.
-fn execute<F, T>(
-    queue: Arc<Mutex<VecDeque<F>>>,
-    handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
-    threads: Arc<AtomicUsize>,
-    drop: Arc<AtomicBool>,
-    closure: F,
-) where
-    T: Send + 'static,
-    F: Send + FnOnce() -> T + 'static,
-{
-    let handles_copy = handles.clone();
-
-    handles.lock().unwrap().push(thread::spawn(move || {
-        // run closure (actual work)
-        let result = closure();
-
-        // take the next closure stalled for execution
-        let next = queue.lock().unwrap().pop_front();
-        if let Some(next_closure) = next {
-            // if we have sth. to execute, spawn a new thread
-            execute(
-                queue,
-                handles_copy.clone(),
-                threads,
-                drop.clone(),
-                next_closure,
-            );
-        } else {
-            // nothing to execute this thread will run out without any work to do
-            // decrement the amount of used threads
-            threads.store(
-                threads.load(Ordering::Acquire).saturating_sub(1),
-                Ordering::Release,
-            )
-        }
-
-        // try to drop all fnished thread handles if necessary
-        // this is a non blocking operation
-        if drop.load(Ordering::Acquire) {
-            try_prune(handles_copy);
-        }
-
-        result
-    }));
-}
-
-#[cfg(test)]
-mod tests {
-    use std::time::Duration;
-
-    use super::*;
-
-    #[test]
-    fn test_thread_pool() {
-        // auto determine the amount of threads to use
-        let mut pool = ThreadPool::new();
-
-        // launch 4 jobs to run on our pool
-        for i in 0..4 {
-            pool.enqueue(move || (0..=i).sum::<i32>());
-        }
-
-        // wait for the threads to finish and sum their results
-        let sum = pool
-            .join_all()
-            .into_iter()
-            .map(|r| r.unwrap())
-            .sum::<i32>();
-
-        assert_eq!(sum, 10);
-    }
-
-    #[test]
-    fn test_drop_stalled() {
-        // auto determine the amount of threads to use
-        let mut pool = ThreadPool::with_threads(NonZeroUsize::new(1).unwrap());
-
-        // launch 2 jobs: 1 will immediately return, the other one will sleep for 20 seconds
-        for i in 0..1 {
-            pool.enqueue(move || {
-                thread::sleep(Duration::from_secs(i * 20));
-                i
-            });
-        }
-
-        // wait 10 secs
-        thread::sleep(Duration::from_secs(2));
-        // discard job that should still run
-        pool.discard_stalled();
-
-        // wait for the threads to finish and sum their results
-        let sum = pool.join_all().into_iter().map(|r| r.unwrap()).sum::<u64>();
-
-        assert_eq!(sum, 0);
-    }
-}
+}
\ No newline at end of file
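
The old `#[cfg(test)]` module was dropped entirely by this rewrite; a possible
replacement mirroring the removed `test_thread_pool`, adapted to the new API
(a sketch; it assumes every queued job has been picked up by a worker before
`join_all` returns):

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn test_thread_pool() {
            // two workers, four small jobs
            let mut pool = ThreadPool::with_limit(2);

            for i in 0..4 {
                pool.enqueue(move || (0..=i).sum::<i32>());
            }

            // wait for the workers, then drain the result channel
            pool.join_all();
            let sum: i32 = pool.get_results().iter().sum();

            assert_eq!(sum, 10);
        }
    }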