reformatted crate `multithreading` to pass tests

Sven Vogel 2023-05-30 22:25:08 +02:00
parent 1208a04658
commit 990a54a032
1 changed file with 20 additions and 11 deletions


@@ -4,24 +4,24 @@
//! threads are available. If so the pool will directly launch a new thread to run the supplied function.
//! In case no threads are available the job will be stalled for execution until a thread is free to run the first
//! stalled job.
//!
//! The pool will also keep track of all the handles that [`std::thread::spawn`] returns. Hence after executing a job
//! the pool still holds the result of the function, which can be retrieved any time after the submission.
//! After retrieving the result of the function the handle is discarded and cannot be accessed again through the thread pool.
//!
//! # Threads
//! The maximum number of threads to be used can be specified when creating a new thread pool.
//! Alternatively the thread pool can be advised to automatically determine the recommended number of threads to use.
//! Note that this has its limitations due to possible side effects of sandboxing, containerization or VMs.
//! For further information see: [`thread::available_parallelism`]
//!
//! # Memory consumption over time
//! The pool will store the handle for every thread launched, constantly increasing the memory consumption.
//! Note that the pool won't perform any kind of cleanup of the stored handles; it is recommended to make regular calls to
//! `join_all` or `get_finished` in order to clear the vector of handles and avoid unbounded memory consumption.
//! Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discards all thread
//! handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
//!
//! # Portability
//! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
//! This type is used to remove some locks from the otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
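The handle bookkeeping described above can be sketched with plain `std`; the pool's own submission API is not part of this hunk, so the following is illustrative only:

```rust
use std::thread::{self, JoinHandle};

fn main() {
    // Keep every handle returned by `std::thread::spawn`, as the pool does.
    let mut handles: Vec<JoinHandle<i32>> = Vec::new();
    for i in 0..4 {
        handles.push(thread::spawn(move || i * i));
    }

    // Retrieving a result consumes its handle, mirroring the pool's
    // "discarded after retrieval" behavior; draining also clears the
    // vector, which is what regular `join_all` calls achieve for the pool.
    let results: Vec<i32> = handles.drain(..).map(|h| h.join().unwrap()).collect();
    assert_eq!(results, vec![0, 1, 4, 9]);
}
```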
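The Threads section defers to [`thread::available_parallelism`]; below is a std-only sketch of querying it with a fallback, roughly what a default-thread-count helper might do (the crate's actual `get_default_thread_count`, visible in a later hunk header, is not shown in this diff):

```rust
use std::thread;

fn main() {
    // The reported parallelism may be skewed by sandboxing, containers
    // or VMs, hence the conservative fallback to a single thread.
    let count = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    println!("defaulting to {count} threads");
}
```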
@@ -36,7 +36,7 @@ use std::{
     collections::VecDeque,
     num::NonZeroUsize,
     sync::{
-        atomic::{AtomicUsize, Ordering, AtomicBool},
+        atomic::{AtomicBool, AtomicUsize, Ordering},
         Arc, Mutex,
     },
     thread::{self, JoinHandle},
@@ -90,13 +90,13 @@ fn get_default_thread_count() -> usize {
/// Additionally this implementation relies on using the `load` and `store` operations
/// instead of more comfortable ones like `fetch_add` in order to avoid unnecessary calls
/// to `unwrap` or `expect` when acquiring a [`std::sync::MutexGuard`].
///
/// # Memory consumption over time
/// The pool will store the handle for every thread launched, constantly increasing the memory consumption.
/// Note that the pool won't perform any kind of cleanup of the stored handles; it is recommended to make regular calls to
/// `join_all` or `get_finished` in order to clear the vector of handles and avoid unbounded memory consumption.
/// Alternatively, you can use the function `with_threads_and_drop_handles` to create a new pool that discards all thread
/// handles after the threads are finished. This will automatically reduce the memory consumption of the pool over time.
#[allow(dead_code)]
#[derive(Debug)]
pub struct ThreadPool<F, T>
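The `load`/`store` pattern the doc comment refers to, as a minimal sketch (`SeqCst` is chosen here for simplicity; the orderings the crate actually uses are not visible in this diff):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

static THREADS: AtomicUsize = AtomicUsize::new(0);

fn main() {
    // Lock-free counter access: no `Mutex<usize>`, so no `lock().unwrap()`.
    let current = THREADS.load(Ordering::SeqCst);
    THREADS.store(current + 1, Ordering::SeqCst);
    // Note: a separate load followed by a store is not one atomic
    // read-modify-write; `fetch_add(1, Ordering::SeqCst)` would be the
    // single-operation alternative the comment contrasts against.
    assert_eq!(THREADS.load(Ordering::SeqCst), 1);
}
```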
@@ -132,7 +132,7 @@ where
            // will be initialized to 0
            threads: Arc::new(AtomicUsize::new(0)),
            // do not drop handles by default
-            drop_handles: Arc::new(AtomicBool::new(false))
+            drop_handles: Arc::new(AtomicBool::new(false)),
        }
    }
}
@@ -173,10 +173,13 @@ where
    /// supplying a number of threads too great may negatively impact performance as the system may not
    /// be able to fulfill the required needs
    /// # Memory usage
    /// If `drop_handles` is set to `false` the pool will continue to store the handles of
    /// launched threads. This causes memory consumption to rise over time as more and more
    /// threads are launched.
-    pub fn with_threads_and_drop_handles(max_thread_count: NonZeroUsize, drop_handles: bool) -> Self {
+    pub fn with_threads_and_drop_handles(
+        max_thread_count: NonZeroUsize,
+        drop_handles: bool,
+    ) -> Self {
        Self {
            max_thread_count: max_thread_count.get(),
            drop_handles: Arc::new(AtomicBool::new(drop_handles)),
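A small call-site sketch for the reformatted constructor; the `ThreadPool` line is left as a comment because the rest of the pool's API is outside this diff:

```rust
use std::num::NonZeroUsize;

fn main() {
    // `NonZeroUsize` rules out a zero-thread pool at the type level.
    let max_threads = NonZeroUsize::new(8).expect("thread count must be non-zero");
    assert_eq!(max_threads.get(), 8);
    // let pool = ThreadPool::with_threads_and_drop_handles(max_threads, true);
}
```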
@@ -318,7 +321,13 @@ fn execute<F, T>(
        let next = queue.lock().unwrap().pop_front();
        if let Some(next_closure) = next {
            // if we have something to execute, spawn a new thread
-            execute(queue, handles_copy.clone(), threads, drop.clone(), next_closure);
+            execute(
+                queue,
+                handles_copy.clone(),
+                threads,
+                drop.clone(),
+                next_closure,
+            );
        } else {
            // nothing to execute; this thread will run out without any work to do
            // decrement the amount of used threads
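The tail of this hunk is the worker's refill step: pop the next stalled job or, if the queue is empty, release the thread slot. A self-contained sketch of that pattern, using illustrative names rather than the crate's own:

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let queue: Arc<Mutex<VecDeque<Box<dyn FnOnce() + Send>>>> =
        Arc::new(Mutex::new(VecDeque::new()));
    let threads = Arc::new(AtomicUsize::new(1)); // one worker running

    for i in 0..3 {
        queue.lock().unwrap().push_back(Box::new(move || println!("job {i}")));
    }

    let worker = {
        let (queue, threads) = (Arc::clone(&queue), Arc::clone(&threads));
        thread::spawn(move || loop {
            // The guard is a temporary, so the lock is released before the job runs.
            let next = queue.lock().unwrap().pop_front();
            match next {
                Some(job) => job(),
                None => {
                    // Nothing left to execute: this worker runs out; free its
                    // slot using the same load/store style as the crate.
                    threads.store(threads.load(Ordering::SeqCst) - 1, Ordering::SeqCst);
                    break;
                }
            }
        })
    };

    worker.join().unwrap();
    assert_eq!(threads.load(Ordering::SeqCst), 0);
}
```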