added functionality to drop thread handles

automatically when threads have finished
This commit is contained in:
Sven Vogel 2023-05-30 15:12:02 +02:00
parent 898d878554
commit 1208a04658
1 changed files with 65 additions and 3 deletions

View File

@ -19,6 +19,8 @@
//! The pool will store the handle for every thread launched constantly increasing the memory consumption. //! The pool will store the handle for every thread launched constantly increasing the memory consumption.
//! It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to //! It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to
//! `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption. //! `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption.
//! Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread
//! handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
//! //!
//! # Portability //! # Portability
//! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. //! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
@ -34,7 +36,7 @@ use std::{
collections::VecDeque, collections::VecDeque,
num::NonZeroUsize, num::NonZeroUsize,
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering, AtomicBool},
Arc, Mutex, Arc, Mutex,
}, },
thread::{self, JoinHandle}, thread::{self, JoinHandle},
@ -88,6 +90,13 @@ fn get_default_thread_count() -> usize {
/// Additionally this implementation relies on using the `load` and `store` operations /// Additionally this implementation relies on using the `load` and `store` operations
/// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls /// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls
/// to `unwrap` or `expected` from [`std::sync::MutexGuard`]. /// to `unwrap` or `expected` from [`std::sync::MutexGuard`].
///
/// # Memory consumption over time
/// The pool will store the handle for every thread launched constantly increasing the memory consumption.
/// It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to
/// `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption.
/// Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread
/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Debug)] #[derive(Debug)]
pub struct ThreadPool<F, T> pub struct ThreadPool<F, T>
@ -107,6 +116,8 @@ where
/// Also this implementation relies on using the `load` and `store` operations /// Also this implementation relies on using the `load` and `store` operations
/// instead of using more comfortable one like `fetch_add` /// instead of using more comfortable one like `fetch_add`
threads: Arc<AtomicUsize>, threads: Arc<AtomicUsize>,
/// wether to keep the thread handles after the function returned
drop_handles: Arc<AtomicBool>,
} }
impl<F, T> Default for ThreadPool<F, T> impl<F, T> Default for ThreadPool<F, T>
@ -119,7 +130,9 @@ where
handles: Default::default(), handles: Default::default(),
queue: Default::default(), queue: Default::default(),
// will be initialized to 0 // will be initialized to 0
threads: Default::default(), threads: Arc::new(AtomicUsize::new(0)),
// do not drop handles by default
drop_handles: Arc::new(AtomicBool::new(false))
} }
} }
} }
@ -153,6 +166,24 @@ where
} }
} }
/// Create a new empty thread pool with the maximum number of threads set be the specified number
/// and also sets the flag to drop the handles of finished threads instead of storing them until
/// eihter `join_all` or `get_finished` is called.
/// # Overusage
/// supplying a number of threads to great may negatively impact performance as the system may not
/// be able to full fill the required needs
/// # Memory usage
/// if `drop_handles` is set to [`Bool::false`] the pool will continue to store the handles of
/// launched threads. This causes memory consumption to rise over time as more and more
/// threads are launched.
pub fn with_threads_and_drop_handles(max_thread_count: NonZeroUsize, drop_handles: bool) -> Self {
Self {
max_thread_count: max_thread_count.get(),
drop_handles: Arc::new(AtomicBool::new(drop_handles)),
..Default::default()
}
}
/// Pass a new closure to be executed as soon as a thread is available. /// Pass a new closure to be executed as soon as a thread is available.
/// This function will execute the supplied closure immediately when the number of running threads /// This function will execute the supplied closure immediately when the number of running threads
/// is lower than the maximum number of threads. Otherwise the closure will be executed at some undetermined time /// is lower than the maximum number of threads. Otherwise the closure will be executed at some undetermined time
@ -172,6 +203,7 @@ where
self.queue.clone(), self.queue.clone(),
self.handles.clone(), self.handles.clone(),
self.threads.clone(), self.threads.clone(),
self.drop_handles.clone(),
closure, closure,
); );
} else { } else {
@ -236,6 +268,29 @@ where
results results
} }
/// set the flag to indicate that thread handles will be dropped after the thread is finished
/// executing. All threads that have finished until now but haven't been removed will get dropped
/// after the next thread finishes.
pub fn drop_finished_handles(&self) {
self.drop_handles.store(false, Ordering::Release);
}
/// set the flag to indicate that thread handles will be kept after the thread is finished
/// executing until either `join_all` or `get_finished` is called.
/// Only new thread handles created after this call be kept.
pub fn keep_future_handles(&self) {
self.drop_handles.store(true, Ordering::Release);
}
}
/// Removes all thread handles which have finished only if the can be locked at
/// the current time. This function will not block execution when the lock cannot be acquired.
fn try_prune<T>(handles: Arc<Mutex<Vec<JoinHandle<T>>>>) {
if let Ok(mut handles) = handles.try_lock() {
// keep unfinished elements
handles.retain(|handle| !handle.is_finished());
}
} }
/// Execute the supplied closure on a new thread /// Execute the supplied closure on a new thread
@ -247,6 +302,7 @@ fn execute<F, T>(
queue: Arc<Mutex<VecDeque<F>>>, queue: Arc<Mutex<VecDeque<F>>>,
handles: Arc<Mutex<Vec<JoinHandle<T>>>>, handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
threads: Arc<AtomicUsize>, threads: Arc<AtomicUsize>,
drop: Arc<AtomicBool>,
closure: F, closure: F,
) where ) where
T: Send + 'static, T: Send + 'static,
@ -262,7 +318,7 @@ fn execute<F, T>(
let next = queue.lock().unwrap().pop_front(); let next = queue.lock().unwrap().pop_front();
if let Some(next_closure) = next { if let Some(next_closure) = next {
// if we have sth. to execute, spawn a new thread // if we have sth. to execute, spawn a new thread
execute(queue, handles_copy, threads, next_closure); execute(queue, handles_copy.clone(), threads, drop.clone(), next_closure);
} else { } else {
// nothing to execute this thread will run out without any work to do // nothing to execute this thread will run out without any work to do
// decrement the amount of used threads // decrement the amount of used threads
@ -272,6 +328,12 @@ fn execute<F, T>(
) )
} }
// try to drop all fnished thread handles if necessary
// this is a non blocking operation
if drop.load(Ordering::Acquire) {
try_prune(handles_copy);
}
result result
})); }));
} }