diff --git a/benches/multithreading.rs b/benches/multithreading.rs
index 84a6014..20ce75e 100644
--- a/benches/multithreading.rs
+++ b/benches/multithreading.rs
@@ -4,7 +4,7 @@
 //! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit
 //! double precision floating point values.
 
-use std::{num::NonZeroUsize, sync::Arc};
+use std::sync::Arc;
 
 use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
 use imsearch::multithreading::ThreadPool;
@@ -37,8 +37,8 @@ fn dot(a: &[f64], b: &[f64]) -> f64 {
 /// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have
 /// finished the partial dot products will be summed to create the final result.
 fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
-    let mut pool =
-        ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(threads).unwrap(), true);
+
+    let mut pool = ThreadPool::with_limit(threads);
 
     // number of elements in each vector for each thread
     let steps = a.len() / threads;
@@ -56,15 +56,10 @@ fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
             dot(a, b)
         });
     }
+
+    pool.join_all();
 
-    black_box(
-        // wait for the threads to finish
-        pool.join_all()
-            // iterate over the results and sum the parital dot products together
-            .into_iter()
-            .map(|r| r.unwrap())
-            .reduce(|a, b| a + b),
-    );
+    black_box(pool.get_results().iter().sum::<f64>());
 }
 
 /// Compute a simple hash value for the given index value.
@@ -114,8 +109,6 @@ pub fn bench_threadpool(c: &mut Criterion) {
 fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
     // automatically choose the number of threads
     let mut pool = ThreadPool::new();
-    // drop the handles used by each thread after its done
-    pool.drop_finished_handles();
 
     // number of elements in each vector for each thread
     let steps = a.len() / threads;
@@ -134,14 +127,9 @@ fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
         });
     }
 
-    black_box(
-        // wait for the threads to finish
-        pool.join_all()
-            // iterate over the results and sum the parital dot products together
-            .into_iter()
-            .map(|r| r.unwrap())
-            .reduce(|a, b| a + b),
-    );
+    pool.join_all();
+
+    black_box(pool.get_results().iter().sum::<f64>());
 }
 
 /// Benchmark the effects of over and underusing a thread pools thread capacity.
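The pattern the benches now follow — enqueue one closure per chunk, `join_all()`, then drain `get_results()` — looks like this in isolation. This is a minimal sketch assuming only the `ThreadPool` API added below; `sum_partials` and its doubling closure are illustrative stand-ins for the partial dot products, not part of the patch:

```rust
use imsearch::multithreading::ThreadPool;

/// Hypothetical distillation of the benches' new result-collection pattern.
fn sum_partials(partials: Vec<f64>) -> f64 {
    let mut pool = ThreadPool::with_limit(4);
    for p in partials {
        // stand-in for the partial dot product each bench job computes
        pool.enqueue(move || p * 2.0);
    }
    // block until the job queue is drained and every worker has finished
    pool.join_all();
    // collect everything the workers sent back over the pool's channel
    pool.get_results().iter().sum::<f64>()
}
```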
diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs
index 00ddc74..d20347a 100644
--- a/src/multithreading/mod.rs
+++ b/src/multithreading/mod.rs
@@ -1,399 +1,265 @@
-//! This module provides the functionality to create a thread pool of fixed capacity.
-//! This means that the pool can be used to dispatch functions or closures that will be executed
-//! some time in the future each on its own thread. When dispatching jobs, the pool will test whether
-//! threads are available. If so the pool will directly launch a new thread to run the supplied function.
-//! In case no threads are available the job will be stalled for execution until a thread is free to run the first
-//! stalled job.
+//! This module provides the functionality to create a thread pool for executing tasks in parallel.
+//! The maximum number of threads to be used can be regulated with `ThreadPool::with_limit`.
+//! This implementation aims for low runtime cost with minimal synchronisation caused by blocking.
+//! Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
+//! a new thread will be launched until the maximum number is reached. From then on every launched thread will
+//! be reused to process the remaining elements of the queue. If no jobs are left to be executed,
+//! all threads will finish and die. This means that when there is no work to do, no threads idle in the background.
+//! # Example
+//! ```rust
+//! # use imsearch::multithreading::ThreadPool;
+//! let mut pool = ThreadPool::with_limit(2);
 //!
-//! The pool will also keep track of all the handles that [`std::thread::spawn`] returns. Hence after executing a job
-//! the pool still queries the result of the function which can be retrieved any time after the submission.
-//! After retrieving the result of the function the handle is discarded and cannot be accessed again through the thread pool.
+//! for i in 0..10 {
+//!     pool.enqueue(move || i);
+//! }
 //!
-//! # Threads
-//! The maximum number of threads to be used can be specified when creating a new thread pool.
-//! Alternatively the thread pool can be advised to automatically determine the recommend amount of threads to use.
-//! Note that this has its limitations due to possible side effects of sandboxing, containerization or vms.
-//! For further information see: [`thread::available_parallelism`]
-//!
-//! # Memory consumption over time
-//! The pool will store the handle for every thread launched constantly increasing the memory consumption.
-//! It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to
-//! `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption.
-//! Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread
-//! handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
-//!
-//! # Portability
-//! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
-//! This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
-//! Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
-//! (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
-//! Additionally this implementation relies on using the `load` and `store` operations
-//! instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls
-//! to `unwrap` or `expected` from [`std::sync::MutexGuard`].
+//! pool.join_all();
+//! assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
+//! ```
 
 use std::{
-    any::Any,
     collections::VecDeque,
     num::NonZeroUsize,
     sync::{
-        atomic::{AtomicBool, AtomicUsize, Ordering},
+        mpsc::{channel, Receiver, Sender},
         Arc, Mutex,
     },
     thread::{self, JoinHandle},
 };
 
-/// Maximum number of thread to be used by the thread pool in case all methods
-/// of determining a recommend number failed
-#[allow(unused)]
-pub const FALLBACK_THREADS: usize = 1;
+/// Default number of threads to be used in case [`std::thread::available_parallelism`] fails.
+pub const DEFAULT_THREAD_POOL_SIZE: usize = 1;
 
-/// Returns the number of threads to be used by the thread pool by default.
-/// This function tries to fetch a recommended number by calling [`thread::available_parallelism`].
-/// In case this fails [`FALLBACK_THREADS`] will be returned
-fn get_default_thread_count() -> usize {
-    // number of threads to fallback to
-    let fallback_threads =
-        NonZeroUsize::new(FALLBACK_THREADS).expect("fallback_threads must be nonzero");
-    // determine the maximum recommend number of threads to use
-    // most of the time this is gonna be the number of cpus
-    thread::available_parallelism()
-        .unwrap_or(fallback_threads)
-        .get()
+/// Indicates the priority level of functions or closures which get supplied to the pool.
+/// Use [`Priority::High`] to ensure the closure is executed before all closures that are already supplied.
+/// Use [`Priority::Low`] to ensure the closure is executed after all closures that are already supplied.
+#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
+pub enum Priority {
+    /// Indicate that the closure or function supplied to the thread pool
+    /// has higher priority than any other given to the pool until now.
+    /// The item will get enqueued at the front of the waiting queue.
+    High,
+    /// Indicate that the closure or function supplied to the thread pool
+    /// has lower priority than the already supplied ones in this pool.
+    /// The item will get enqueued at the back of the waiting queue.
+    Low,
 }
 
-/// This struct manages a pool of threads with a fixed maximum number.
-/// Any time a closure is passed to `enqueue` the pool checks whether it can
-/// directly launch a new thread to execute the closure. If the maximum number
-/// of threads is reached the closure is staged and will get executed by next
-/// thread to be available.
-/// The pool will also keep track of every `JoinHandle` created by running every closure on
-/// its on thread. The closures can be obtained by either calling `join_all` or `get_finished`.
+/// Jobs are functions which are executed by the thread pool. They can be stalled when no threads are
+/// free to execute them directly. They are meant to be executed only once and be done.
+pub trait Job<T>: Send + 'static + FnOnce() -> T
+where
+    T: Send,
+{
+}
+
+impl<U, T> Job<T> for U
+where
+    U: Send + 'static + FnOnce() -> T,
+    T: Send + 'static,
+{
+}
+
+/// Thread pool which can be used to execute functions or closures in parallel.
+/// The maximum number of threads to be used can be regulated with `ThreadPool::with_limit`.
+/// This implementation aims for low runtime cost with minimal synchronisation caused by blocking.
+/// Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
+/// a new thread will be launched until the maximum number is reached. From then on every launched thread will
+/// be reused to process the remaining elements of the queue. If no jobs are left to be executed,
+/// all threads will finish and die. This means that when there is no work to do, no threads idle in the background.
 /// # Example
 /// ```rust
-/// use imsearch::multithreading::ThreadPool;
-/// let mut pool = ThreadPool::new();
+/// # use imsearch::multithreading::ThreadPool;
+/// let mut pool = ThreadPool::with_limit(2);
 ///
-/// // launch some work in parallel
 /// for i in 0..10 {
-///     pool.enqueue(move || {
-///         println!("I am multithreaded and have id: {i}");
-///     });
+///     pool.enqueue(move || i);
 /// }
-/// // wait for threads to finish
-/// pool.join_all();
-/// ```
-/// # Portability
-/// This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
-/// This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
-/// Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
-/// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
-/// Additionally this implementation relies on using the `load` and `store` operations
-/// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls
-/// to `unwrap` or `expected` from [`std::sync::MutexGuard`].
 ///
-/// # Memory consumption over time
-/// The pool will store the handle for every thread launched constantly increasing the memory consumption.
-/// It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to
-/// `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption.
-/// Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread
-/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
-#[allow(dead_code)]
+/// pool.join_all();
+/// assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
+/// ```
 #[derive(Debug)]
-pub struct ThreadPool<T, F>
+pub struct ThreadPool<F, T>
 where
-    F: Send + FnOnce() -> T,
+    T: Send,
+    F: Job<T>,
 {
-    /// maximum number of threads to launch at once
-    max_thread_count: usize,
-    /// handles for launched threads
-    handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
-    /// function to be executed when threads are ready
+    /// queue for storing the jobs to be executed
     queue: Arc<Mutex<VecDeque<F>>>,
-    /// number of currently running threads
-    /// new implementation relies on atomic primitives to avoid locking and possible
-    /// guard errors. Note that atomic primitives are not available on all platforms "can generally be relied upon existing"
-    /// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
-    /// Also this implementation relies on using the `load` and `store` operations
-    /// instead of using more comfortable one like `fetch_add`
-    threads: Arc<AtomicUsize>,
-    /// wether to keep the thread handles after the function returned
-    drop_handles: Arc<AtomicBool>,
+    /// handles for all threads currently running and processing jobs
+    handles: Vec<JoinHandle<()>>,
+    /// receiver end for channel-based communication between threads
+    receiver: Receiver<T>,
+    /// sender end for channel-based communication between threads
+    sender: Sender<T>,
+    /// maximum number of threads to be used in parallel
+    limit: NonZeroUsize,
 }
 
-impl<T, F> Default for ThreadPool<T, F>
+impl<F, T> Default for ThreadPool<F, T>
 where
-    F: Send + FnOnce() -> T,
+    T: Send + 'static,
+    F: Job<T>,
 {
     fn default() -> Self {
+        let (sender, receiver) = channel::<T>();
+
+        // determine default thread count to use based on the system
+        let default =
+            NonZeroUsize::new(DEFAULT_THREAD_POOL_SIZE).expect("Thread limit must be non-zero");
+        let limit = thread::available_parallelism().unwrap_or(default);
+
         Self {
-            max_thread_count: get_default_thread_count(),
-            handles: Default::default(),
-            queue: Default::default(),
-            // will be initialized to 0
-            threads: Arc::new(AtomicUsize::new(0)),
-            // do not drop handles by default
-            drop_handles: Arc::new(AtomicBool::new(false)),
+            queue: Arc::new(Mutex::new(VecDeque::new())),
+            handles: Vec::new(),
+            receiver,
+            sender,
+            limit,
         }
     }
 }
 
-#[allow(dead_code)]
-impl<T, F> ThreadPool<T, F>
+impl<F, T> ThreadPool<F, T>
 where
-    F: Send + FnOnce() -> T + 'static,
     T: Send + 'static,
+    F: Job<T>,
 {
-    /// Create a new empty thread pool with the maximum number of threads set be the recommended amount of threads
-    /// supplied by [`std::thread::available_parallelism`] or in case the function fails [`FALLBACK_THREADS`].
-    /// # Limitations
-    /// This function may assume the wrong number of threads due to the nature of [`std::thread::available_parallelism`].
-    /// That can happen if the program runs inside of a container or vm with poorly configured parallelism.
+    /// Creates a new thread pool with default thread count determined by either
+    /// [`std::thread::available_parallelism`] or [`DEFAULT_THREAD_POOL_SIZE`] in case it fails.
+    /// No threads will be launched until jobs are enqueued.
    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Creates a new thread pool with the given thread count. The pool will continue to launch new threads even if
+    /// the system does not allow for that count of parallelism.
+    /// No threads will be launched until jobs are enqueued.
+    /// # Panics
+    /// This function panics if `max_threads` is zero.
+    pub fn with_limit(max_threads: usize) -> Self {
         Self {
-            max_thread_count: get_default_thread_count(),
+            limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"),
             ..Default::default()
         }
    }
 
-    /// Create a new empty thread pool with the maximum number of threads set be the specified number
-    /// # Overusage
-    /// supplying a number of threads to great may negatively impact performance as the system may not
-    /// be able to full fill the required needs
-    pub fn with_threads(max_thread_count: NonZeroUsize) -> Self {
-        Self {
-            max_thread_count: max_thread_count.get(),
-            ..Default::default()
+    /// Put a new job into the queue to be executed by a thread in the future.
+    /// The priority of the job will determine whether the job is put at the front or back of the queue.
+    /// See [`crate::multithreading::Priority`].
+    /// This function will create a new thread if the maximum number of threads is not reached.
+    /// In case the maximum number of threads is already used, the job is stalled and will get executed
+    /// when a thread is ready and it is at the front of the queue.
+    pub fn enqueue_priorize(&mut self, func: F, priority: Priority) {
+        // put job into queue
+        let mut queue = self.queue.lock().unwrap();
+
+        // insert new job into queue depending on its priority
+        match priority {
+            Priority::High => queue.push_front(func),
+            Priority::Low => queue.push_back(func),
+        }
+
+        if self.handles.len() < self.limit.get() {
+            // we can still launch threads to run in parallel
+
+            // clone the sender
+            let tx = self.sender.clone();
+            let queue = self.queue.clone();
+
+            self.handles.push(thread::spawn(move || {
+                while let Some(job) = queue.lock().unwrap().pop_front() {
+                    tx.send(job()).expect("cannot send result");
+                }
+            }));
+        }
+
+        self.handles.retain(|h| !h.is_finished());
+    }
+
+    /// Put a new job into the queue to be executed by a thread in the future.
+    /// The priority of the job is automatically set to [`crate::multithreading::Priority::Low`].
+    /// This function will create a new thread if the maximum number of threads is not reached.
+    /// In case the maximum number of threads is already used, the job is stalled and will get executed
+    /// when a thread is ready and it is at the front of the queue.
+    pub fn enqueue(&mut self, func: F) {
+        self.enqueue_priorize(func, Priority::Low);
+    }
+
+    /// Wait for all threads to finish executing. This means that by the time all threads have finished
+    /// every task will have been executed too. In other words the threads finish when the queue of jobs is empty.
+    /// This function will block the caller thread.
+    pub fn join_all(&mut self) {
+        while let Some(handle) = self.handles.pop() {
+            handle.join().unwrap();
        }
    }
 
-    /// Create a new empty thread pool with the maximum number of threads set be the specified number
-    /// and also sets the flag to drop the handles of finished threads instead of storing them until
-    /// eihter `join_all` or `get_finished` is called.
-    /// # Overusage
-    /// supplying a number of threads to great may negatively impact performance as the system may not
-    /// be able to full fill the required needs
-    /// # Memory usage
-    /// if `drop_handles` is set to `false` the pool will continue to store the handles of
-    /// launched threads. This causes memory consumption to rise over time as more and more
-    /// threads are launched.
-    pub fn with_threads_and_drop_handles(
-        max_thread_count: NonZeroUsize,
-        drop_handles: bool,
-    ) -> Self {
-        Self {
-            max_thread_count: max_thread_count.get(),
-            drop_handles: Arc::new(AtomicBool::new(drop_handles)),
-            ..Default::default()
-        }
+    /// Returns all results that have been returned by the threads until now
+    /// and haven't been consumed yet.
+    /// All results retrieved from this call won't be returned on a second call.
+    /// This function is non-blocking.
+    pub fn try_get_results(&mut self) -> Vec<T> {
+        self.receiver.try_iter().collect()
    }
 
-    /// Pass a new closure to be executed as soon as a thread is available.
-    /// This function will execute the supplied closure immediately when the number of running threads
-    /// is lower than the maximum number of threads. Otherwise the closure will be executed at some undetermined time
-    /// in the future unless program doesn't die before.
-    /// If `join_all` is called and the closure hasn't been executed yet, `join_all` will wait for all stalled
-    /// closures be executed.
-    pub fn enqueue(&mut self, closure: F) {
-        // read used thread counter and apply all store operations with Ordering::Release
-        let used_threads = self.threads.load(Ordering::Acquire);
-        // test if we can launch a new thread
-        if used_threads < self.max_thread_count {
-            // we can create a new thread, increment the thread count
-            self.threads
-                .store(used_threads.saturating_add(1), Ordering::Release);
-            // run new thread
-            execute(
-                self.queue.clone(),
-                self.handles.clone(),
-                self.threads.clone(),
-                self.drop_handles.clone(),
-                closure,
-            );
-        } else {
-            // all threads being used
-            // enqueue closure to be launched when a thread is ready
-            self.queue.lock().unwrap().push_back(closure);
-        }
+    /// Returns all results that have been returned by the threads until now
+    /// and haven't been consumed yet. The function will also wait for all threads to finish executing (empty the queue).
+    /// All results retrieved from this call won't be returned on a second call.
+    /// This function will block the caller thread.
+    pub fn get_results(&mut self) -> Vec<T> {
+        self.join_all();
+        self.try_get_results()
    }
-
-    /// Removes all closures stalled for execution.
-    /// All closures still waiting to be executed will be dropped by the pool and
-    /// won't get executed. Useful if an old set of closures hasn't run yet but are outdated
-    /// and resources are required immediately for updated closures.
-    pub fn discard_stalled(&mut self) {
-        self.queue.lock().unwrap().clear();
-    }
-
-    /// Waits for all currently running threads and all stalled closures to be executed.
-    /// If any closure hasn't been executed yet, `join_all` will wait until the queue holding all
-    /// unexecuted closures is empty. It returns the result every `join` of all threads yields as a vector.
-    /// If the vector is of length zero, no threads were joined and the thread pool didn't do anything.
-    /// All handles of threads will be removed after this call.
-    pub fn join_all(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
-        let mut results = Vec::new();
-        loop {
-            // lock the handles, pop the last one off and unlock handles again
-            // to allow running threads to process
-            let handle = self.handles.lock().unwrap().pop();
-
-            // if we still have a handle join it else no handles are left we abort the loop
-            if let Some(handle) = handle {
-                results.push(handle.join());
-                continue;
-            }
-            break;
-        }
-
-        results
-    }
-
-    /// Returns the results of every thread that has already finished until now.
-    /// All other threads currently running won't be waited for nor for any closure stalled for execution in the future.
-    /// /// If the vector is of length zero, no threads were joined and the thread pool either doesn't do anything or is busy.
-    /// All handles of finished threads will be removed after this call.
-    pub fn get_finished(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
-        let mut results = Vec::new();
-
-        let mut handles = self.handles.lock().unwrap();
-
-        // loop through the handles and remove all finished handles
-        // join on the finished handles which will be quick as they are finished!
-        let mut idx = 0;
-        while idx < handles.len() {
-            if handles[idx].is_finished() {
-                // thread is finished, yield result
-                results.push(handles.remove(idx).join());
-            } else {
-                // thread isn't done, continue to the next one
-                idx += 1;
-            }
-        }
-
-        results
-    }
-
-    /// set the flag to indicate that thread handles will be dropped after the thread is finished
-    /// executing. All threads that have finished until now but haven't been removed will get dropped
-    /// after the next thread finishes.
-    pub fn drop_finished_handles(&self) {
-        self.drop_handles.store(false, Ordering::Release);
-    }
-
-    /// set the flag to indicate that thread handles will be kept after the thread is finished
-    /// executing until either `join_all` or `get_finished` is called.
-    /// Only new thread handles created after this call be kept.
-    pub fn keep_future_handles(&self) {
-        self.drop_handles.store(true, Ordering::Release);
-    }
-}
-
-/// Removes all thread handles which have finished only if the can be locked at
-/// the current time. This function will not block execution when the lock cannot be acquired.
-fn try_prune<T>(handles: Arc<Mutex<Vec<JoinHandle<T>>>>) {
-    if let Ok(mut handles) = handles.try_lock() {
-        // keep unfinished elements
-        handles.retain(|handle| !handle.is_finished());
-    }
-}
-
-/// Execute the supplied closure on a new thread
-/// and store the threads handle into `handles`. When the thread
-/// finished executing the closure it will look for any closures left in `queue` and
-/// recursively execute it on a new thread. This method updates threads` in order to
-/// keep track of the number of active threads.
-fn execute<F, T>(
-    queue: Arc<Mutex<VecDeque<F>>>,
-    handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
-    threads: Arc<AtomicUsize>,
-    drop: Arc<AtomicBool>,
-    closure: F,
-) where
-    T: Send + 'static,
-    F: Send + FnOnce() -> T + 'static,
-{
-    let handles_copy = handles.clone();
-
-    handles.lock().unwrap().push(thread::spawn(move || {
-        // run closure (actual work)
-        let result = closure();
-
-        // take the next closure stalled for execution
-        let next = queue.lock().unwrap().pop_front();
-        if let Some(next_closure) = next {
-            // if we have sth. to execute, spawn a new thread
-            execute(
-                queue,
-                handles_copy.clone(),
-                threads,
-                drop.clone(),
-                next_closure,
-            );
-        } else {
-            // nothing to execute this thread will run out without any work to do
-            // decrement the amount of used threads
-            threads.store(
-                threads.load(Ordering::Acquire).saturating_sub(1),
-                Ordering::Release,
-            )
-        }
-
-        // try to drop all fnished thread handles if necessary
-        // this is a non blocking operation
-        if drop.load(Ordering::Acquire) {
-            try_prune(handles_copy);
-        }
-
-        result
-    }));
-}
 
 #[cfg(test)]
-mod tests {
-    use std::time::Duration;
-
+mod test {
     use super::*;
 
     #[test]
-    fn test_thread_pool() {
-        // auto determine the amount of threads to use
-        let mut pool = ThreadPool::new();
+    fn test_default() {
+        let mut pool = ThreadPool::default();
 
-        // launch 4 jobs to run on our pool
-        for i in 0..4 {
-            pool.enqueue(move || (0..=i).sum::<i32>());
+        for i in 0..10 {
+            pool.enqueue_priorize(move || i, Priority::High);
        }
 
-        // wait for the threads to finish and sum their results
-        let sum = pool
-            .join_all()
-            .into_iter()
-            .map(|r| r.unwrap())
-            .sum::<i32>();
+        pool.join_all();
 
-        assert_eq!(sum, 10);
+        assert_eq!(pool.try_get_results().iter().sum::<i32>(), 45);
    }
 
    #[test]
-    fn test_drop_stalled() {
-        // auto determine the amount of threads to use
-        let mut pool = ThreadPool::with_threads(NonZeroUsize::new(1).unwrap());
+    fn test_limit() {
+        let mut pool = ThreadPool::with_limit(2);
 
-        // launch 2 jobs: 1 will immediately return, the other one will sleep for 20 seconds
-        for i in 0..1 {
-            pool.enqueue(move || {
-                thread::sleep(Duration::from_secs(i * 20));
-                i
-            });
+        for i in 0..10 {
+            pool.enqueue(move || i);
        }
 
-        // wait 10 secs
-        thread::sleep(Duration::from_secs(2));
-        // discard job that should still run
-        pool.discard_stalled();
+        assert_eq!(pool.handles.len(), 2);
+        assert_eq!(pool.limit.get(), 2);
 
-        // wait for the threads to finish and sum their results
-        let sum = pool.join_all().into_iter().map(|r| r.unwrap()).sum::<i32>();
+        pool.join_all();
 
-        assert_eq!(sum, 0);
+        assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
+    }
+
+    #[test]
+    fn test_multiple() {
+        let mut pool = ThreadPool::with_limit(2);
+
+        for i in 0..10 {
+            pool.enqueue(move || i);
+        }
+
+        assert_eq!(pool.handles.len(), 2);
+        assert_eq!(pool.limit.get(), 2);
+
+        pool.join_all();
+
+        assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
+    }
 }
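Two usage notes on the new API, sketched under the assumption that the `multithreading` module compiles as shown above. First, `enqueue_priorize` only reorders jobs that are still waiting in the `VecDeque`; a worker may already have popped earlier jobs, so the assertion below sticks to an order-independent property:

```rust
use imsearch::multithreading::{Priority, ThreadPool};

let mut pool = ThreadPool::with_limit(1);
for i in 0..4 {
    // the last job overtakes everything still waiting in the queue
    let priority = if i == 3 { Priority::High } else { Priority::Low };
    pool.enqueue_priorize(move || i, priority);
}
pool.join_all();
// results arrive in completion order, so only the sum is asserted
assert_eq!(pool.get_results().iter().sum::<i32>(), 6);
```

One design detail worth double-checking here: in the worker's `while let Some(job) = queue.lock().unwrap().pop_front()` loop, the temporary `MutexGuard` lives until the end of each iteration, so the queue stays locked while `job()` runs; popping inside a separate block before invoking the job would release the lock during execution.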
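Second, `try_get_results` and `get_results` drain the same `mpsc` channel; the only difference is that the blocking variant joins the workers first. A small sketch under the same assumptions:

```rust
use imsearch::multithreading::ThreadPool;

let mut pool = ThreadPool::with_limit(2);
for i in 0..10 {
    pool.enqueue(move || i * i);
}

// non-blocking: yields only results already sent, possibly none at all
let early: Vec<i32> = pool.try_get_results();

// blocking: runs join_all() internally, so the remainder is guaranteed
let late: Vec<i32> = pool.get_results();

// each job ran exactly once, whichever call collected its result
assert_eq!(early.iter().sum::<i32>() + late.iter().sum::<i32>(), 285);
```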