From e48176707aa4dbf4e4a8f2eaed8f66776736f735 Mon Sep 17 00:00:00 2001 From: teridax Date: Tue, 23 May 2023 22:27:41 +0200 Subject: [PATCH 01/15] added multithreading crate with thread pool --- src/multithreading/mod.rs | 216 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 src/multithreading/mod.rs diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs new file mode 100644 index 0000000..0501061 --- /dev/null +++ b/src/multithreading/mod.rs @@ -0,0 +1,216 @@ +use std::{ + any::Any, + collections::VecDeque, + num::{NonZeroU32, NonZeroUsize}, + ops::{AddAssign, SubAssign}, + sync::{Arc, Mutex}, + thread::{self, JoinHandle}, +}; + +/// Maximum number of thread to be used by the thread pool in case all methods +/// of determining a recommend number failed +#[allow(unused)] +pub const FALLBACK_THREADS: usize = 1; + +/// Returns the number of threads to be used by the thread pool by default. +/// This function tries to fetch a recommended number by calling [`thread::available_parallelism`]. +/// In case this fails [`FALLBACK_THREADS`] will be returned +fn get_default_thread_count() -> u32 { + // number of threads to fallback to + let fallback_threads = + NonZeroUsize::new(FALLBACK_THREADS).expect("fallback_threads must be nonzero"); + // determine the maximum recommend number of threads to use + // most of the time this is gonna be the number of cpus + thread::available_parallelism() + .unwrap_or(fallback_threads) + .get() as u32 +} + +/// This struct manages a pool of threads with a fixed maximum number. +/// Any time a closure is passed to `enqueue` the pool checks whether it can +/// directly launch a new thread to execute the closure. If the maximum number +/// of threads is reached the closure is staged and will get executed by next +/// thread to be available. +/// The pool will also keep track of every `JoinHandle` created by running every closure on +/// its on thread. The closures can be obtained by either calling `join_all` or `get_finished`. +/// # Example +/// ```rust +/// let mut pool = ThreadPool::new(); +/// +/// // launch some work in parallel +/// for i in 0..10 { +/// pool.enqueue(move || { +/// println!("I am multithreaded and have id: {i}"); +/// }); +/// } +/// // wait for threads to finish +/// pool.join_all(); +/// ``` +#[allow(dead_code)] +#[derive(Debug)] +pub struct ThreadPool +where + F: Send + FnOnce() -> T + Send + 'static, +{ + /// maximum number of threads to launch at once + max_thread_count: u32, + /// handles for launched threads + handles: Arc>>>, + /// function to be executed when threads are ready + queue: Arc>>, + /// number of currently running threads + threads: Arc>, +} + +impl Default for ThreadPool +where + F: Send + FnOnce() -> T + Send + 'static, +{ + fn default() -> Self { + Self { + max_thread_count: get_default_thread_count(), + handles: Default::default(), + queue: Default::default(), + // will be initialized to 0 + threads: Default::default(), + } + } +} + +#[allow(dead_code)] +impl ThreadPool +where + F: Send + FnOnce() -> T, + T: Send + 'static, +{ + /// Create a new empty thread pool with the maximum number of threads set be the recommended amount of threads + /// supplied by [`std::thread::available_parallelism`] or in case the function fails [`FALLBACK_THREADS`]. + /// # Limitations + /// This function may assume the wrong number of threads due to the nature of [`std::thread::available_parallelism`]. 
+ /// That can happen if the program runs inside of a container or vm with poorly configured parallelism. + pub fn new() -> Self { + Self { + max_thread_count: get_default_thread_count(), + ..Default::default() + } + } + + /// Create a new empty thread pool with the maximum number of threads set be the specified number + /// # Overusage + /// supplying a number of threads to great may negatively impact performance as the system may not + /// be able to full fill the required needs + pub fn with_threads(max_thread_count: NonZeroU32) -> Self { + Self { + max_thread_count: max_thread_count.get(), + ..Default::default() + } + } + + /// Pass a new closure to be executed as soon as a thread is available. + /// This function will execute the supplied closure immediately when the number of running threads + /// is lower than the maximum number of threads. Otherwise the closure will be executed at some undetermined time + /// in the future unless program doesn't die before. + /// If `join_all` is called and the closure hasn't been executed yet, `join_all` will wait for all stalled + /// closures be executed. + pub fn enqueue(&mut self, closure: F) { + // test if we can launch a new thread + if self.threads.lock().unwrap().to_owned() < self.max_thread_count { + // we can create a new thread, increment the thread count + self.threads.lock().unwrap().add_assign(1); + // run new thread + execute( + self.queue.clone(), + self.handles.clone(), + self.threads.clone(), + closure, + ); + } else { + // all threads being used + // enqueue closure to be launched when a thread is ready + self.queue.lock().unwrap().push_back(closure); + } + } + + /// Waits for all currently running threads and all stalled closures to be executed. + /// If any closure hasn't been executed yet, `join_all` will wait until the queue holding all + /// unexecuted closures is empty. It returns the result every `join` of all threads yields as a vector. + /// If the vector is of length zero, no threads were joined and the thread pool didn't do anything. + /// All handles of threads will be removed after this call. + pub fn join_all(&mut self) -> Vec>> { + let mut results = Vec::new(); + loop { + // lock the handles, pop the last one off and unlock handles again + // to allow running threads to process + let handle = self.handles.lock().unwrap().pop(); + + // if we still have a handle join it else no handles are left we abort the loop + if let Some(handle) = handle { + results.push(handle.join()); + continue; + } + break; + } + + results + } + + /// Returns the results of every thread that has already finished until now. + /// All other threads currently running won't be waited for nor for any closure stalled for execution in the future. + /// /// If the vector is of length zero, no threads were joined and the thread pool either doesn't do anything or is busy. + /// All handles of finished threads will be removed after this call. + pub fn get_finished(&mut self) -> Vec>> { + let mut results = Vec::new(); + + let mut handles = self.handles.lock().unwrap(); + + // loop through the handles and remove all finished handles + // join on the finished handles which will be quick as they are finished! 
+        let mut idx = 0;
+        while idx < handles.len() {
+            if handles[idx].is_finished() {
+                // thread is finished, yield result
+                results.push(handles.remove(idx).join());
+            } else {
+                // thread isn't done, continue to the next one
+                idx += 1;
+            }
+        }
+
+        results
+    }
+}
+
+/// Execute the supplied closure on a new thread
+/// and store the threads handle into `handles`. When the thread
+/// finished executing the closure it will look for any closures left in `queue`
+/// recursively execute it on a new thread. This method updates `threads` in order to
+/// keep track of the number of active threads.
+fn execute<F, T>(
+    queue: Arc<Mutex<VecDeque<F>>>,
+    handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
+    threads: Arc<Mutex<u32>>,
+    closure: F,
+) where
+    T: Send + 'static,
+    F: Send + FnOnce() -> T + Send + 'static,
+{
+    let handles_copy = handles.clone();
+
+    handles.lock().unwrap().push(thread::spawn(move || {
+        // run closure (actual work)
+        let result = closure();
+
+        // take the next closure stalled for execution
+        let next = queue.lock().unwrap().pop_front();
+        if let Some(next_closure) = next {
+            // if we have sth. to execute, spawn a new thread
+            execute(queue, handles_copy, threads, next_closure);
+        } else {
+            // nothing to execute this thread will run out without any work to do
+            // decrement the amount of used threads
+            threads.lock().unwrap().sub_assign(1);
+        }
+
+        result
+    }));
+}

From cbf42d6d575d553636fb183f77343d9242961324 Mon Sep 17 00:00:00 2001
From: teridax
Date: Tue, 23 May 2023 22:28:11 +0200
Subject: [PATCH 02/15] added crate multithreading

---
 src/lib.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/lib.rs b/src/lib.rs
index 7d12d9a..a5704f3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,5 @@
+mod multithreading;
+
 pub fn add(left: usize, right: usize) -> usize {
     left + right
 }

From 044a3f3747a96f61abc8b546496eddb2dcdedd0e Mon Sep 17 00:00:00 2001
From: teridax
Date: Wed, 24 May 2023 12:11:48 +0200
Subject: [PATCH 03/15] replaced `threads` member of `ThreadPool` with an atomic primitive

---
 src/multithreading/mod.rs | 55 ++++++++++++++++++++++++++++++---------
 1 file changed, 42 insertions(+), 13 deletions(-)

diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs
index 0501061..4cf7e28 100644
--- a/src/multithreading/mod.rs
+++ b/src/multithreading/mod.rs
@@ -1,9 +1,11 @@
 use std::{
     any::Any,
     collections::VecDeque,
-    num::{NonZeroU32, NonZeroUsize},
-    ops::{AddAssign, SubAssign},
-    sync::{Arc, Mutex},
+    num::NonZeroUsize,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
     thread::{self, JoinHandle},
 };

@@ -15,7 +17,7 @@ pub const FALLBACK_THREADS: usize = 1;
 /// Returns the number of threads to be used by the thread pool by default.
 /// This function tries to fetch a recommended number by calling [`thread::available_parallelism`].
 /// In case this fails [`FALLBACK_THREADS`] will be returned
-fn get_default_thread_count() -> u32 {
+fn get_default_thread_count() -> usize {
     // number of threads to fallback to
     let fallback_threads =
         NonZeroUsize::new(FALLBACK_THREADS).expect("fallback_threads must be nonzero");
     // determine the maximum recommend number of threads to use
     // most of the time this is gonna be the number of cpus
     thread::available_parallelism()
         .unwrap_or(fallback_threads)
-        .get() as u32
+        .get()
 }

 /// This struct manages a pool of threads with a fixed maximum number.
@@ -34,7 +36,7 @@ fn get_default_thread_count() -> usize {
 /// The pool will also keep track of every `JoinHandle` created by running every closure on
 /// its own thread.
The closures can be obtained by either calling `join_all` or `get_finished`. /// # Example -/// ```rust +/// ```rust ignore /// let mut pool = ThreadPool::new(); /// /// // launch some work in parallel @@ -46,6 +48,14 @@ fn get_default_thread_count() -> u32 { /// // wait for threads to finish /// pool.join_all(); /// ``` +/// # Portability +/// This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. +/// This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`]. +/// Note that atomic primitives are not available on all platforms but "can generally be relied upon existing" +/// (see: https://doc.rust-lang.org/std/sync/atomic/index.html). +/// Additionally this implementation relies on using the `load` and `store` operations +/// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls +/// to `unwrap` or `expected` from [`std::sync::MutexGuard`]. #[allow(dead_code)] #[derive(Debug)] pub struct ThreadPool @@ -53,13 +63,18 @@ where F: Send + FnOnce() -> T + Send + 'static, { /// maximum number of threads to launch at once - max_thread_count: u32, + max_thread_count: usize, /// handles for launched threads handles: Arc>>>, /// function to be executed when threads are ready queue: Arc>>, /// number of currently running threads - threads: Arc>, + /// new implementation relies on atomic primitives to avoid locking and possible + /// guard errors. Note that atomic primitives are not available on all platforms "can generally be relied upon existing" + /// (see: https://doc.rust-lang.org/std/sync/atomic/index.html). + /// Also this implementation relies on using the `load` and `store` operations + /// instead of using more comfortable one like `fetch_add` + threads: Arc, } impl Default for ThreadPool @@ -99,7 +114,7 @@ where /// # Overusage /// supplying a number of threads to great may negatively impact performance as the system may not /// be able to full fill the required needs - pub fn with_threads(max_thread_count: NonZeroU32) -> Self { + pub fn with_threads(max_thread_count: NonZeroUsize) -> Self { Self { max_thread_count: max_thread_count.get(), ..Default::default() @@ -113,10 +128,13 @@ where /// If `join_all` is called and the closure hasn't been executed yet, `join_all` will wait for all stalled /// closures be executed. pub fn enqueue(&mut self, closure: F) { + // read used thread counter and apply all store operations with Ordering::Release + let used_threads = self.threads.load(Ordering::Acquire); // test if we can launch a new thread - if self.threads.lock().unwrap().to_owned() < self.max_thread_count { + if used_threads < self.max_thread_count { // we can create a new thread, increment the thread count - self.threads.lock().unwrap().add_assign(1); + self.threads + .store(used_threads.saturating_add(1), Ordering::Release); // run new thread execute( self.queue.clone(), @@ -131,6 +149,14 @@ where } } + /// Removes all closures stalled for execution. + /// All closures still waiting to be executed will be dropped by the pool and + /// won't get executed. Useful if an old set of closures hasn't run yet but are outdated + /// and resources are required immediately for updated closures. + pub fn discard_stalled(&mut self) { + self.queue.lock().unwrap().clear(); + } + /// Waits for all currently running threads and all stalled closures to be executed. 
/// If any closure hasn't been executed yet, `join_all` will wait until the queue holding all /// unexecuted closures is empty. It returns the result every `join` of all threads yields as a vector. @@ -188,7 +214,7 @@ where fn execute( queue: Arc>>, handles: Arc>>>, - threads: Arc>, + threads: Arc, closure: F, ) where T: Send + 'static, @@ -208,7 +234,10 @@ fn execute( } else { // nothing to execute this thread will run out without any work to do // decrement the amount of used threads - threads.lock().unwrap().sub_assign(1); + threads.store( + threads.load(Ordering::Acquire).saturating_sub(1), + Ordering::Release, + ) } result From 43bd19643bd021881458f453857ea0588123329c Mon Sep 17 00:00:00 2001 From: teridax Date: Fri, 26 May 2023 10:40:13 +0200 Subject: [PATCH 04/15] fixed doctest for threadpool --- src/lib.rs | 2 +- src/multithreading/mod.rs | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a5704f3..e0f9a41 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -mod multithreading; +pub mod multithreading; pub fn add(left: usize, right: usize) -> usize { left + right diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs index 4cf7e28..cd899ae 100644 --- a/src/multithreading/mod.rs +++ b/src/multithreading/mod.rs @@ -36,7 +36,8 @@ fn get_default_thread_count() -> usize { /// The pool will also keep track of every `JoinHandle` created by running every closure on /// its on thread. The closures can be obtained by either calling `join_all` or `get_finished`. /// # Example -/// ```rust ignore +/// ```rust +/// use imsearch::multithreading::ThreadPool; /// let mut pool = ThreadPool::new(); /// /// // launch some work in parallel @@ -60,7 +61,7 @@ fn get_default_thread_count() -> usize { #[derive(Debug)] pub struct ThreadPool where - F: Send + FnOnce() -> T + Send + 'static, + F: Send + FnOnce() -> T, { /// maximum number of threads to launch at once max_thread_count: usize, @@ -79,7 +80,7 @@ where impl Default for ThreadPool where - F: Send + FnOnce() -> T + Send + 'static, + F: Send + FnOnce() -> T, { fn default() -> Self { Self { @@ -95,7 +96,7 @@ where #[allow(dead_code)] impl ThreadPool where - F: Send + FnOnce() -> T, + F: Send + FnOnce() -> T + 'static, T: Send + 'static, { /// Create a new empty thread pool with the maximum number of threads set be the recommended amount of threads @@ -218,7 +219,7 @@ fn execute( closure: F, ) where T: Send + 'static, - F: Send + FnOnce() -> T + Send + 'static, + F: Send + FnOnce() -> T + 'static, { let handles_copy = handles.clone(); From 898d8785543abad087d830629a6a92d3fd6152e9 Mon Sep 17 00:00:00 2001 From: teridax Date: Fri, 26 May 2023 11:11:00 +0200 Subject: [PATCH 05/15] added module documentation to multithreading --- src/multithreading/mod.rs | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs index cd899ae..b832df0 100644 --- a/src/multithreading/mod.rs +++ b/src/multithreading/mod.rs @@ -1,3 +1,34 @@ +//! This module provides the functionality to create a thread pool of fixed capacity. +//! This means that the pool can be used to dispatch functions or closures that will be executed +//! some time in the future each on its own thread. When dispatching jobs, the pool will test whether +//! threads are available. If so the pool will directly launch a new thread to run the supplied function. +//! 
In case no threads are available the job will be stalled for execution until a thread is free to run the first +//! stalled job. +//! +//! The pool will also keep track of all the handles that [`std::thread::spawn`] returns. Hence after executing a job +//! the pool still queries the result of the function which can be retrieved any time after the submission. +//! After retrieving the result of the function the handle is discarded and cannot be accessed again through the thread pool. +//! +//! # Threads +//! The maximum number of threads to be used can be specified when creating a new thread pool. +//! Alternatively the thread pool can be advised to automatically determine the recommend amount of threads to use. +//! Note that this has its limitations due to possible side effects of sandboxing, containerization or vms. +//! For further information see: [`thread::available_parallelism`] +//! +//! # Memory consumption over time +//! The pool will store the handle for every thread launched constantly increasing the memory consumption. +//! It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to +//! `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption. +//! +//! # Portability +//! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. +//! This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`]. +//! Note that atomic primitives are not available on all platforms but "can generally be relied upon existing" +//! (see: https://doc.rust-lang.org/std/sync/atomic/index.html). +//! Additionally this implementation relies on using the `load` and `store` operations +//! instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls +//! to `unwrap` or `expected` from [`std::sync::MutexGuard`]. + use std::{ any::Any, collections::VecDeque, @@ -209,7 +240,7 @@ where /// Execute the supplied closure on a new thread /// and store the threads handle into `handles`. When the thread -/// finished executing the closure it will look for any closures left in `queue` +/// finished executing the closure it will look for any closures left in `queue` and /// recursively execute it on a new thread. This method updates threads` in order to /// keep track of the number of active threads. fn execute( From 1208a04658e39a79ebd75e18a125b52062d50294 Mon Sep 17 00:00:00 2001 From: teridax Date: Tue, 30 May 2023 15:12:02 +0200 Subject: [PATCH 06/15] added functionality to drop thread handles automatically when threads have finished --- src/multithreading/mod.rs | 68 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs index b832df0..5c3ed53 100644 --- a/src/multithreading/mod.rs +++ b/src/multithreading/mod.rs @@ -19,6 +19,8 @@ //! The pool will store the handle for every thread launched constantly increasing the memory consumption. //! It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to //! `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption. +//! Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread +//! 
handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time. //! //! # Portability //! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. @@ -34,7 +36,7 @@ use std::{ collections::VecDeque, num::NonZeroUsize, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicUsize, Ordering, AtomicBool}, Arc, Mutex, }, thread::{self, JoinHandle}, @@ -88,6 +90,13 @@ fn get_default_thread_count() -> usize { /// Additionally this implementation relies on using the `load` and `store` operations /// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls /// to `unwrap` or `expected` from [`std::sync::MutexGuard`]. +/// +/// # Memory consumption over time +/// The pool will store the handle for every thread launched constantly increasing the memory consumption. +/// It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to +/// `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption. +/// Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread +/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time. #[allow(dead_code)] #[derive(Debug)] pub struct ThreadPool @@ -107,6 +116,8 @@ where /// Also this implementation relies on using the `load` and `store` operations /// instead of using more comfortable one like `fetch_add` threads: Arc, + /// wether to keep the thread handles after the function returned + drop_handles: Arc, } impl Default for ThreadPool @@ -119,7 +130,9 @@ where handles: Default::default(), queue: Default::default(), // will be initialized to 0 - threads: Default::default(), + threads: Arc::new(AtomicUsize::new(0)), + // do not drop handles by default + drop_handles: Arc::new(AtomicBool::new(false)) } } } @@ -153,6 +166,24 @@ where } } + /// Create a new empty thread pool with the maximum number of threads set be the specified number + /// and also sets the flag to drop the handles of finished threads instead of storing them until + /// eihter `join_all` or `get_finished` is called. + /// # Overusage + /// supplying a number of threads to great may negatively impact performance as the system may not + /// be able to full fill the required needs + /// # Memory usage + /// if `drop_handles` is set to [`Bool::false`] the pool will continue to store the handles of + /// launched threads. This causes memory consumption to rise over time as more and more + /// threads are launched. + pub fn with_threads_and_drop_handles(max_thread_count: NonZeroUsize, drop_handles: bool) -> Self { + Self { + max_thread_count: max_thread_count.get(), + drop_handles: Arc::new(AtomicBool::new(drop_handles)), + ..Default::default() + } + } + /// Pass a new closure to be executed as soon as a thread is available. /// This function will execute the supplied closure immediately when the number of running threads /// is lower than the maximum number of threads. 
Otherwise the closure will be executed at some undetermined time @@ -172,6 +203,7 @@ where self.queue.clone(), self.handles.clone(), self.threads.clone(), + self.drop_handles.clone(), closure, ); } else { @@ -236,6 +268,29 @@ where results } + + /// set the flag to indicate that thread handles will be dropped after the thread is finished + /// executing. All threads that have finished until now but haven't been removed will get dropped + /// after the next thread finishes. + pub fn drop_finished_handles(&self) { + self.drop_handles.store(false, Ordering::Release); + } + + /// set the flag to indicate that thread handles will be kept after the thread is finished + /// executing until either `join_all` or `get_finished` is called. + /// Only new thread handles created after this call be kept. + pub fn keep_future_handles(&self) { + self.drop_handles.store(true, Ordering::Release); + } +} + +/// Removes all thread handles which have finished only if the can be locked at +/// the current time. This function will not block execution when the lock cannot be acquired. +fn try_prune(handles: Arc>>>) { + if let Ok(mut handles) = handles.try_lock() { + // keep unfinished elements + handles.retain(|handle| !handle.is_finished()); + } } /// Execute the supplied closure on a new thread @@ -247,6 +302,7 @@ fn execute( queue: Arc>>, handles: Arc>>>, threads: Arc, + drop: Arc, closure: F, ) where T: Send + 'static, @@ -262,7 +318,7 @@ fn execute( let next = queue.lock().unwrap().pop_front(); if let Some(next_closure) = next { // if we have sth. to execute, spawn a new thread - execute(queue, handles_copy, threads, next_closure); + execute(queue, handles_copy.clone(), threads, drop.clone(), next_closure); } else { // nothing to execute this thread will run out without any work to do // decrement the amount of used threads @@ -272,6 +328,12 @@ fn execute( ) } + // try to drop all fnished thread handles if necessary + // this is a non blocking operation + if drop.load(Ordering::Acquire) { + try_prune(handles_copy); + } + result })); } From 990a54a032c69d27398c1c5108d8c7638a050116 Mon Sep 17 00:00:00 2001 From: teridax Date: Tue, 30 May 2023 22:25:08 +0200 Subject: [PATCH 07/15] reformatted crate `multithreading` to pass tests --- src/multithreading/mod.rs | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs index 5c3ed53..71dabb6 100644 --- a/src/multithreading/mod.rs +++ b/src/multithreading/mod.rs @@ -4,24 +4,24 @@ //! threads are available. If so the pool will directly launch a new thread to run the supplied function. //! In case no threads are available the job will be stalled for execution until a thread is free to run the first //! stalled job. -//! +//! //! The pool will also keep track of all the handles that [`std::thread::spawn`] returns. Hence after executing a job //! the pool still queries the result of the function which can be retrieved any time after the submission. //! After retrieving the result of the function the handle is discarded and cannot be accessed again through the thread pool. -//! +//! //! # Threads //! The maximum number of threads to be used can be specified when creating a new thread pool. //! Alternatively the thread pool can be advised to automatically determine the recommend amount of threads to use. //! Note that this has its limitations due to possible side effects of sandboxing, containerization or vms. //! 
For further information see: [`thread::available_parallelism`] -//! +//! //! # Memory consumption over time //! The pool will store the handle for every thread launched constantly increasing the memory consumption. //! It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to //! `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption. //! Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread //! handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time. -//! +//! //! # Portability //! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. //! This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`]. @@ -36,7 +36,7 @@ use std::{ collections::VecDeque, num::NonZeroUsize, sync::{ - atomic::{AtomicUsize, Ordering, AtomicBool}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex, }, thread::{self, JoinHandle}, @@ -90,13 +90,13 @@ fn get_default_thread_count() -> usize { /// Additionally this implementation relies on using the `load` and `store` operations /// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls /// to `unwrap` or `expected` from [`std::sync::MutexGuard`]. -/// +/// /// # Memory consumption over time /// The pool will store the handle for every thread launched constantly increasing the memory consumption. /// It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to /// `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption. /// Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread -/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time. +/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time. #[allow(dead_code)] #[derive(Debug)] pub struct ThreadPool @@ -132,7 +132,7 @@ where // will be initialized to 0 threads: Arc::new(AtomicUsize::new(0)), // do not drop handles by default - drop_handles: Arc::new(AtomicBool::new(false)) + drop_handles: Arc::new(AtomicBool::new(false)), } } } @@ -173,10 +173,13 @@ where /// supplying a number of threads to great may negatively impact performance as the system may not /// be able to full fill the required needs /// # Memory usage - /// if `drop_handles` is set to [`Bool::false`] the pool will continue to store the handles of + /// if `drop_handles` is set to [`Bool::false`] the pool will continue to store the handles of /// launched threads. This causes memory consumption to rise over time as more and more /// threads are launched. - pub fn with_threads_and_drop_handles(max_thread_count: NonZeroUsize, drop_handles: bool) -> Self { + pub fn with_threads_and_drop_handles( + max_thread_count: NonZeroUsize, + drop_handles: bool, + ) -> Self { Self { max_thread_count: max_thread_count.get(), drop_handles: Arc::new(AtomicBool::new(drop_handles)), @@ -318,7 +321,13 @@ fn execute( let next = queue.lock().unwrap().pop_front(); if let Some(next_closure) = next { // if we have sth. 
to execute, spawn a new thread - execute(queue, handles_copy.clone(), threads, drop.clone(), next_closure); + execute( + queue, + handles_copy.clone(), + threads, + drop.clone(), + next_closure, + ); } else { // nothing to execute this thread will run out without any work to do // decrement the amount of used threads From c732864a741f2474e79d315cc3f5e835379051f2 Mon Sep 17 00:00:00 2001 From: teridax Date: Wed, 31 May 2023 09:00:49 +0200 Subject: [PATCH 08/15] added benchmark for `threadpool` using `criterion`. --- .gitignore | 1 + Cargo.toml | 7 ++++ benches/multithreading.rs | 74 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+) create mode 100644 benches/multithreading.rs diff --git a/.gitignore b/.gitignore index 4c790d0..98f2979 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target /Cargo.lock .DS_Store +/.vscode \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index a6d8ad6..69cd0ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,10 @@ authors = ["Sven Vogel", "Felix L. Müller", "Elias Alexander", "Elias Schmidt"] png = "0.17.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" + +[dev-dependencies] +criterion = "0.5.1" + +[[bench]] +name = "multithreading" +harness = false \ No newline at end of file diff --git a/benches/multithreading.rs b/benches/multithreading.rs new file mode 100644 index 0000000..ffd36d8 --- /dev/null +++ b/benches/multithreading.rs @@ -0,0 +1,74 @@ +use std::sync::Arc; + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use imsearch::multithreading::ThreadPool; + +fn dot(a: &[f64], b: &[f64]) -> f64 { + let mut sum = 0.0; + + for i in 0..a.len() { + sum += a[i] * b[i]; + } + + sum +} + +fn bench_single_threaded(a: &Vec, b: &Vec) { + black_box(dot(a, b)); +} + +fn bench_threadpool(a: Arc>, b: Arc>) { + let mut pool = ThreadPool::new(); + + const CHUNKS: usize = 100; + + let steps = a.len() / CHUNKS; + + for i in 0..CHUNKS { + let chunk = i * steps; + let aa = a.clone(); + let bb = b.clone(); + pool.enqueue(move || { + let a = &aa[chunk..(chunk + steps)]; + let b = &bb[chunk..(chunk + steps)]; + dot(a, b) + }); + } + + black_box( + pool.join_all() + .into_iter() + .map(|r| r.unwrap()) + .reduce(|a, b| a + b), + ); +} + +#[inline] +fn hash(x: f64) -> f64 { + ((x * 234.8743 + 3.8274).sin() * 87624.58376).fract() +} + +fn create_vec(size: usize) -> Arc> { + let mut vec = Vec::with_capacity(size); + + for i in 0..size { + vec.push(hash(i as f64)); + } + + Arc::new(vec) +} + +pub fn benchmark_threadpool(c: &mut Criterion) { + let vec_a = create_vec(1_000_000); + let vec_b = create_vec(1_000_000); + + c.bench_function("single threaded", |b| { + b.iter(|| bench_single_threaded(&vec_a, &vec_b)) + }); + c.bench_function("multi threaded", |b| { + b.iter(|| bench_threadpool(vec_a.clone(), vec_b.clone())) + }); +} + +criterion_group!(benches, benchmark_threadpool); +criterion_main!(benches); From e16a38aeefb5fb6ae04d6c59c7cb9f549bde7af8 Mon Sep 17 00:00:00 2001 From: teridax Date: Wed, 31 May 2023 17:09:44 +0200 Subject: [PATCH 09/15] finished benchmark for threadpool and fixed documentation for threadpool --- benches/multithreading.rs | 158 ++++++++++++++++++++++++++++++++------ src/multithreading/mod.rs | 8 +- 2 files changed, 139 insertions(+), 27 deletions(-) diff --git a/benches/multithreading.rs b/benches/multithreading.rs index ffd36d8..84a6014 100644 --- a/benches/multithreading.rs +++ b/benches/multithreading.rs @@ -1,8 +1,28 @@ -use std::sync::Arc; +//! 
Benachmarking funcitonality for [Criterion.rs](https://github.com/bheisler/criterion.rs) +//! This benchmark will compare the performance of various thread pools launched with different amounts of +//! maximum threads. +//! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit +//! double precision floating point values. -use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use std::{num::NonZeroUsize, sync::Arc}; + +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; use imsearch::multithreading::ThreadPool; +/// Amount of elements per vector used to calculate the dot product +const VEC_ELEM_COUNT: usize = 1_000_000; +/// Number of threads to test +const THREAD_COUNTS: [usize; 17] = [ + 1, 2, 4, 6, 8, 10, 12, 16, 18, 20, 22, 26, 28, 32, 40, 56, 64, +]; +/// seeds used to scramble up the values produced by the hash function for each vector +/// these are just some pseudo random numbers +const VEC_SEEDS: [u64; 2] = [0xa3f8347abce16, 0xa273048ca9dea]; + +/// Compute the dot product of two vectors +/// # Panics +/// this function assumes both vectors to be of exactly the same length. +/// If this is not the case the function will panic. fn dot(a: &[f64], b: &[f64]) -> f64 { let mut sum = 0.0; @@ -13,21 +33,23 @@ fn dot(a: &[f64], b: &[f64]) -> f64 { sum } -fn bench_single_threaded(a: &Vec, b: &Vec) { - black_box(dot(a, b)); -} +/// Computes the dot product using a thread pool with varying number of threads. The vectors will be both splitted into equally +/// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have +/// finished the partial dot products will be summed to create the final result. +fn dot_parallel(a: Arc>, b: Arc>, threads: usize) { + let mut pool = + ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(threads).unwrap(), true); -fn bench_threadpool(a: Arc>, b: Arc>) { - let mut pool = ThreadPool::new(); + // number of elements in each vector for each thread + let steps = a.len() / threads; - const CHUNKS: usize = 100; - - let steps = a.len() / CHUNKS; - - for i in 0..CHUNKS { + for i in 0..threads { + // offset of the first element for the thread local vec let chunk = i * steps; + // create a new strong reference to the vector let aa = a.clone(); let bb = b.clone(); + // launch a new thread pool.enqueue(move || { let a = &aa[chunk..(chunk + steps)]; let b = &bb[chunk..(chunk + steps)]; @@ -36,39 +58,129 @@ fn bench_threadpool(a: Arc>, b: Arc>) { } black_box( + // wait for the threads to finish pool.join_all() + // iterate over the results and sum the parital dot products together .into_iter() .map(|r| r.unwrap()) .reduce(|a, b| a + b), ); } +/// Compute a simple hash value for the given index value. +/// This function will return a value between [0, 1]. #[inline] fn hash(x: f64) -> f64 { ((x * 234.8743 + 3.8274).sin() * 87624.58376).fract() } -fn create_vec(size: usize) -> Arc> { +/// Create a vector filled with `size` elements of 64-bit floating point numbers +/// each initialized with the function `hash` and the given seed. The vector will +/// be filled with values between [0, 1]. 
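+/// # Example
+/// A small illustrative sketch of the intended usage (hypothetical seed value, marked
+/// `ignore` since it is not part of the original code):
+/// ```rust ignore
+/// let v = create_vec(1_000, 0xabcd);
+/// assert_eq!(v.len(), 1_000);
+/// ```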
+fn create_vec(size: usize, seed: u64) -> Arc> { let mut vec = Vec::with_capacity(size); for i in 0..size { - vec.push(hash(i as f64)); + vec.push(hash(i as f64 + seed as f64)); } Arc::new(vec) } -pub fn benchmark_threadpool(c: &mut Criterion) { - let vec_a = create_vec(1_000_000); - let vec_b = create_vec(1_000_000); +/// Function for executing the thread pool benchmarks using criterion.rs. +/// It will create two different vectors and benchmark the single thread performance +/// as well as the multi threadded performance for varying amounts of threads. +pub fn bench_threadpool(c: &mut Criterion) { + let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]); + let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]); - c.bench_function("single threaded", |b| { - b.iter(|| bench_single_threaded(&vec_a, &vec_b)) - }); - c.bench_function("multi threaded", |b| { - b.iter(|| bench_threadpool(vec_a.clone(), vec_b.clone())) + let mut group = c.benchmark_group("threadpool with various number of threads"); + + for threads in THREAD_COUNTS.iter() { + group.throughput(Throughput::Bytes(*threads as u64)); + group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| { + b.iter(|| { + dot_parallel(vec_a.clone(), vec_b.clone(), *threads); + }); + }); + } + group.finish(); +} + +/// Benchmark the effects of over and underusing a thread pools thread capacity. +/// The thread pool will automatically choose the number of threads to use. +/// We will then run a custom number of jobs with that pool that may be smaller or larger +/// than the amount of threads the pool can offer. +fn pool_overusage(a: Arc>, b: Arc>, threads: usize) { + // automatically choose the number of threads + let mut pool = ThreadPool::new(); + // drop the handles used by each thread after its done + pool.drop_finished_handles(); + + // number of elements in each vector for each thread + let steps = a.len() / threads; + + for i in 0..threads { + // offset of the first element for the thread local vec + let chunk = i * steps; + // create a new strong reference to the vector + let aa = a.clone(); + let bb = b.clone(); + // launch a new thread + pool.enqueue(move || { + let a = &aa[chunk..(chunk + steps)]; + let b = &bb[chunk..(chunk + steps)]; + dot(a, b) + }); + } + + black_box( + // wait for the threads to finish + pool.join_all() + // iterate over the results and sum the parital dot products together + .into_iter() + .map(|r| r.unwrap()) + .reduce(|a, b| a + b), + ); +} + +/// Benchmark the effects of over and underusing a thread pools thread capacity. +/// The thread pool will automatically choose the number of threads to use. +/// We will then run a custom number of jobs with that pool that may be smaller or larger +/// than the amount of threads the pool can offer. +pub fn bench_overusage(c: &mut Criterion) { + let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]); + let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]); + + let mut group = c.benchmark_group("threadpool overusage"); + + for threads in THREAD_COUNTS.iter() { + group.throughput(Throughput::Bytes(*threads as u64)); + group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| { + b.iter(|| { + pool_overusage(vec_a.clone(), vec_b.clone(), *threads); + }); + }); + } + group.finish(); +} + +/// Benchmark the performance of a single thread used to calculate the dot product. 
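+/// The single-threaded result serves as the baseline for the pooled variants above.
+/// The whole suite can be run through Criterion's standard Cargo integration, e.g.
+/// `cargo bench --bench multithreading` (using the bench target name declared in Cargo.toml).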
+pub fn bench_single_threaded(c: &mut Criterion) { + let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]); + let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]); + + c.bench_function("single threaded", |s| { + s.iter(|| { + black_box(dot(&vec_a, &vec_b)); + }); }); } -criterion_group!(benches, benchmark_threadpool); +criterion_group!( + benches, + bench_single_threaded, + bench_threadpool, + bench_overusage +); criterion_main!(benches); diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs index 71dabb6..647ea48 100644 --- a/src/multithreading/mod.rs +++ b/src/multithreading/mod.rs @@ -26,7 +26,7 @@ //! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. //! This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`]. //! Note that atomic primitives are not available on all platforms but "can generally be relied upon existing" -//! (see: https://doc.rust-lang.org/std/sync/atomic/index.html). +//! (see: ). //! Additionally this implementation relies on using the `load` and `store` operations //! instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls //! to `unwrap` or `expected` from [`std::sync::MutexGuard`]. @@ -86,7 +86,7 @@ fn get_default_thread_count() -> usize { /// This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. /// This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`]. /// Note that atomic primitives are not available on all platforms but "can generally be relied upon existing" -/// (see: https://doc.rust-lang.org/std/sync/atomic/index.html). +/// (see: ). /// Additionally this implementation relies on using the `load` and `store` operations /// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls /// to `unwrap` or `expected` from [`std::sync::MutexGuard`]. @@ -112,7 +112,7 @@ where /// number of currently running threads /// new implementation relies on atomic primitives to avoid locking and possible /// guard errors. Note that atomic primitives are not available on all platforms "can generally be relied upon existing" - /// (see: https://doc.rust-lang.org/std/sync/atomic/index.html). + /// (see: ). /// Also this implementation relies on using the `load` and `store` operations /// instead of using more comfortable one like `fetch_add` threads: Arc, @@ -173,7 +173,7 @@ where /// supplying a number of threads to great may negatively impact performance as the system may not /// be able to full fill the required needs /// # Memory usage - /// if `drop_handles` is set to [`Bool::false`] the pool will continue to store the handles of + /// if `drop_handles` is set to `false` the pool will continue to store the handles of /// launched threads. This causes memory consumption to rise over time as more and more /// threads are launched. 
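    /// # Example
    /// A minimal usage sketch (marked `ignore` since it is illustrative only):
    /// ```rust ignore
    /// use std::num::NonZeroUsize;
    /// use imsearch::multithreading::ThreadPool;
    ///
    /// // a pool of four threads that discards the handles of finished threads on its own
    /// let mut pool = ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(4).unwrap(), true);
    /// pool.enqueue(|| println!("hello from the pool"));
    /// pool.join_all();
    /// ```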
    pub fn with_threads_and_drop_handles(

From 45fe1afcd93a66637cd872acac9c3a0ac08adf22 Mon Sep 17 00:00:00 2001
From: teridax
Date: Wed, 31 May 2023 17:51:35 +0200
Subject: [PATCH 10/15] added unit test to `multithreading`

---
 src/multithreading/mod.rs | 51 +++++++++++++++++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs
index 647ea48..00ddc74 100644
--- a/src/multithreading/mod.rs
+++ b/src/multithreading/mod.rs
@@ -346,3 +346,54 @@ fn execute(
         result
     }));
 }
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use super::*;
+
+    #[test]
+    fn test_thread_pool() {
+        // auto determine the amount of threads to use
+        let mut pool = ThreadPool::new();
+
+        // launch 4 jobs to run on our pool
+        for i in 0..4 {
+            pool.enqueue(move || (0..=i).sum::<i32>());
+        }
+
+        // wait for the threads to finish and sum their results
+        let sum = pool
+            .join_all()
+            .into_iter()
+            .map(|r| r.unwrap())
+            .sum::<i32>();
+
+        assert_eq!(sum, 10);
+    }
+
+    #[test]
+    fn test_drop_stalled() {
+        // pool limited to a single thread
+        let mut pool = ThreadPool::with_threads(NonZeroUsize::new(1).unwrap());
+
+        // launch a single job that returns its index immediately (it sleeps for i * 20 = 0 seconds)
+        for i in 0..1 {
+            pool.enqueue(move || {
+                thread::sleep(Duration::from_secs(i * 20));
+                i
+            });
+        }
+
+        // give the job time to finish
+        thread::sleep(Duration::from_secs(2));
+        // discard any closures still stalled in the queue
+        pool.discard_stalled();
+
+        // wait for the threads to finish and sum their results
+        let sum = pool.join_all().into_iter().map(|r| r.unwrap()).sum::<u64>();
+
+        assert_eq!(sum, 0);
+    }
+}

From 10266dd1bc3f76f0e29678e931b320b04b30a85d Mon Sep 17 00:00:00 2001
From: teridax <72654954+Servostar@users.noreply.github.com>
Date: Thu, 1 Jun 2023 20:45:07 +0200
Subject: [PATCH 11/15] Multithreading (#12)

* added multithreading crate with thread pool

* added crate multithreading

* replaced `threads` member of `ThreadPool` with an atomic primitive

* fixed doctest for threadpool

* added module documentation to multithreading

* added functionality to drop thread handles automatically when threads have finished

* reformatted crate `multithreading` to pass tests

* added benchmark for `threadpool` using `criterion`.

* finished benchmark for threadpool and fixed documentation for threadpool

* added unit test to `multithreading`

---
 .gitignore                |   1 +
 Cargo.toml                |   7 ++++
 benches/multithreading.rs | 186 ++++++++++++++++++
 src/lib.rs                |   2 +
 src/multithreading/mod.rs | 399 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 595 insertions(+)
 create mode 100644 benches/multithreading.rs
 create mode 100644 src/multithreading/mod.rs

diff --git a/.gitignore b/.gitignore
index 4c790d0..98f2979 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 /target
 /Cargo.lock
 .DS_Store
+/.vscode
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
index a6d8ad6..69cd0ba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,3 +10,10 @@ authors = ["Sven Vogel", "Felix L. Müller", "Elias Alexander", "Elias Schmidt"]
 png = "0.17.8"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
+
+[dev-dependencies]
+criterion = "0.5.1"
+
+[[bench]]
+name = "multithreading"
+harness = false
\ No newline at end of file
diff --git a/benches/multithreading.rs b/benches/multithreading.rs
new file mode 100644
index 0000000..84a6014
--- /dev/null
+++ b/benches/multithreading.rs
@@ -0,0 +1,186 @@
+//! Benchmarking functionality for [Criterion.rs](https://github.com/bheisler/criterion.rs).
+//! This benchmark will compare the performance of various thread pools launched with different amounts of
+//! maximum threads.
+//! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit
+//! double precision floating point values.

+use std::{num::NonZeroUsize, sync::Arc};
+
+use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
+use imsearch::multithreading::ThreadPool;
+
+/// Number of elements per vector used to calculate the dot product
+const VEC_ELEM_COUNT: usize = 1_000_000;
+/// Number of threads to test
+const THREAD_COUNTS: [usize; 17] = [
+    1, 2, 4, 6, 8, 10, 12, 16, 18, 20, 22, 26, 28, 32, 40, 56, 64,
+];
+/// seeds used to scramble up the values produced by the hash function for each vector
+/// these are just some pseudo-random numbers
+const VEC_SEEDS: [u64; 2] = [0xa3f8347abce16, 0xa273048ca9dea];
+
+/// Compute the dot product of two vectors
+/// # Panics
+/// This function assumes both vectors to be of exactly the same length.
+/// If this is not the case the function will panic.
+fn dot(a: &[f64], b: &[f64]) -> f64 {
+    let mut sum = 0.0;
+
+    for i in 0..a.len() {
+        sum += a[i] * b[i];
+    }
+
+    sum
+}
+
+/// Computes the dot product using a thread pool with a varying number of threads. Both vectors are split into equally
+/// sized slices which are then passed to their own threads to compute partial dot products. After all threads have
+/// finished, the partial dot products are summed to create the final result.
+fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
+    let mut pool =
+        ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(threads).unwrap(), true);
+
+    // number of elements in each vector for each thread
+    let steps = a.len() / threads;
+
+    for i in 0..threads {
+        // offset of the first element for the thread local vec
+        let chunk = i * steps;
+        // create a new strong reference to the vector
+        let aa = a.clone();
+        let bb = b.clone();
+        // launch a new thread
+        pool.enqueue(move || {
+            let a = &aa[chunk..(chunk + steps)];
+            let b = &bb[chunk..(chunk + steps)];
+            dot(a, b)
+        });
+    }
+
+    black_box(
+        // wait for the threads to finish
+        pool.join_all()
+            // iterate over the results and sum the partial dot products together
+            .into_iter()
+            .map(|r| r.unwrap())
+            .reduce(|a, b| a + b),
+    );
+}
+
+/// Compute a simple hash value for the given index value.
+/// This function will return a value in the interval (-1, 1).
+#[inline]
+fn hash(x: f64) -> f64 {
+    ((x * 234.8743 + 3.8274).sin() * 87624.58376).fract()
+}
+
+/// Create a vector filled with `size` elements of 64-bit floating point numbers
+/// each initialized with the function `hash` and the given seed. The vector will
+/// be filled with values in the interval (-1, 1).
+fn create_vec(size: usize, seed: u64) -> Arc<Vec<f64>> {
+    let mut vec = Vec::with_capacity(size);
+
+    for i in 0..size {
+        vec.push(hash(i as f64 + seed as f64));
+    }
+
+    Arc::new(vec)
+}
+
+/// Function for executing the thread pool benchmarks using criterion.rs.
+/// It will create two different vectors and benchmark the single-threaded performance
+/// as well as the multi-threaded performance for varying amounts of threads.
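+/// Note that `dot_parallel` computes its chunk size as `steps = a.len() / threads` using integer
+/// division, so for thread counts that do not evenly divide the vector length the last few
+/// elements are skipped; the partial sums may therefore differ slightly between thread counts.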
+pub fn bench_threadpool(c: &mut Criterion) {
+    let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
+    let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
+
+    let mut group = c.benchmark_group("threadpool with various number of threads");
+
+    for threads in THREAD_COUNTS.iter() {
+        group.throughput(Throughput::Bytes(*threads as u64));
+        group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
+            b.iter(|| {
+                dot_parallel(vec_a.clone(), vec_b.clone(), *threads);
+            });
+        });
+    }
+    group.finish();
+}
+
+/// Benchmark the effects of over- and underusing a thread pool's thread capacity.
+/// The thread pool will automatically choose the number of threads to use.
+/// We will then run a custom number of jobs with that pool that may be smaller or larger
+/// than the amount of threads the pool can offer.
+fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
+    // automatically choose the number of threads
+    let mut pool = ThreadPool::new();
+    // drop the handles used by each thread after it's done
+    pool.drop_finished_handles();
+
+    // number of elements in each vector for each thread
+    let steps = a.len() / threads;
+
+    for i in 0..threads {
+        // offset of the first element for the thread local vec
+        let chunk = i * steps;
+        // create a new strong reference to the vector
+        let aa = a.clone();
+        let bb = b.clone();
+        // launch a new thread
+        pool.enqueue(move || {
+            let a = &aa[chunk..(chunk + steps)];
+            let b = &bb[chunk..(chunk + steps)];
+            dot(a, b)
+        });
+    }
+
+    black_box(
+        // wait for the threads to finish
+        pool.join_all()
+            // iterate over the results and sum the partial dot products together
+            .into_iter()
+            .map(|r| r.unwrap())
+            .reduce(|a, b| a + b),
+    );
+}
+
+/// Benchmark the effects of over- and underusing a thread pool's thread capacity.
+/// The thread pool will automatically choose the number of threads to use.
+/// We will then run a custom number of jobs with that pool that may be smaller or larger
+/// than the amount of threads the pool can offer.
+pub fn bench_overusage(c: &mut Criterion) {
+    let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
+    let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
+
+    let mut group = c.benchmark_group("threadpool overusage");
+
+    for threads in THREAD_COUNTS.iter() {
+        group.throughput(Throughput::Bytes(*threads as u64));
+        group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
+            b.iter(|| {
+                pool_overusage(vec_a.clone(), vec_b.clone(), *threads);
+            });
+        });
+    }
+    group.finish();
+}
+
+/// Benchmark the performance of a single thread used to calculate the dot product.
+pub fn bench_single_threaded(c: &mut Criterion) {
+    let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
+    let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
+
+    c.bench_function("single threaded", |s| {
+        s.iter(|| {
+            black_box(dot(&vec_a, &vec_b));
+        });
+    });
+}
+
+criterion_group!(
+    benches,
+    bench_single_threaded,
+    bench_threadpool,
+    bench_overusage
+);
+criterion_main!(benches);
diff --git a/src/lib.rs b/src/lib.rs
index 7d12d9a..e0f9a41 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,5 @@
+pub mod multithreading;
+
 pub fn add(left: usize, right: usize) -> usize {
     left + right
 }
diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs
new file mode 100644
index 0000000..00ddc74
--- /dev/null
+++ b/src/multithreading/mod.rs
@@ -0,0 +1,399 @@
+//! This module provides the functionality to create a thread pool of fixed capacity.
+//! This means that the pool can be used to dispatch functions or closures that will be executed
+//! some time in the future each on its own thread. When dispatching jobs, the pool will test whether
+//! threads are available. If so the pool will directly launch a new thread to run the supplied function.
+//! In case no threads are available the job will be stalled for execution until a thread is free to run the first
+//! stalled job.
+//!
+//! The pool will also keep track of all the handles that [`std::thread::spawn`] returns. Hence after executing a job
+//! the pool still holds the result of the function, which can be retrieved any time after the submission.
+//! After retrieving the result of the function the handle is discarded and cannot be accessed again through the thread pool.
+//!
+//! # Threads
+//! The maximum number of threads to be used can be specified when creating a new thread pool.
+//! Alternatively the thread pool can be advised to automatically determine the recommended number of threads to use.
+//! Note that this has its limitations due to possible side effects of sandboxing, containerization or VMs.
+//! For further information see: [`thread::available_parallelism`]
+//!
+//! # Memory consumption over time
+//! The pool will store the handle for every thread launched, constantly increasing the memory consumption.
+//! It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended
+//! to make regular calls to either `join_all` or `get_finished` in order to clear the vector of handles and avoid
+//! endless memory consumption.
+//! Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discards all thread
+//! handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
+//!
+//! # Portability
+//! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
+//! This type is used to remove some locks from the otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
+//! Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
+//! (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
+//! Additionally this implementation relies on using the `load` and `store` operations
+//! instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls
+//! to `unwrap` or `expect` on [`std::sync::MutexGuard`].
+
+use std::{
+    any::Any,
+    collections::VecDeque,
+    num::NonZeroUsize,
+    sync::{
+        atomic::{AtomicBool, AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+    thread::{self, JoinHandle},
+};
+
+/// Maximum number of threads to be used by the thread pool in case all methods
+/// of determining a recommended number failed
+#[allow(unused)]
+pub const FALLBACK_THREADS: usize = 1;
+
+/// Returns the number of threads to be used by the thread pool by default.
+/// This function tries to fetch a recommended number by calling [`thread::available_parallelism`].
+
+use std::{
+    any::Any,
+    collections::VecDeque,
+    num::NonZeroUsize,
+    sync::{
+        atomic::{AtomicBool, AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+    thread::{self, JoinHandle},
+};
+
+/// Number of threads the thread pool falls back to in case all methods
+/// of determining a recommended number fail
+#[allow(unused)]
+pub const FALLBACK_THREADS: usize = 1;
+
+/// Returns the number of threads to be used by the thread pool by default.
+/// This function tries to fetch a recommended number by calling [`thread::available_parallelism`].
+/// In case this fails, [`FALLBACK_THREADS`] will be returned.
+fn get_default_thread_count() -> usize {
+    // number of threads to fall back to
+    let fallback_threads =
+        NonZeroUsize::new(FALLBACK_THREADS).expect("fallback_threads must be nonzero");
+    // determine the maximum recommended number of threads to use
+    // most of the time this is going to be the number of CPUs
+    thread::available_parallelism()
+        .unwrap_or(fallback_threads)
+        .get()
+}
+
+/// This struct manages a pool of threads with a fixed maximum number.
+/// Any time a closure is passed to `enqueue` the pool checks whether it can
+/// directly launch a new thread to execute the closure. If the maximum number
+/// of threads is reached the closure is staged and will get executed by the next
+/// thread to become available.
+/// The pool will also keep track of every `JoinHandle` created by running every closure on
+/// its own thread. The results can be obtained by either calling `join_all` or `get_finished`.
+/// # Example
+/// ```rust
+/// use imsearch::multithreading::ThreadPool;
+/// let mut pool = ThreadPool::new();
+///
+/// // launch some work in parallel
+/// for i in 0..10 {
+///     pool.enqueue(move || {
+///         println!("I am multithreaded and have id: {i}");
+///     });
+/// }
+/// // wait for threads to finish
+/// pool.join_all();
+/// ```
+/// # Portability
+/// This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
+/// This type is used to remove some locks from the otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
+/// Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
+/// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html#portability>).
+/// Additionally this implementation relies on using the `load` and `store` operations
+/// instead of more comfortable ones like `fetch_add` in order to avoid unnecessary calls
+/// to `unwrap` or `expect` when locking a [`std::sync::Mutex`].
+///
+/// # Memory consumption over time
+/// The pool stores the handle of every thread it launches, which constantly increases memory consumption.
+/// Note that the pool won't perform any kind of cleanup of the stored handles on its own, meaning it is recommended to make regular calls to
+/// `join_all` or `get_finished` in order to clear the vector of handles and avoid unbounded memory consumption.
+/// Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discards all thread
+/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
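+/// # Example: choosing the thread count
+/// A minimal sketch of the three constructors (thread counts are illustrative; the explicit
+/// type annotations pin the job type to a plain function pointer):
+/// ```rust
+/// use std::num::NonZeroUsize;
+/// use imsearch::multithreading::ThreadPool;
+///
+/// // let the pool pick the recommended thread count
+/// let _auto: ThreadPool<fn() -> i32, i32> = ThreadPool::new();
+/// // cap the pool at 4 threads
+/// let _capped: ThreadPool<fn() -> i32, i32> =
+///     ThreadPool::with_threads(NonZeroUsize::new(4).unwrap());
+/// // cap at 4 threads and drop handles of finished threads automatically
+/// let _pruning: ThreadPool<fn() -> i32, i32> =
+///     ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(4).unwrap(), true);
+/// ```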
+#[allow(dead_code)]
+#[derive(Debug)]
+pub struct ThreadPool<F, T>
+where
+    F: Send + FnOnce() -> T,
+{
+    /// maximum number of threads to launch at once
+    max_thread_count: usize,
+    /// handles for launched threads
+    handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
+    /// functions to be executed when threads are ready
+    queue: Arc<Mutex<VecDeque<F>>>,
+    /// number of currently running threads
+    /// this implementation relies on atomic primitives to avoid locking and possible
+    /// guard errors. Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
+    /// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html#portability>).
+    /// Also this implementation relies on using the `load` and `store` operations
+    /// instead of more comfortable ones like `fetch_add`
+    threads: Arc<AtomicUsize>,
+    /// whether to keep the thread handles after the function returned
+    drop_handles: Arc<AtomicBool>,
+}
+
+impl<F, T> Default for ThreadPool<F, T>
+where
+    F: Send + FnOnce() -> T,
+{
+    fn default() -> Self {
+        Self {
+            max_thread_count: get_default_thread_count(),
+            handles: Default::default(),
+            queue: Default::default(),
+            // will be initialized to 0
+            threads: Arc::new(AtomicUsize::new(0)),
+            // do not drop handles by default
+            drop_handles: Arc::new(AtomicBool::new(false)),
+        }
+    }
+}
+
+#[allow(dead_code)]
+impl<F, T> ThreadPool<F, T>
+where
+    F: Send + FnOnce() -> T + 'static,
+    T: Send + 'static,
+{
+    /// Create a new empty thread pool with the maximum number of threads set to the recommended number of threads
+    /// supplied by [`std::thread::available_parallelism`] or, in case the function fails, [`FALLBACK_THREADS`].
+    /// # Limitations
+    /// This function may assume the wrong number of threads due to the nature of [`std::thread::available_parallelism`].
+    /// That can happen if the program runs inside of a container or VM with poorly configured parallelism.
+    pub fn new() -> Self {
+        Self {
+            max_thread_count: get_default_thread_count(),
+            ..Default::default()
+        }
+    }
+
+    /// Create a new empty thread pool with the maximum number of threads set to the specified number.
+    /// # Overusage
+    /// Supplying a number of threads that is too great may negatively impact performance as the system may not
+    /// be able to fulfill the required needs.
+    pub fn with_threads(max_thread_count: NonZeroUsize) -> Self {
+        Self {
+            max_thread_count: max_thread_count.get(),
+            ..Default::default()
+        }
+    }
+
+    /// Create a new empty thread pool with the maximum number of threads set to the specified number
+    /// and also set the flag to drop the handles of finished threads instead of storing them until
+    /// either `join_all` or `get_finished` is called.
+    /// # Overusage
+    /// Supplying a number of threads that is too great may negatively impact performance as the system may not
+    /// be able to fulfill the required needs.
+    /// # Memory usage
+    /// If `drop_handles` is set to `false` the pool will continue to store the handles of
+    /// launched threads. This causes memory consumption to rise over time as more and more
+    /// threads are launched.
+    pub fn with_threads_and_drop_handles(
+        max_thread_count: NonZeroUsize,
+        drop_handles: bool,
+    ) -> Self {
+        Self {
+            max_thread_count: max_thread_count.get(),
+            drop_handles: Arc::new(AtomicBool::new(drop_handles)),
+            ..Default::default()
+        }
+    }
+
+    /// Pass a new closure to be executed as soon as a thread is available.
+    /// This function will execute the supplied closure immediately when the number of running threads
+    /// is lower than the maximum number of threads. Otherwise the closure will be executed at some undetermined time
+    /// in the future, unless the program terminates first.
+    /// If `join_all` is called and the closure hasn't been executed yet, `join_all` will wait for all stalled
+    /// closures to be executed.
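+    /// # Example
+    /// A minimal sketch:
+    /// ```rust
+    /// # use imsearch::multithreading::ThreadPool;
+    /// let mut pool = ThreadPool::new();
+    /// pool.enqueue(|| 21 + 21);
+    /// assert_eq!(pool.join_all().pop().unwrap().unwrap(), 42);
+    /// ```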
+    pub fn enqueue(&mut self, closure: F) {
+        // read the number of threads currently in use; all matching stores use Ordering::Release
+        let used_threads = self.threads.load(Ordering::Acquire);
+        // test if we can launch a new thread
+        if used_threads < self.max_thread_count {
+            // we can create a new thread, increment the thread count
+            self.threads
+                .store(used_threads.saturating_add(1), Ordering::Release);
+            // run new thread
+            execute(
+                self.queue.clone(),
+                self.handles.clone(),
+                self.threads.clone(),
+                self.drop_handles.clone(),
+                closure,
+            );
+        } else {
+            // all threads are being used
+            // enqueue the closure to be launched when a thread is ready
+            self.queue.lock().unwrap().push_back(closure);
+        }
+    }
+
+    /// Removes all closures stalled for execution.
+    /// All closures still waiting to be executed will be dropped by the pool and
+    /// won't get executed. Useful if an old set of closures hasn't run yet but is outdated
+    /// and resources are required immediately for updated closures.
+    pub fn discard_stalled(&mut self) {
+        self.queue.lock().unwrap().clear();
+    }
+
+    /// Waits for all currently running threads and all stalled closures to be executed.
+    /// If any closure hasn't been executed yet, `join_all` will wait until the queue holding all
+    /// unexecuted closures is empty. It returns the results that the `join` of every thread yields, collected into a vector.
+    /// If the vector is of length zero, no threads were joined and the thread pool didn't do anything.
+    /// All handles of threads will be removed after this call.
+    pub fn join_all(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
+        let mut results = Vec::new();
+        loop {
+            // lock the handles, pop the last one off and unlock the handles again
+            // to allow running threads to proceed
+            let handle = self.handles.lock().unwrap().pop();
+
+            // if we still have a handle, join it; otherwise no handles are left and we leave the loop
+            if let Some(handle) = handle {
+                results.push(handle.join());
+                continue;
+            }
+            break;
+        }
+
+        results
+    }
+
+    /// Returns the results of every thread that has already finished until now.
+    /// Threads that are still running won't be waited for, nor will any closure stalled for future execution.
+    /// If the vector is of length zero, no threads were joined and the thread pool either didn't do anything or is busy.
+    /// All handles of finished threads will be removed after this call.
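+    /// # Example
+    /// A minimal sketch: collect the results of quick jobs without blocking on a slow one.
+    /// ```rust
+    /// # use imsearch::multithreading::ThreadPool;
+    /// # use std::time::Duration;
+    /// let mut pool = ThreadPool::new();
+    /// pool.enqueue(|| std::thread::sleep(Duration::from_millis(100)));
+    /// // returns immediately, yielding only results of threads that are already done
+    /// let quick = pool.get_finished();
+    /// // join_all blocks until the sleeping job has finished as well
+    /// pool.join_all();
+    /// ```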
+    pub fn get_finished(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
+        let mut results = Vec::new();
+
+        let mut handles = self.handles.lock().unwrap();
+
+        // loop through the handles and remove all finished handles
+        // joining the finished handles is quick as they are already done
+        let mut idx = 0;
+        while idx < handles.len() {
+            if handles[idx].is_finished() {
+                // thread is finished, yield its result
+                results.push(handles.remove(idx).join());
+            } else {
+                // thread isn't done, continue to the next one
+                idx += 1;
+            }
+        }
+
+        results
+    }
+
+    /// Set the flag to indicate that thread handles will be dropped after the thread is finished
+    /// executing. All threads that have finished until now but haven't been removed will get dropped
+    /// after the next thread finishes.
+    pub fn drop_finished_handles(&self) {
+        self.drop_handles.store(true, Ordering::Release);
+    }
+
+    /// Set the flag to indicate that thread handles will be kept after the thread is finished
+    /// executing until either `join_all` or `get_finished` is called.
+    /// Only thread handles created after this call will be kept.
+    pub fn keep_future_handles(&self) {
+        self.drop_handles.store(false, Ordering::Release);
+    }
+}
+
+/// Removes all thread handles which have finished, but only if the handle list can be locked at
+/// the current time. This function will not block execution when the lock cannot be acquired.
+fn try_prune<T>(handles: Arc<Mutex<Vec<JoinHandle<T>>>>) {
+    if let Ok(mut handles) = handles.try_lock() {
+        // keep unfinished elements
+        handles.retain(|handle| !handle.is_finished());
+    }
+}
+
+/// Execute the supplied closure on a new thread
+/// and store the thread's handle into `handles`. When the thread
+/// finishes executing the closure it will look for any closures left in `queue` and
+/// recursively execute the next one on a new thread. This function updates `threads` in order to
+/// keep track of the number of active threads.
+fn execute<F, T>(
+    queue: Arc<Mutex<VecDeque<F>>>,
+    handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
+    threads: Arc<AtomicUsize>,
+    drop: Arc<AtomicBool>,
+    closure: F,
+) where
+    T: Send + 'static,
+    F: Send + FnOnce() -> T + 'static,
+{
+    let handles_copy = handles.clone();
+
+    handles.lock().unwrap().push(thread::spawn(move || {
+        // run closure (actual work)
+        let result = closure();
+
+        // take the next closure stalled for execution
+        let next = queue.lock().unwrap().pop_front();
+        if let Some(next_closure) = next {
+            // if we have sth. to execute, spawn a new thread
+            execute(
+                queue,
+                handles_copy.clone(),
+                threads,
+                drop.clone(),
+                next_closure,
+            );
+        } else {
+            // nothing left to execute, this thread will run out without any work to do
+            // decrement the number of used threads
+            threads.store(
+                threads.load(Ordering::Acquire).saturating_sub(1),
+                Ordering::Release,
+            )
+        }
+
+        // try to drop all finished thread handles if necessary
+        // this is a non-blocking operation
+        if drop.load(Ordering::Acquire) {
+            try_prune(handles_copy);
+        }
+
+        result
+    }));
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use super::*;
+
+    #[test]
+    fn test_thread_pool() {
+        // auto determine the number of threads to use
+        let mut pool = ThreadPool::new();
+
+        // launch 4 jobs to run on our pool
+        for i in 0..4 {
+            pool.enqueue(move || (0..=i).sum::<i32>());
+        }
+
+        // wait for the threads to finish and sum their results
+        let sum = pool
+            .join_all()
+            .into_iter()
+            .map(|r| r.unwrap())
+            .sum::<i32>();
+
+        assert_eq!(sum, 10);
+    }
+
+    #[test]
+    fn test_drop_stalled() {
+        // use a pool limited to a single thread
+        let mut pool = ThreadPool::with_threads(NonZeroUsize::new(1).unwrap());
+
+        // launch a single job that returns immediately
+        for i in 0..1 {
+            pool.enqueue(move || {
+                thread::sleep(Duration::from_secs(i * 20));
+                i
+            });
+        }
+
+        // give the job time to finish
+        thread::sleep(Duration::from_secs(2));
+        // discard anything that might still be stalled in the queue
+        pool.discard_stalled();
+
+        // wait for the threads to finish and sum their results
+        let sum = pool.join_all().into_iter().map(|r| r.unwrap()).sum::<u64>();
+
+        assert_eq!(sum, 0);
+    }
+}

From 125a3964a76fda551110df9301c5a415adedaad3 Mon Sep 17 00:00:00 2001
From: teridax
Date: Sun, 4 Jun 2023 22:31:00 +0200
Subject: [PATCH 12/15] complete rewrite of `multithreading::ThreadPool`

removing:
- the limitation of atomics
- multiple mutexes

and added message passing
---
 benches/multithreading.rs |  26 +--
 src/multithreading/mod.rs | 436 ++++++--------------------------------
 2 files changed, 70 insertions(+), 392 deletions(-)
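The rewrite below replaces the atomic thread counter and the mutex-guarded handle vector with a single job queue drained by worker threads that report their results over an mpsc channel. A condensed, self-contained sketch of that pattern (illustrative values, not the exact types of the patch):

```rust
use std::collections::VecDeque;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    // shared queue of jobs; workers pop until it runs dry
    let queue = Arc::new(Mutex::new(
        (0..10).map(|i| move || i * i).collect::<VecDeque<_>>(),
    ));
    let (tx, rx) = mpsc::channel();

    let workers: Vec<_> = (0..2)
        .map(|_| {
            let (queue, tx) = (queue.clone(), tx.clone());
            thread::spawn(move || {
                // the guard returned by lock() is dropped right after pop_front,
                // so the queue is not locked while the job runs
                while let Some(job) = queue.lock().unwrap().pop_front() {
                    tx.send(job()).expect("cannot send result");
                }
            })
        })
        .collect();

    for handle in workers {
        handle.join().unwrap();
    }
    // drain the results collected through the channel
    let sum: i32 = rx.try_iter().sum();
    assert_eq!(sum, 285);
}
```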
diff --git a/benches/multithreading.rs b/benches/multithreading.rs
index 84a6014..b7fdb2f 100644
--- a/benches/multithreading.rs
+++ b/benches/multithreading.rs
@@ -38,7 +38,7 @@ fn dot(a: &[f64], b: &[f64]) -> f64 {
 /// finished the partial dot products will be summed to create the final result.
 fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
     let mut pool =
-        ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(threads).unwrap(), true);
+        ThreadPool::with_limit(threads);
 
     // number of elements in each vector for each thread
     let steps = a.len() / threads;
@@ -56,15 +56,10 @@ fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
             dot(a, b)
         });
     }
+
+    pool.join_all();
 
-    black_box(
-        // wait for the threads to finish
-        pool.join_all()
-            // iterate over the results and sum the partial dot products together
-            .into_iter()
-            .map(|r| r.unwrap())
-            .reduce(|a, b| a + b),
-    );
+    black_box(pool.get_results().iter().sum::<f64>());
 }
 
 /// Compute a simple hash value for the given index value.
@@ -114,8 +109,6 @@ pub fn bench_threadpool(c: &mut Criterion) {
 fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
     // automatically choose the number of threads
     let mut pool = ThreadPool::new();
-    // drop the handles used by each thread after it's done
-    pool.drop_finished_handles();
 
     // number of elements in each vector for each thread
     let steps = a.len() / threads;
@@ -134,14 +127,9 @@ fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
         });
     }
 
-    black_box(
-        // wait for the threads to finish
-        pool.join_all()
-            // iterate over the results and sum the partial dot products together
-            .into_iter()
-            .map(|r| r.unwrap())
-            .reduce(|a, b| a + b),
-    );
+    pool.join_all();
+
+    black_box(pool.get_results().iter().sum::<f64>());
 }
 
 /// Benchmark the effects of over- and underusing a thread pool's thread capacity.
diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs
index 00ddc74..e7cf0fe 100644
--- a/src/multithreading/mod.rs
+++ b/src/multithreading/mod.rs
@@ -1,399 +1,89 @@
-//! This module provides the functionality to create a thread pool of fixed capacity.
-//! This means that the pool can be used to dispatch functions or closures that will be executed
-//! some time in the future, each on its own thread. When dispatching jobs, the pool will test whether
-//! threads are available. If so, the pool will directly launch a new thread to run the supplied function.
-//! In case no threads are available the job will be stalled for execution until a thread is free to run the first
-//! stalled job.
-//!
-//! The pool will also keep track of all the handles that [`std::thread::spawn`] returns. Hence after a job has
-//! executed, the result of the function is still held by the pool and can be retrieved any time after the submission.
-//! After retrieving the result of the function the handle is discarded and cannot be accessed again through the thread pool.
-//!
-//! # Threads
-//! The maximum number of threads to be used can be specified when creating a new thread pool.
-//! Alternatively the thread pool can be advised to automatically determine the recommended number of threads to use.
-//! Note that this has its limitations due to possible side effects of sandboxing, containerization or VMs.
-//! For further information see: [`thread::available_parallelism`]
-//!
-//! # Memory consumption over time
-//! The pool stores the handle of every thread it launches, which constantly increases memory consumption.
-//! Note that the pool won't perform any kind of cleanup of the stored handles on its own, meaning it is recommended to make regular calls to
-//! `join_all` or `get_finished` in order to clear the vector of handles and avoid unbounded memory consumption.
-//! 
Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread -//! handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time. -//! -//! # Portability -//! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. -//! This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`]. -//! Note that atomic primitives are not available on all platforms but "can generally be relied upon existing" -//! (see: ). -//! Additionally this implementation relies on using the `load` and `store` operations -//! instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls -//! to `unwrap` or `expected` from [`std::sync::MutexGuard`]. +use std::{thread::{JoinHandle, self}, sync::{mpsc::{Receiver, channel, Sender}, Mutex, Arc}, num::NonZeroUsize, collections::VecDeque}; -use std::{ - any::Any, - collections::VecDeque, - num::NonZeroUsize, - sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, - }, - thread::{self, JoinHandle}, -}; +const DEFAULT_THREAD_POOL_SIZE: usize = 1; -/// Maximum number of thread to be used by the thread pool in case all methods -/// of determining a recommend number failed -#[allow(unused)] -pub const FALLBACK_THREADS: usize = 1; - -/// Returns the number of threads to be used by the thread pool by default. -/// This function tries to fetch a recommended number by calling [`thread::available_parallelism`]. -/// In case this fails [`FALLBACK_THREADS`] will be returned -fn get_default_thread_count() -> usize { - // number of threads to fallback to - let fallback_threads = - NonZeroUsize::new(FALLBACK_THREADS).expect("fallback_threads must be nonzero"); - // determine the maximum recommend number of threads to use - // most of the time this is gonna be the number of cpus - thread::available_parallelism() - .unwrap_or(fallback_threads) - .get() -} - -/// This struct manages a pool of threads with a fixed maximum number. -/// Any time a closure is passed to `enqueue` the pool checks whether it can -/// directly launch a new thread to execute the closure. If the maximum number -/// of threads is reached the closure is staged and will get executed by next -/// thread to be available. -/// The pool will also keep track of every `JoinHandle` created by running every closure on -/// its on thread. The closures can be obtained by either calling `join_all` or `get_finished`. -/// # Example -/// ```rust -/// use imsearch::multithreading::ThreadPool; -/// let mut pool = ThreadPool::new(); -/// -/// // launch some work in parallel -/// for i in 0..10 { -/// pool.enqueue(move || { -/// println!("I am multithreaded and have id: {i}"); -/// }); -/// } -/// // wait for threads to finish -/// pool.join_all(); -/// ``` -/// # Portability -/// This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. -/// This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`]. -/// Note that atomic primitives are not available on all platforms but "can generally be relied upon existing" -/// (see: ). -/// Additionally this implementation relies on using the `load` and `store` operations -/// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls -/// to `unwrap` or `expected` from [`std::sync::MutexGuard`]. 
-/// -/// # Memory consumption over time -/// The pool will store the handle for every thread launched constantly increasing the memory consumption. -/// It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to -/// `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption. -/// Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread -/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time. -#[allow(dead_code)] -#[derive(Debug)] -pub struct ThreadPool -where - F: Send + FnOnce() -> T, +pub trait Job: Send + 'static + FnOnce() -> T +where T: Send { - /// maximum number of threads to launch at once - max_thread_count: usize, - /// handles for launched threads - handles: Arc>>>, - /// function to be executed when threads are ready - queue: Arc>>, - /// number of currently running threads - /// new implementation relies on atomic primitives to avoid locking and possible - /// guard errors. Note that atomic primitives are not available on all platforms "can generally be relied upon existing" - /// (see: ). - /// Also this implementation relies on using the `load` and `store` operations - /// instead of using more comfortable one like `fetch_add` - threads: Arc, - /// wether to keep the thread handles after the function returned - drop_handles: Arc, } -impl Default for ThreadPool -where - F: Send + FnOnce() -> T, +impl Job for U + where U: Send + 'static + FnOnce() -> T, T: Send + 'static +{ +} + +#[derive(Debug)] +pub struct ThreadPool + where T: Send, F: Job +{ + queue: Arc>>, + handles: Vec>, + receiver: Receiver, + sender: Sender, + limit: NonZeroUsize, +} + +impl Default for ThreadPool +where T: Send + 'static, F: Job { fn default() -> Self { + let (sender, receiver) = channel::(); + + let default = NonZeroUsize::new(DEFAULT_THREAD_POOL_SIZE).expect("Thread limit must be non-zero"); + let limit = thread::available_parallelism().unwrap_or(default); + Self { - max_thread_count: get_default_thread_count(), - handles: Default::default(), - queue: Default::default(), - // will be initialized to 0 - threads: Arc::new(AtomicUsize::new(0)), - // do not drop handles by default - drop_handles: Arc::new(AtomicBool::new(false)), + queue: Arc::new(Mutex::new(VecDeque::new())), + handles: Vec::new(), + receiver, + sender, + limit, } } } -#[allow(dead_code)] -impl ThreadPool -where - F: Send + FnOnce() -> T + 'static, - T: Send + 'static, +impl ThreadPool +where T: Send + 'static, F: Job { - /// Create a new empty thread pool with the maximum number of threads set be the recommended amount of threads - /// supplied by [`std::thread::available_parallelism`] or in case the function fails [`FALLBACK_THREADS`]. - /// # Limitations - /// This function may assume the wrong number of threads due to the nature of [`std::thread::available_parallelism`]. - /// That can happen if the program runs inside of a container or vm with poorly configured parallelism. 
pub fn new() -> Self { + Default::default() + } + + pub fn with_limit(max_threads: usize) -> Self { Self { - max_thread_count: get_default_thread_count(), + limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"), ..Default::default() } } - /// Create a new empty thread pool with the maximum number of threads set be the specified number - /// # Overusage - /// supplying a number of threads to great may negatively impact performance as the system may not - /// be able to full fill the required needs - pub fn with_threads(max_thread_count: NonZeroUsize) -> Self { - Self { - max_thread_count: max_thread_count.get(), - ..Default::default() - } - } + pub fn enqueue(&mut self, func: F) { + if self.handles.len() < self.limit.get() { + // we can still launch threads to run in parallel - /// Create a new empty thread pool with the maximum number of threads set be the specified number - /// and also sets the flag to drop the handles of finished threads instead of storing them until - /// eihter `join_all` or `get_finished` is called. - /// # Overusage - /// supplying a number of threads to great may negatively impact performance as the system may not - /// be able to full fill the required needs - /// # Memory usage - /// if `drop_handles` is set to `false` the pool will continue to store the handles of - /// launched threads. This causes memory consumption to rise over time as more and more - /// threads are launched. - pub fn with_threads_and_drop_handles( - max_thread_count: NonZeroUsize, - drop_handles: bool, - ) -> Self { - Self { - max_thread_count: max_thread_count.get(), - drop_handles: Arc::new(AtomicBool::new(drop_handles)), - ..Default::default() - } - } + // clone the sender + let tx = self.sender.clone(); + let queue = self.queue.clone(); + + self.handles.push(thread::spawn(move || { + while let Some(job) = queue.lock().unwrap().pop_front() { + tx.send(job()).expect("cannot send result"); + } + })); - /// Pass a new closure to be executed as soon as a thread is available. - /// This function will execute the supplied closure immediately when the number of running threads - /// is lower than the maximum number of threads. Otherwise the closure will be executed at some undetermined time - /// in the future unless program doesn't die before. - /// If `join_all` is called and the closure hasn't been executed yet, `join_all` will wait for all stalled - /// closures be executed. - pub fn enqueue(&mut self, closure: F) { - // read used thread counter and apply all store operations with Ordering::Release - let used_threads = self.threads.load(Ordering::Acquire); - // test if we can launch a new thread - if used_threads < self.max_thread_count { - // we can create a new thread, increment the thread count - self.threads - .store(used_threads.saturating_add(1), Ordering::Release); - // run new thread - execute( - self.queue.clone(), - self.handles.clone(), - self.threads.clone(), - self.drop_handles.clone(), - closure, - ); } else { - // all threads being used - // enqueue closure to be launched when a thread is ready - self.queue.lock().unwrap().push_back(closure); + self.queue.lock().unwrap().push_back(func); + } + + self.handles.retain(|h| !h.is_finished()); + } + + pub fn join_all(&mut self) { + while let Some(handle) = self.handles.pop() { + handle.join().unwrap(); } } - /// Removes all closures stalled for execution. - /// All closures still waiting to be executed will be dropped by the pool and - /// won't get executed. 
Useful if an old set of closures hasn't run yet but are outdated - /// and resources are required immediately for updated closures. - pub fn discard_stalled(&mut self) { - self.queue.lock().unwrap().clear(); + pub fn get_results(&mut self) -> Vec { + self.receiver.try_iter().collect() } - - /// Waits for all currently running threads and all stalled closures to be executed. - /// If any closure hasn't been executed yet, `join_all` will wait until the queue holding all - /// unexecuted closures is empty. It returns the result every `join` of all threads yields as a vector. - /// If the vector is of length zero, no threads were joined and the thread pool didn't do anything. - /// All handles of threads will be removed after this call. - pub fn join_all(&mut self) -> Vec>> { - let mut results = Vec::new(); - loop { - // lock the handles, pop the last one off and unlock handles again - // to allow running threads to process - let handle = self.handles.lock().unwrap().pop(); - - // if we still have a handle join it else no handles are left we abort the loop - if let Some(handle) = handle { - results.push(handle.join()); - continue; - } - break; - } - - results - } - - /// Returns the results of every thread that has already finished until now. - /// All other threads currently running won't be waited for nor for any closure stalled for execution in the future. - /// /// If the vector is of length zero, no threads were joined and the thread pool either doesn't do anything or is busy. - /// All handles of finished threads will be removed after this call. - pub fn get_finished(&mut self) -> Vec>> { - let mut results = Vec::new(); - - let mut handles = self.handles.lock().unwrap(); - - // loop through the handles and remove all finished handles - // join on the finished handles which will be quick as they are finished! - let mut idx = 0; - while idx < handles.len() { - if handles[idx].is_finished() { - // thread is finished, yield result - results.push(handles.remove(idx).join()); - } else { - // thread isn't done, continue to the next one - idx += 1; - } - } - - results - } - - /// set the flag to indicate that thread handles will be dropped after the thread is finished - /// executing. All threads that have finished until now but haven't been removed will get dropped - /// after the next thread finishes. - pub fn drop_finished_handles(&self) { - self.drop_handles.store(false, Ordering::Release); - } - - /// set the flag to indicate that thread handles will be kept after the thread is finished - /// executing until either `join_all` or `get_finished` is called. - /// Only new thread handles created after this call be kept. - pub fn keep_future_handles(&self) { - self.drop_handles.store(true, Ordering::Release); - } -} - -/// Removes all thread handles which have finished only if the can be locked at -/// the current time. This function will not block execution when the lock cannot be acquired. -fn try_prune(handles: Arc>>>) { - if let Ok(mut handles) = handles.try_lock() { - // keep unfinished elements - handles.retain(|handle| !handle.is_finished()); - } -} - -/// Execute the supplied closure on a new thread -/// and store the threads handle into `handles`. When the thread -/// finished executing the closure it will look for any closures left in `queue` and -/// recursively execute it on a new thread. This method updates threads` in order to -/// keep track of the number of active threads. 
-fn execute( - queue: Arc>>, - handles: Arc>>>, - threads: Arc, - drop: Arc, - closure: F, -) where - T: Send + 'static, - F: Send + FnOnce() -> T + 'static, -{ - let handles_copy = handles.clone(); - - handles.lock().unwrap().push(thread::spawn(move || { - // run closure (actual work) - let result = closure(); - - // take the next closure stalled for execution - let next = queue.lock().unwrap().pop_front(); - if let Some(next_closure) = next { - // if we have sth. to execute, spawn a new thread - execute( - queue, - handles_copy.clone(), - threads, - drop.clone(), - next_closure, - ); - } else { - // nothing to execute this thread will run out without any work to do - // decrement the amount of used threads - threads.store( - threads.load(Ordering::Acquire).saturating_sub(1), - Ordering::Release, - ) - } - - // try to drop all fnished thread handles if necessary - // this is a non blocking operation - if drop.load(Ordering::Acquire) { - try_prune(handles_copy); - } - - result - })); -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use super::*; - - #[test] - fn test_thread_pool() { - // auto determine the amount of threads to use - let mut pool = ThreadPool::new(); - - // launch 4 jobs to run on our pool - for i in 0..4 { - pool.enqueue(move || (0..=i).sum::()); - } - - // wait for the threads to finish and sum their results - let sum = pool - .join_all() - .into_iter() - .map(|r| r.unwrap()) - .sum::(); - - assert_eq!(sum, 10); - } - - #[test] - fn test_drop_stalled() { - // auto determine the amount of threads to use - let mut pool = ThreadPool::with_threads(NonZeroUsize::new(1).unwrap()); - - // launch 2 jobs: 1 will immediately return, the other one will sleep for 20 seconds - for i in 0..1 { - pool.enqueue(move || { - thread::sleep(Duration::from_secs(i * 20)); - i - }); - } - - // wait 10 secs - thread::sleep(Duration::from_secs(2)); - // discard job that should still run - pool.discard_stalled(); - - // wait for the threads to finish and sum their results - let sum = pool.join_all().into_iter().map(|r| r.unwrap()).sum::(); - - assert_eq!(sum, 0); - } -} +} \ No newline at end of file From 9be7bc18c7f22a6d21ff2015a065baa8c6eb0d5a Mon Sep 17 00:00:00 2001 From: teridax Date: Sun, 4 Jun 2023 23:11:36 +0200 Subject: [PATCH 13/15] fixed imports --- benches/multithreading.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benches/multithreading.rs b/benches/multithreading.rs index b7fdb2f..c013266 100644 --- a/benches/multithreading.rs +++ b/benches/multithreading.rs @@ -4,7 +4,7 @@ //! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit //! double precision floating point values. 
-use std::{num::NonZeroUsize, sync::Arc};
+use std::{sync::Arc};
 
 use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
 use imsearch::multithreading::ThreadPool;
 

From 334093ad877f6aae65c9474a91003f67a30e4332 Mon Sep 17 00:00:00 2001
From: teridax <72654954+Servostar@users.noreply.github.com>
Date: Mon, 5 Jun 2023 17:40:00 +0000
Subject: [PATCH 14/15] Update README.md

Added reminder for good programming practices for this repo (to be removed
later on)
---
 README.md | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/README.md b/README.md
index 90d7ca9..f05ce35 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,12 @@
 # Programmentwurf
 
 The description of the assignment can be found at [Programmentwurf.md](https://github.com/programmieren-mit-rust/programmentwurf/blob/main/Programmentwurf.md). This `Readme.md` is to be replaced with something meaningful.
+
+# IMPORTANT!
+A small reminder: when you push things to the repo that you consider finished (e.g. for a pull request!), please check for errors/warnings with the following commands:
+- `cargo fmt` for formatting
+- `cargo clippy` for warnings
+- `cargo test doc` for documentation tests
+optional:
+- `cargo test` for module tests
+- `cargo bench` for benchmarks

From 7a6dc389b9ff99385875e7bee894896b2909a605 Mon Sep 17 00:00:00 2001
From: teridax
Date: Tue, 6 Jun 2023 17:56:34 +0200
Subject: [PATCH 15/15] finished documentation for thread pool

---
 benches/multithreading.rs |   7 +-
 src/multithreading/mod.rs | 206 +++++++++++++++++++++++++++++++++++---
 2 files changed, 194 insertions(+), 19 deletions(-)

diff --git a/benches/multithreading.rs b/benches/multithreading.rs
index c013266..4f9c642 100644
--- a/benches/multithreading.rs
+++ b/benches/multithreading.rs
@@ -4,7 +4,7 @@
 //! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit
 //! double precision floating point values.
 
-use std::{sync::Arc};
+use std::sync::Arc;
 
 use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
 use imsearch::multithreading::ThreadPool;
@@ -37,8 +37,7 @@ fn dot(a: &[f64], b: &[f64]) -> f64 {
 /// sized slices which then get passed to their own thread to compute the partial dot product. After all threads have
 /// finished the partial dot products will be summed to create the final result.
 fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
-    let mut pool =
-        ThreadPool::with_limit(threads);
+    let mut pool = ThreadPool::with_limit(threads);
 
     // number of elements in each vector for each thread
     let steps = a.len() / threads;
@@ -56,7 +55,7 @@ fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
             dot(a, b)
         });
     }
-    
+
     pool.join_all();
 
     black_box(pool.get_results().iter().sum::<f64>());
diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs
index e7cf0fe..d20347a 100644
--- a/src/multithreading/mod.rs
+++ b/src/multithreading/mod.rs
@@ -1,35 +1,114 @@
+//! This module provides the functionality to create a thread pool to execute tasks in parallel.
+//! The maximum number of threads to be used can be regulated by using `ThreadPool::with_limit`.
+//! This implementation is aimed to be of low runtime cost with minimal synchronisation due to blocking.
+//! Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
+//! a new thread will be launched until the maximum number is reached. By then every launched thread will
+//! be reused to process the remaining elements of the queue. If no jobs are left to be executed
+//! all threads will finish and die. This means that if nothing is done, no threads will idle in the background.
+//! # Example
+//! ```rust
+//! # use imsearch::multithreading::ThreadPool;
+//! let mut pool = ThreadPool::with_limit(2);
+//!
+//! for i in 0..10 {
+//!     pool.enqueue(move || i);
+//! }
+//!
+//! pool.join_all();
+//! assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
+//! ```
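+//!
+//! Jobs can also be prioritized; a minimal sketch (the explicit type annotation
+//! pins the job type to a plain function pointer):
+//! ```rust
+//! # use imsearch::multithreading::{Priority, ThreadPool};
+//! let mut pool: ThreadPool<&'static str, fn() -> &'static str> = ThreadPool::with_limit(2);
+//!
+//! pool.enqueue_priorize(|| "low", Priority::Low);
+//! // High puts the job at the front of the waiting queue
+//! pool.enqueue_priorize(|| "high", Priority::High);
+//!
+//! pool.join_all();
+//! ```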
+
+use std::{
+    collections::VecDeque,
+    num::NonZeroUsize,
+    sync::{
+        mpsc::{channel, Receiver, Sender},
+        Arc, Mutex,
+    },
+    thread::{self, JoinHandle},
+};
+
+/// Default number of threads to be used in case [`std::thread::available_parallelism`] fails.
+pub const DEFAULT_THREAD_POOL_SIZE: usize = 1;
+
+/// Indicates the priority level of functions or closures which get supplied to the pool.
+/// Use [`Priority::High`] to ensure the closure is executed before all closures that are already supplied.
+/// Use [`Priority::Low`] to ensure the closure is executed after all closures that are already supplied.
+#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
+pub enum Priority {
+    /// Indicate that the closure or function supplied to the thread pool
+    /// has higher priority than any other given to the pool until now.
+    /// The item will get enqueued at the start of the waiting-queue.
+    High,
+    /// Indicate that the closure or function supplied to the thread pool
+    /// has lower priority than the already supplied ones in this pool.
+    /// The item will get enqueued at the end of the waiting-queue.
+    Low,
+}
+
+/// Jobs are functions which are executed by the thread pool. They can be stalled when no threads are
+/// free to execute them directly. They are meant to be executed only once and be done.
+pub trait Job<T>: Send + 'static + FnOnce() -> T
+where
+    T: Send,
+{
+}
+
+impl<T, U> Job<T> for U
+where
+    U: Send + 'static + FnOnce() -> T,
+    T: Send + 'static,
+{
+}
+
+/// Thread pool which can be used to execute functions or closures in parallel.
+/// The maximum number of threads to be used can be regulated by using `ThreadPool::with_limit`.
+/// This implementation is aimed to be of low runtime cost with minimal synchronisation due to blocking.
+/// Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
+/// a new thread will be launched until the maximum number is reached. By then every launched thread will
+/// be reused to process the remaining elements of the queue. If no jobs are left to be executed
+/// all threads will finish and die. This means that if nothing is done, no threads will idle in the background.
+/// # Example
+/// ```rust
+/// # use imsearch::multithreading::ThreadPool;
+/// let mut pool = ThreadPool::with_limit(2);
+///
+/// for i in 0..10 {
+///     pool.enqueue(move || i);
+/// }
+///
+/// pool.join_all();
+/// assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
+/// ```
+#[derive(Debug)]
+pub struct ThreadPool<T, F>
+where
+    T: Send,
+    F: Job<T>,
+{
+    /// queue for storing the jobs to be executed
+    queue: Arc<Mutex<VecDeque<F>>>,
+    /// handles for all threads currently running and processing jobs
+    handles: Vec<JoinHandle<()>>,
+    /// receiver end for channel based communication between threads
+    receiver: Receiver<T>,
+    /// sender end for channel based communication between threads
+    sender: Sender<T>,
+    /// maximum number of threads to be used in parallel
+    limit: NonZeroUsize,
+}
+
+impl<T, F> Default for ThreadPool<T, F>
+where
+    T: Send + 'static,
+    F: Job<T>,
+{
+    fn default() -> Self {
+        let (sender, receiver) = channel::<T>();
+
+        // determine the default thread count to use based on the system
+        let default =
+            NonZeroUsize::new(DEFAULT_THREAD_POOL_SIZE).expect("Thread limit must be non-zero");
+        let limit = thread::available_parallelism().unwrap_or(default);
+
+        Self {
+            queue: Arc::new(Mutex::new(VecDeque::new())),
+            handles: Vec::new(),
+            receiver,
+            sender,
+            limit,
+        }
+    }
+}
+
+impl<T, F> ThreadPool<T, F>
+where
+    T: Send + 'static,
+    F: Job<T>,
+{
+    /// Creates a new thread pool with the default thread count determined by either
+    /// [`std::thread::available_parallelism`] or [`DEFAULT_THREAD_POOL_SIZE`] in case it fails.
+    /// No threads will be launched until jobs are enqueued.
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Creates a new thread pool with the given thread count. The pool will continue to launch new threads even if
+    /// the system does not allow for that count of parallelism.
+    /// No threads will be launched until jobs are enqueued.
+    /// # Panics
+    /// This function panics if `max_threads` is zero.
+    pub fn with_limit(max_threads: usize) -> Self {
+        Self {
+            limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"),
+            ..Default::default()
+        }
+    }
+
+    /// Put a new job into the queue to be executed by a thread in the future.
+    /// The priority of the job will determine if the job will be put at the start or the end of the queue.
+    /// See [`crate::multithreading::Priority`].
+    /// This function will create a new thread if the maximum number of threads is not reached.
+    /// In case the maximum number of threads is already used, the job is stalled and will get executed
+    /// when a thread is ready and the job is at the start of the queue.
+    pub fn enqueue_priorize(&mut self, func: F, priority: Priority) {
+        // put the job into the queue
+        let mut queue = self.queue.lock().unwrap();
+
+        // insert the new job into the queue depending on its priority
+        match priority {
+            Priority::High => queue.push_front(func),
+            Priority::Low => queue.push_back(func),
+        }
+
+        if self.handles.len() < self.limit.get() {
+            // we can still launch threads to run in parallel
+
+            // clone the sender
+            let tx = self.sender.clone();
+            let queue = self.queue.clone();
+
+            self.handles.push(thread::spawn(move || {
+                while let Some(job) = queue.lock().unwrap().pop_front() {
+                    tx.send(job()).expect("cannot send result");
+                }
+            }));
+        }
+
+        self.handles.retain(|h| !h.is_finished());
+    }
+
+    /// Put a new job into the queue to be executed by a thread in the future.
+    /// The priority of the job is automatically set to [`crate::multithreading::Priority::Low`].
+    /// This function will create a new thread if the maximum number of threads is not reached.
+    /// In case the maximum number of threads is already used, the job is stalled and will get executed
+    /// when a thread is ready and the job is at the start of the queue.
+    pub fn enqueue(&mut self, func: F) {
+        self.enqueue_priorize(func, Priority::Low);
+    }
+
+    /// Wait for all threads to finish executing. This means that by the time all threads have finished
+    /// every task will have been executed too. In other words the threads finish when the queue of jobs is empty.
+    /// This function will block the caller thread.
+    pub fn join_all(&mut self) {
+        while let Some(handle) = self.handles.pop() {
+            handle.join().unwrap();
+        }
+    }
+
+    /// Returns all results that have been returned by the threads until now
+    /// and haven't been consumed yet.
+    /// Results retrieved from this call won't be returned on a second call.
+    /// This function is non-blocking.
+    pub fn try_get_results(&mut self) -> Vec<T> {
+        self.receiver.try_iter().collect()
+    }
+
+    /// Returns all results that have been returned by the threads until now
+    /// and haven't been consumed yet. The function will also wait for all threads to finish executing (empty the queue).
+    /// Results retrieved from this call won't be returned on a second call.
+    /// This function will block the caller thread.
+    pub fn get_results(&mut self) -> Vec<T> {
+        self.join_all();
+        self.try_get_results()
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_default() {
+        let mut pool = ThreadPool::default();
+
+        for i in 0..10 {
+            pool.enqueue_priorize(move || i, Priority::High);
+        }
+
+        pool.join_all();
+
+        assert_eq!(pool.try_get_results().iter().sum::<i32>(), 45);
+    }
+
+    #[test]
+    fn test_limit() {
+        let mut pool = ThreadPool::with_limit(2);
+
+        for i in 0..10 {
+            pool.enqueue(move || i);
+        }
+
+        assert_eq!(pool.handles.len(), 2);
+        assert_eq!(pool.limit.get(), 2);
+
+        pool.join_all();
+
+        assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
+    }
+
+    #[test]
+    fn test_multiple() {
+        let mut pool = ThreadPool::with_limit(2);
+
+        for i in 0..10 {
+            pool.enqueue(move || i);
+        }
+
+        assert_eq!(pool.handles.len(), 2);
+        assert_eq!(pool.limit.get(), 2);
+
+        pool.join_all();
+
+        assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
+    }
+}
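To tie the final API together, a compact end-to-end sketch in the spirit of the benchmarks (vector size and thread count are illustrative):

```rust
use std::sync::Arc;

use imsearch::multithreading::ThreadPool;

fn main() {
    let a = Arc::new(vec![1.0_f64; 1024]);
    let b = Arc::new(vec![2.0_f64; 1024]);
    let threads = 4;
    let steps = a.len() / threads;

    let mut pool = ThreadPool::with_limit(threads);

    for i in 0..threads {
        let (aa, bb) = (a.clone(), b.clone());
        let chunk = i * steps;
        // each job computes the partial dot product of its slice
        pool.enqueue(move || {
            aa[chunk..chunk + steps]
                .iter()
                .zip(&bb[chunk..chunk + steps])
                .map(|(x, y)| x * y)
                .sum::<f64>()
        });
    }

    pool.join_all();
    // the partial results arrive over the pool's channel and are summed here
    let dot: f64 = pool.get_results().iter().sum();
    assert_eq!(dot, 2048.0);
}
```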