diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs index b832df0..5c3ed53 100644 --- a/src/multithreading/mod.rs +++ b/src/multithreading/mod.rs @@ -19,6 +19,8 @@ //! The pool will store the handle for every thread launched constantly increasing the memory consumption. //! It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to //! `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption. +//! Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread +//! handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time. //! //! # Portability //! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. @@ -34,7 +36,7 @@ use std::{ collections::VecDeque, num::NonZeroUsize, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicUsize, Ordering, AtomicBool}, Arc, Mutex, }, thread::{self, JoinHandle}, @@ -88,6 +90,13 @@ fn get_default_thread_count() -> usize { /// Additionally this implementation relies on using the `load` and `store` operations /// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls /// to `unwrap` or `expected` from [`std::sync::MutexGuard`]. +/// +/// # Memory consumption over time +/// The pool will store the handle for every thread launched constantly increasing the memory consumption. +/// It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to +/// `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption. +/// Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread +/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time. #[allow(dead_code)] #[derive(Debug)] pub struct ThreadPool @@ -107,6 +116,8 @@ where /// Also this implementation relies on using the `load` and `store` operations /// instead of using more comfortable one like `fetch_add` threads: Arc, + /// wether to keep the thread handles after the function returned + drop_handles: Arc, } impl Default for ThreadPool @@ -119,7 +130,9 @@ where handles: Default::default(), queue: Default::default(), // will be initialized to 0 - threads: Default::default(), + threads: Arc::new(AtomicUsize::new(0)), + // do not drop handles by default + drop_handles: Arc::new(AtomicBool::new(false)) } } } @@ -153,6 +166,24 @@ where } } + /// Create a new empty thread pool with the maximum number of threads set be the specified number + /// and also sets the flag to drop the handles of finished threads instead of storing them until + /// eihter `join_all` or `get_finished` is called. + /// # Overusage + /// supplying a number of threads to great may negatively impact performance as the system may not + /// be able to full fill the required needs + /// # Memory usage + /// if `drop_handles` is set to [`Bool::false`] the pool will continue to store the handles of + /// launched threads. This causes memory consumption to rise over time as more and more + /// threads are launched. + pub fn with_threads_and_drop_handles(max_thread_count: NonZeroUsize, drop_handles: bool) -> Self { + Self { + max_thread_count: max_thread_count.get(), + drop_handles: Arc::new(AtomicBool::new(drop_handles)), + ..Default::default() + } + } + /// Pass a new closure to be executed as soon as a thread is available. /// This function will execute the supplied closure immediately when the number of running threads /// is lower than the maximum number of threads. Otherwise the closure will be executed at some undetermined time @@ -172,6 +203,7 @@ where self.queue.clone(), self.handles.clone(), self.threads.clone(), + self.drop_handles.clone(), closure, ); } else { @@ -236,6 +268,29 @@ where results } + + /// set the flag to indicate that thread handles will be dropped after the thread is finished + /// executing. All threads that have finished until now but haven't been removed will get dropped + /// after the next thread finishes. + pub fn drop_finished_handles(&self) { + self.drop_handles.store(false, Ordering::Release); + } + + /// set the flag to indicate that thread handles will be kept after the thread is finished + /// executing until either `join_all` or `get_finished` is called. + /// Only new thread handles created after this call be kept. + pub fn keep_future_handles(&self) { + self.drop_handles.store(true, Ordering::Release); + } +} + +/// Removes all thread handles which have finished only if the can be locked at +/// the current time. This function will not block execution when the lock cannot be acquired. +fn try_prune(handles: Arc>>>) { + if let Ok(mut handles) = handles.try_lock() { + // keep unfinished elements + handles.retain(|handle| !handle.is_finished()); + } } /// Execute the supplied closure on a new thread @@ -247,6 +302,7 @@ fn execute( queue: Arc>>, handles: Arc>>>, threads: Arc, + drop: Arc, closure: F, ) where T: Send + 'static, @@ -262,7 +318,7 @@ fn execute( let next = queue.lock().unwrap().pop_front(); if let Some(next_closure) = next { // if we have sth. to execute, spawn a new thread - execute(queue, handles_copy, threads, next_closure); + execute(queue, handles_copy.clone(), threads, drop.clone(), next_closure); } else { // nothing to execute this thread will run out without any work to do // decrement the amount of used threads @@ -272,6 +328,12 @@ fn execute( ) } + // try to drop all fnished thread handles if necessary + // this is a non blocking operation + if drop.load(Ordering::Acquire) { + try_prune(handles_copy); + } + result })); }