Merge pull request #16 from programmieren-mit-rust/multithreading
Multithreading
This commit is contained in:
commit
086c0b9ada
|
@ -4,7 +4,7 @@
|
||||||
//! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit
|
//! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit
|
||||||
//! double precision floating point values.
|
//! double precision floating point values.
|
||||||
|
|
||||||
use std::{num::NonZeroUsize, sync::Arc};
|
use std::sync::Arc;
|
||||||
|
|
||||||
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
|
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
|
||||||
use imsearch::multithreading::ThreadPool;
|
use imsearch::multithreading::ThreadPool;
|
||||||
|
@ -37,8 +37,8 @@ fn dot(a: &[f64], b: &[f64]) -> f64 {
|
||||||
/// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have
|
/// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have
|
||||||
/// finished the partial dot products will be summed to create the final result.
|
/// finished the partial dot products will be summed to create the final result.
|
||||||
fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
||||||
let mut pool =
|
|
||||||
ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(threads).unwrap(), true);
|
let mut pool = ThreadPool::with_limit(threads);
|
||||||
|
|
||||||
// number of elements in each vector for each thread
|
// number of elements in each vector for each thread
|
||||||
let steps = a.len() / threads;
|
let steps = a.len() / threads;
|
||||||
|
@ -57,14 +57,9 @@ fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
black_box(
|
pool.join_all();
|
||||||
// wait for the threads to finish
|
|
||||||
pool.join_all()
|
black_box(pool.get_results().iter().sum::<f64>());
|
||||||
// iterate over the results and sum the parital dot products together
|
|
||||||
.into_iter()
|
|
||||||
.map(|r| r.unwrap())
|
|
||||||
.reduce(|a, b| a + b),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Compute a simple hash value for the given index value.
|
/// Compute a simple hash value for the given index value.
|
||||||
|
@ -114,8 +109,6 @@ pub fn bench_threadpool(c: &mut Criterion) {
|
||||||
fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
||||||
// automatically choose the number of threads
|
// automatically choose the number of threads
|
||||||
let mut pool = ThreadPool::new();
|
let mut pool = ThreadPool::new();
|
||||||
// drop the handles used by each thread after its done
|
|
||||||
pool.drop_finished_handles();
|
|
||||||
|
|
||||||
// number of elements in each vector for each thread
|
// number of elements in each vector for each thread
|
||||||
let steps = a.len() / threads;
|
let steps = a.len() / threads;
|
||||||
|
@ -134,14 +127,9 @@ fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
black_box(
|
pool.join_all();
|
||||||
// wait for the threads to finish
|
|
||||||
pool.join_all()
|
black_box(pool.get_results().iter().sum::<f64>());
|
||||||
// iterate over the results and sum the parital dot products together
|
|
||||||
.into_iter()
|
|
||||||
.map(|r| r.unwrap())
|
|
||||||
.reduce(|a, b| a + b),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Benchmark the effects of over and underusing a thread pools thread capacity.
|
/// Benchmark the effects of over and underusing a thread pools thread capacity.
|
||||||
|
|
|
@ -1,399 +1,265 @@
|
||||||
//! This module provides the functionality to create a thread pool of fixed capacity.
|
//! This module provides the functionality to create thread pool to execute tasks in parallel.
|
||||||
//! This means that the pool can be used to dispatch functions or closures that will be executed
|
//! The amount of threads to be used at maximum can be regulated by using `ThreadPool::with_limit`.
|
||||||
//! some time in the future each on its own thread. When dispatching jobs, the pool will test whether
|
//! This implementation is aimed to be of low runtime cost with minimal sychronisation due to blocking.
|
||||||
//! threads are available. If so the pool will directly launch a new thread to run the supplied function.
|
//! Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
|
||||||
//! In case no threads are available the job will be stalled for execution until a thread is free to run the first
|
//! a new thread will be launched until the maximum number is reached. By then every launched thread will
|
||||||
//! stalled job.
|
//! be reused to process the remaining elements of the queue. If no jobs are left to be executed
|
||||||
|
//! all threads will finish and die. This means that if nothing is done, no threads will run in idle in the background.
|
||||||
|
//! # Example
|
||||||
|
//! ```rust
|
||||||
|
//! # use imsearch::multithreading::ThreadPool;
|
||||||
|
//! let mut pool = ThreadPool::with_limit(2);
|
||||||
//!
|
//!
|
||||||
//! The pool will also keep track of all the handles that [`std::thread::spawn`] returns. Hence after executing a job
|
//! for i in 0..10 {
|
||||||
//! the pool still queries the result of the function which can be retrieved any time after the submission.
|
//! pool.enqueue(move || i);
|
||||||
//! After retrieving the result of the function the handle is discarded and cannot be accessed again through the thread pool.
|
//! }
|
||||||
//!
|
//!
|
||||||
//! # Threads
|
//! pool.join_all();
|
||||||
//! The maximum number of threads to be used can be specified when creating a new thread pool.
|
//! assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
||||||
//! Alternatively the thread pool can be advised to automatically determine the recommend amount of threads to use.
|
//! ```
|
||||||
//! Note that this has its limitations due to possible side effects of sandboxing, containerization or vms.
|
|
||||||
//! For further information see: [`thread::available_parallelism`]
|
|
||||||
//!
|
|
||||||
//! # Memory consumption over time
|
|
||||||
//! The pool will store the handle for every thread launched constantly increasing the memory consumption.
|
|
||||||
//! It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to
|
|
||||||
//! `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption.
|
|
||||||
//! Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread
|
|
||||||
//! handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
|
|
||||||
//!
|
|
||||||
//! # Portability
|
|
||||||
//! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
|
|
||||||
//! This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
|
|
||||||
//! Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
|
|
||||||
//! (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
|
|
||||||
//! Additionally this implementation relies on using the `load` and `store` operations
|
|
||||||
//! instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls
|
|
||||||
//! to `unwrap` or `expected` from [`std::sync::MutexGuard`].
|
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
any::Any,
|
|
||||||
collections::VecDeque,
|
collections::VecDeque,
|
||||||
num::NonZeroUsize,
|
num::NonZeroUsize,
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicBool, AtomicUsize, Ordering},
|
mpsc::{channel, Receiver, Sender},
|
||||||
Arc, Mutex,
|
Arc, Mutex,
|
||||||
},
|
},
|
||||||
thread::{self, JoinHandle},
|
thread::{self, JoinHandle},
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Maximum number of thread to be used by the thread pool in case all methods
|
/// Default number if threads to be used in case [`std::thread::available_parallelism`] fails.
|
||||||
/// of determining a recommend number failed
|
pub const DEFAULT_THREAD_POOL_SIZE: usize = 1;
|
||||||
#[allow(unused)]
|
|
||||||
pub const FALLBACK_THREADS: usize = 1;
|
|
||||||
|
|
||||||
/// Returns the number of threads to be used by the thread pool by default.
|
/// Indicates the priority level of functions or closures which get supplied to the pool.
|
||||||
/// This function tries to fetch a recommended number by calling [`thread::available_parallelism`].
|
/// Use [`Priority::High`] to ensure the closue to be executed before all closures that are already supplied
|
||||||
/// In case this fails [`FALLBACK_THREADS`] will be returned
|
/// Use [`Priority::Low`] to ensure the closue to be executed after all closures that are already supplied
|
||||||
fn get_default_thread_count() -> usize {
|
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
// number of threads to fallback to
|
pub enum Priority {
|
||||||
let fallback_threads =
|
/// Indicate that the closure or function supplied to the thread
|
||||||
NonZeroUsize::new(FALLBACK_THREADS).expect("fallback_threads must be nonzero");
|
/// has higher priority than any other given to the pool until now.
|
||||||
// determine the maximum recommend number of threads to use
|
/// The item will get enqueued at the start of the waiting-queue.
|
||||||
// most of the time this is gonna be the number of cpus
|
High,
|
||||||
thread::available_parallelism()
|
/// Indicate that the closure or function supplied to the thread pool
|
||||||
.unwrap_or(fallback_threads)
|
/// has lower priority than the already supplied ones in this pool.
|
||||||
.get()
|
/// The item will get enqueued at the end of the waiting-queue.
|
||||||
|
Low,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This struct manages a pool of threads with a fixed maximum number.
|
/// Jobs are functions which are executed by the thread pool. They can be stalled when no threads are
|
||||||
/// Any time a closure is passed to `enqueue` the pool checks whether it can
|
/// free to execute them directly. They are meant to be executed only once and be done.
|
||||||
/// directly launch a new thread to execute the closure. If the maximum number
|
pub trait Job<T>: Send + 'static + FnOnce() -> T
|
||||||
/// of threads is reached the closure is staged and will get executed by next
|
where
|
||||||
/// thread to be available.
|
T: Send,
|
||||||
/// The pool will also keep track of every `JoinHandle` created by running every closure on
|
{
|
||||||
/// its on thread. The closures can be obtained by either calling `join_all` or `get_finished`.
|
}
|
||||||
|
|
||||||
|
impl<U, T> Job<T> for U
|
||||||
|
where
|
||||||
|
U: Send + 'static + FnOnce() -> T,
|
||||||
|
T: Send + 'static,
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Thread pool which can be used to execute functions or closures in parallel.
|
||||||
|
/// The amount of threads to be used at maximum can be regulated by using `ThreadPool::with_limit`.
|
||||||
|
/// This implementation is aimed to be of low runtime cost with minimal sychronisation due to blocking.
|
||||||
|
/// Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
|
||||||
|
/// a new thread will be launched until the maximum number is reached. By then every launched thread will
|
||||||
|
/// be reused to process the remaining elements of the queue. If no jobs are left to be executed
|
||||||
|
/// all threads will finish and die. This means that if nothing is done, no threads will run in idle in the background.
|
||||||
/// # Example
|
/// # Example
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// use imsearch::multithreading::ThreadPool;
|
/// # use imsearch::multithreading::ThreadPool;
|
||||||
/// let mut pool = ThreadPool::new();
|
/// let mut pool = ThreadPool::with_limit(2);
|
||||||
///
|
///
|
||||||
/// // launch some work in parallel
|
|
||||||
/// for i in 0..10 {
|
/// for i in 0..10 {
|
||||||
/// pool.enqueue(move || {
|
/// pool.enqueue(move || i);
|
||||||
/// println!("I am multithreaded and have id: {i}");
|
|
||||||
/// });
|
|
||||||
/// }
|
/// }
|
||||||
/// // wait for threads to finish
|
|
||||||
/// pool.join_all();
|
|
||||||
/// ```
|
|
||||||
/// # Portability
|
|
||||||
/// This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
|
|
||||||
/// This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
|
|
||||||
/// Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
|
|
||||||
/// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
|
|
||||||
/// Additionally this implementation relies on using the `load` and `store` operations
|
|
||||||
/// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls
|
|
||||||
/// to `unwrap` or `expected` from [`std::sync::MutexGuard`].
|
|
||||||
///
|
///
|
||||||
/// # Memory consumption over time
|
/// pool.join_all();
|
||||||
/// The pool will store the handle for every thread launched constantly increasing the memory consumption.
|
/// assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
||||||
/// It should be noted that the pool won't perform any kind of cleanup of the stored handles, meaning it is recommended to either make regular calls to
|
/// ```
|
||||||
/// `join_all` or `get_finished` in order to clear the vector of handles to avoid endless memory consumption.
|
|
||||||
/// Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discard all thread
|
|
||||||
/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
|
|
||||||
#[allow(dead_code)]
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ThreadPool<F, T>
|
pub struct ThreadPool<T, F>
|
||||||
where
|
where
|
||||||
F: Send + FnOnce() -> T,
|
T: Send,
|
||||||
|
F: Job<T>,
|
||||||
{
|
{
|
||||||
/// maximum number of threads to launch at once
|
/// queue for storing the jobs to be executed
|
||||||
max_thread_count: usize,
|
|
||||||
/// handles for launched threads
|
|
||||||
handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
|
|
||||||
/// function to be executed when threads are ready
|
|
||||||
queue: Arc<Mutex<VecDeque<F>>>,
|
queue: Arc<Mutex<VecDeque<F>>>,
|
||||||
/// number of currently running threads
|
/// handles for all threads currently running and processing jobs
|
||||||
/// new implementation relies on atomic primitives to avoid locking and possible
|
handles: Vec<JoinHandle<()>>,
|
||||||
/// guard errors. Note that atomic primitives are not available on all platforms "can generally be relied upon existing"
|
/// reciver end for channel based communication between threads
|
||||||
/// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
|
receiver: Receiver<T>,
|
||||||
/// Also this implementation relies on using the `load` and `store` operations
|
/// sender end for channel based communication between threads
|
||||||
/// instead of using more comfortable one like `fetch_add`
|
sender: Sender<T>,
|
||||||
threads: Arc<AtomicUsize>,
|
/// maximum amount of threads to be used in parallel
|
||||||
/// wether to keep the thread handles after the function returned
|
limit: NonZeroUsize,
|
||||||
drop_handles: Arc<AtomicBool>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, T> Default for ThreadPool<F, T>
|
impl<T, F> Default for ThreadPool<T, F>
|
||||||
where
|
where
|
||||||
F: Send + FnOnce() -> T,
|
T: Send + 'static,
|
||||||
|
F: Job<T>,
|
||||||
{
|
{
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
|
let (sender, receiver) = channel::<T>();
|
||||||
|
|
||||||
|
// determine default thread count to use based on the system
|
||||||
|
let default =
|
||||||
|
NonZeroUsize::new(DEFAULT_THREAD_POOL_SIZE).expect("Thread limit must be non-zero");
|
||||||
|
let limit = thread::available_parallelism().unwrap_or(default);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
max_thread_count: get_default_thread_count(),
|
queue: Arc::new(Mutex::new(VecDeque::new())),
|
||||||
handles: Default::default(),
|
handles: Vec::new(),
|
||||||
queue: Default::default(),
|
receiver,
|
||||||
// will be initialized to 0
|
sender,
|
||||||
threads: Arc::new(AtomicUsize::new(0)),
|
limit,
|
||||||
// do not drop handles by default
|
|
||||||
drop_handles: Arc::new(AtomicBool::new(false)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
impl<T, F> ThreadPool<T, F>
|
||||||
impl<F, T> ThreadPool<F, T>
|
|
||||||
where
|
where
|
||||||
F: Send + FnOnce() -> T + 'static,
|
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
|
F: Job<T>,
|
||||||
{
|
{
|
||||||
/// Create a new empty thread pool with the maximum number of threads set be the recommended amount of threads
|
/// Creates a new thread pool with default thread count determined by either
|
||||||
/// supplied by [`std::thread::available_parallelism`] or in case the function fails [`FALLBACK_THREADS`].
|
/// [`std::thread::available_parallelism`] or [`DEFAULT_THREAD_POOL_SIZE`] in case it fails.
|
||||||
/// # Limitations
|
/// No threads will be lauched until jobs are enqueued.
|
||||||
/// This function may assume the wrong number of threads due to the nature of [`std::thread::available_parallelism`].
|
|
||||||
/// That can happen if the program runs inside of a container or vm with poorly configured parallelism.
|
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
|
Default::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a new thread pool with the given thread count. The pool will continue to launch new threads even if
|
||||||
|
/// the system does not allow for that count of parallelism.
|
||||||
|
/// No threads will be lauched until jobs are enqueued.
|
||||||
|
/// # Panic
|
||||||
|
/// This function will fails if `max_threads` is zero.
|
||||||
|
pub fn with_limit(max_threads: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
max_thread_count: get_default_thread_count(),
|
limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new empty thread pool with the maximum number of threads set be the specified number
|
/// Put a new job into the queue to be executed by a thread in the future.
|
||||||
/// # Overusage
|
/// The priority of the job will determine if the job will be put at the start or end of the queue.
|
||||||
/// supplying a number of threads to great may negatively impact performance as the system may not
|
/// See [`crate::multithreading::Priority`].
|
||||||
/// be able to full fill the required needs
|
/// This function will create a new thread if the maximum number of threads in not reached.
|
||||||
pub fn with_threads(max_thread_count: NonZeroUsize) -> Self {
|
/// In case the maximum number of threads is already used, the job is stalled and will get executed
|
||||||
Self {
|
/// when a thread is ready and its at the start of the queue.
|
||||||
max_thread_count: max_thread_count.get(),
|
pub fn enqueue_priorize(&mut self, func: F, priority: Priority) {
|
||||||
..Default::default()
|
// put job into queue
|
||||||
|
let mut queue = self.queue.lock().unwrap();
|
||||||
|
|
||||||
|
// insert new job into queue depending on its priority
|
||||||
|
match priority {
|
||||||
|
Priority::High => queue.push_front(func),
|
||||||
|
Priority::Low => queue.push_back(func),
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.handles.len() < self.limit.get() {
|
||||||
|
// we can still launch threads to run in parallel
|
||||||
|
|
||||||
|
// clone the sender
|
||||||
|
let tx = self.sender.clone();
|
||||||
|
let queue = self.queue.clone();
|
||||||
|
|
||||||
|
self.handles.push(thread::spawn(move || {
|
||||||
|
while let Some(job) = queue.lock().unwrap().pop_front() {
|
||||||
|
tx.send(job()).expect("cannot send result");
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.handles.retain(|h| !h.is_finished());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Put a new job into the queue to be executed by a thread in the future.
|
||||||
|
/// The priority of the job is automatically set to [`crate::multithreading::Priority::Low`].
|
||||||
|
/// This function will create a new thread if the maximum number of threads in not reached.
|
||||||
|
/// In case the maximum number of threads is already used, the job is stalled and will get executed
|
||||||
|
/// when a thread is ready and its at the start of the queue.
|
||||||
|
pub fn enqueue(&mut self, func: F) {
|
||||||
|
self.enqueue_priorize(func, Priority::Low);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait for all threads to finish executing. This means that by the time all threads have finished
|
||||||
|
/// every task will have been executed too. In other words the threads finsish when the queue of jobs is empty.
|
||||||
|
/// This function will block the caller thread.
|
||||||
|
pub fn join_all(&mut self) {
|
||||||
|
while let Some(handle) = self.handles.pop() {
|
||||||
|
handle.join().unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new empty thread pool with the maximum number of threads set be the specified number
|
/// Returns all results that have been returned by the threads until now
|
||||||
/// and also sets the flag to drop the handles of finished threads instead of storing them until
|
/// and haven't been consumed yet.
|
||||||
/// eihter `join_all` or `get_finished` is called.
|
/// All results retrieved from this call won't be returned on a second call.
|
||||||
/// # Overusage
|
/// This function is non blocking.
|
||||||
/// supplying a number of threads to great may negatively impact performance as the system may not
|
pub fn try_get_results(&mut self) -> Vec<T> {
|
||||||
/// be able to full fill the required needs
|
self.receiver.try_iter().collect()
|
||||||
/// # Memory usage
|
|
||||||
/// if `drop_handles` is set to `false` the pool will continue to store the handles of
|
|
||||||
/// launched threads. This causes memory consumption to rise over time as more and more
|
|
||||||
/// threads are launched.
|
|
||||||
pub fn with_threads_and_drop_handles(
|
|
||||||
max_thread_count: NonZeroUsize,
|
|
||||||
drop_handles: bool,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
max_thread_count: max_thread_count.get(),
|
|
||||||
drop_handles: Arc::new(AtomicBool::new(drop_handles)),
|
|
||||||
..Default::default()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Pass a new closure to be executed as soon as a thread is available.
|
/// Returns all results that have been returned by the threads until now
|
||||||
/// This function will execute the supplied closure immediately when the number of running threads
|
/// and haven't been consumed yet. The function will also wait for all threads to finish executing (empty the queue).
|
||||||
/// is lower than the maximum number of threads. Otherwise the closure will be executed at some undetermined time
|
/// All results retrieved from this call won't be returned on a second call.
|
||||||
/// in the future unless program doesn't die before.
|
/// This function will block the caller thread.
|
||||||
/// If `join_all` is called and the closure hasn't been executed yet, `join_all` will wait for all stalled
|
pub fn get_results(&mut self) -> Vec<T> {
|
||||||
/// closures be executed.
|
self.join_all();
|
||||||
pub fn enqueue(&mut self, closure: F) {
|
self.try_get_results()
|
||||||
// read used thread counter and apply all store operations with Ordering::Release
|
|
||||||
let used_threads = self.threads.load(Ordering::Acquire);
|
|
||||||
// test if we can launch a new thread
|
|
||||||
if used_threads < self.max_thread_count {
|
|
||||||
// we can create a new thread, increment the thread count
|
|
||||||
self.threads
|
|
||||||
.store(used_threads.saturating_add(1), Ordering::Release);
|
|
||||||
// run new thread
|
|
||||||
execute(
|
|
||||||
self.queue.clone(),
|
|
||||||
self.handles.clone(),
|
|
||||||
self.threads.clone(),
|
|
||||||
self.drop_handles.clone(),
|
|
||||||
closure,
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
// all threads being used
|
|
||||||
// enqueue closure to be launched when a thread is ready
|
|
||||||
self.queue.lock().unwrap().push_back(closure);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes all closures stalled for execution.
|
|
||||||
/// All closures still waiting to be executed will be dropped by the pool and
|
|
||||||
/// won't get executed. Useful if an old set of closures hasn't run yet but are outdated
|
|
||||||
/// and resources are required immediately for updated closures.
|
|
||||||
pub fn discard_stalled(&mut self) {
|
|
||||||
self.queue.lock().unwrap().clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Waits for all currently running threads and all stalled closures to be executed.
|
|
||||||
/// If any closure hasn't been executed yet, `join_all` will wait until the queue holding all
|
|
||||||
/// unexecuted closures is empty. It returns the result every `join` of all threads yields as a vector.
|
|
||||||
/// If the vector is of length zero, no threads were joined and the thread pool didn't do anything.
|
|
||||||
/// All handles of threads will be removed after this call.
|
|
||||||
pub fn join_all(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
|
|
||||||
let mut results = Vec::new();
|
|
||||||
loop {
|
|
||||||
// lock the handles, pop the last one off and unlock handles again
|
|
||||||
// to allow running threads to process
|
|
||||||
let handle = self.handles.lock().unwrap().pop();
|
|
||||||
|
|
||||||
// if we still have a handle join it else no handles are left we abort the loop
|
|
||||||
if let Some(handle) = handle {
|
|
||||||
results.push(handle.join());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
results
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the results of every thread that has already finished until now.
|
|
||||||
/// All other threads currently running won't be waited for nor for any closure stalled for execution in the future.
|
|
||||||
/// /// If the vector is of length zero, no threads were joined and the thread pool either doesn't do anything or is busy.
|
|
||||||
/// All handles of finished threads will be removed after this call.
|
|
||||||
pub fn get_finished(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
|
|
||||||
let mut results = Vec::new();
|
|
||||||
|
|
||||||
let mut handles = self.handles.lock().unwrap();
|
|
||||||
|
|
||||||
// loop through the handles and remove all finished handles
|
|
||||||
// join on the finished handles which will be quick as they are finished!
|
|
||||||
let mut idx = 0;
|
|
||||||
while idx < handles.len() {
|
|
||||||
if handles[idx].is_finished() {
|
|
||||||
// thread is finished, yield result
|
|
||||||
results.push(handles.remove(idx).join());
|
|
||||||
} else {
|
|
||||||
// thread isn't done, continue to the next one
|
|
||||||
idx += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
results
|
|
||||||
}
|
|
||||||
|
|
||||||
/// set the flag to indicate that thread handles will be dropped after the thread is finished
|
|
||||||
/// executing. All threads that have finished until now but haven't been removed will get dropped
|
|
||||||
/// after the next thread finishes.
|
|
||||||
pub fn drop_finished_handles(&self) {
|
|
||||||
self.drop_handles.store(false, Ordering::Release);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// set the flag to indicate that thread handles will be kept after the thread is finished
|
|
||||||
/// executing until either `join_all` or `get_finished` is called.
|
|
||||||
/// Only new thread handles created after this call be kept.
|
|
||||||
pub fn keep_future_handles(&self) {
|
|
||||||
self.drop_handles.store(true, Ordering::Release);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Removes all thread handles which have finished only if the can be locked at
|
|
||||||
/// the current time. This function will not block execution when the lock cannot be acquired.
|
|
||||||
fn try_prune<T>(handles: Arc<Mutex<Vec<JoinHandle<T>>>>) {
|
|
||||||
if let Ok(mut handles) = handles.try_lock() {
|
|
||||||
// keep unfinished elements
|
|
||||||
handles.retain(|handle| !handle.is_finished());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Execute the supplied closure on a new thread
|
|
||||||
/// and store the threads handle into `handles`. When the thread
|
|
||||||
/// finished executing the closure it will look for any closures left in `queue` and
|
|
||||||
/// recursively execute it on a new thread. This method updates threads` in order to
|
|
||||||
/// keep track of the number of active threads.
|
|
||||||
fn execute<F, T>(
|
|
||||||
queue: Arc<Mutex<VecDeque<F>>>,
|
|
||||||
handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
|
|
||||||
threads: Arc<AtomicUsize>,
|
|
||||||
drop: Arc<AtomicBool>,
|
|
||||||
closure: F,
|
|
||||||
) where
|
|
||||||
T: Send + 'static,
|
|
||||||
F: Send + FnOnce() -> T + 'static,
|
|
||||||
{
|
|
||||||
let handles_copy = handles.clone();
|
|
||||||
|
|
||||||
handles.lock().unwrap().push(thread::spawn(move || {
|
|
||||||
// run closure (actual work)
|
|
||||||
let result = closure();
|
|
||||||
|
|
||||||
// take the next closure stalled for execution
|
|
||||||
let next = queue.lock().unwrap().pop_front();
|
|
||||||
if let Some(next_closure) = next {
|
|
||||||
// if we have sth. to execute, spawn a new thread
|
|
||||||
execute(
|
|
||||||
queue,
|
|
||||||
handles_copy.clone(),
|
|
||||||
threads,
|
|
||||||
drop.clone(),
|
|
||||||
next_closure,
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
// nothing to execute this thread will run out without any work to do
|
|
||||||
// decrement the amount of used threads
|
|
||||||
threads.store(
|
|
||||||
threads.load(Ordering::Acquire).saturating_sub(1),
|
|
||||||
Ordering::Release,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// try to drop all fnished thread handles if necessary
|
|
||||||
// this is a non blocking operation
|
|
||||||
if drop.load(Ordering::Acquire) {
|
|
||||||
try_prune(handles_copy);
|
|
||||||
}
|
|
||||||
|
|
||||||
result
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod test {
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_thread_pool() {
|
fn test_default() {
|
||||||
// auto determine the amount of threads to use
|
let mut pool = ThreadPool::default();
|
||||||
let mut pool = ThreadPool::new();
|
|
||||||
|
|
||||||
// launch 4 jobs to run on our pool
|
for i in 0..10 {
|
||||||
for i in 0..4 {
|
pool.enqueue_priorize(move || i, Priority::High);
|
||||||
pool.enqueue(move || (0..=i).sum::<usize>());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for the threads to finish and sum their results
|
pool.join_all();
|
||||||
let sum = pool
|
|
||||||
.join_all()
|
|
||||||
.into_iter()
|
|
||||||
.map(|r| r.unwrap())
|
|
||||||
.sum::<usize>();
|
|
||||||
|
|
||||||
assert_eq!(sum, 10);
|
assert_eq!(pool.try_get_results().iter().sum::<i32>(), 45);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_drop_stalled() {
|
fn test_limit() {
|
||||||
// auto determine the amount of threads to use
|
let mut pool = ThreadPool::with_limit(2);
|
||||||
let mut pool = ThreadPool::with_threads(NonZeroUsize::new(1).unwrap());
|
|
||||||
|
|
||||||
// launch 2 jobs: 1 will immediately return, the other one will sleep for 20 seconds
|
for i in 0..10 {
|
||||||
for i in 0..1 {
|
pool.enqueue(move || i);
|
||||||
pool.enqueue(move || {
|
|
||||||
thread::sleep(Duration::from_secs(i * 20));
|
|
||||||
i
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait 10 secs
|
assert_eq!(pool.handles.len(), 2);
|
||||||
thread::sleep(Duration::from_secs(2));
|
assert_eq!(pool.limit.get(), 2);
|
||||||
// discard job that should still run
|
|
||||||
pool.discard_stalled();
|
|
||||||
|
|
||||||
// wait for the threads to finish and sum their results
|
pool.join_all();
|
||||||
let sum = pool.join_all().into_iter().map(|r| r.unwrap()).sum::<u64>();
|
|
||||||
|
|
||||||
assert_eq!(sum, 0);
|
assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multiple() {
|
||||||
|
let mut pool = ThreadPool::with_limit(2);
|
||||||
|
|
||||||
|
for i in 0..10 {
|
||||||
|
pool.enqueue(move || i);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(pool.handles.len(), 2);
|
||||||
|
assert_eq!(pool.limit.get(), 2);
|
||||||
|
|
||||||
|
pool.join_all();
|
||||||
|
|
||||||
|
assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue