From 10266dd1bc3f76f0e29678e931b320b04b30a85d Mon Sep 17 00:00:00 2001
From: teridax <72654954+Servostar@users.noreply.github.com>
Date: Thu, 1 Jun 2023 20:45:07 +0200
Subject: [PATCH 1/2] Multithreading (#12)

* added multithreading crate with thread pool
* added crate multithreading
* replaced the mutex around the `threads` member of `ThreadPool` with an atomic primitive
* fixed doctest for thread pool
* added module documentation to multithreading
* added functionality to drop thread handles automatically when threads have finished
* reformatted crate `multithreading` to pass tests
* added benchmark for `ThreadPool` using `criterion`
* finished benchmark for thread pool and fixed documentation for thread pool
* added unit test to `multithreading`
---
 .gitignore                |   1 +
 Cargo.toml                |   7 +
 benches/multithreading.rs | 186 ++++++++++++++++++
 src/lib.rs                |   2 +
 src/multithreading/mod.rs | 399 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 595 insertions(+)
 create mode 100644 benches/multithreading.rs
 create mode 100644 src/multithreading/mod.rs

diff --git a/.gitignore b/.gitignore
index 4c790d0..98f2979 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 /target
 /Cargo.lock
 .DS_Store
+/.vscode
\ No newline at end of file

diff --git a/Cargo.toml b/Cargo.toml
index a6d8ad6..69cd0ba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,3 +10,10 @@ authors = ["Sven Vogel", "Felix L. Müller", "Elias Alexander", "Elias Schmidt"]
 png = "0.17.8"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
+
+[dev-dependencies]
+criterion = "0.5.1"
+
+[[bench]]
+name = "multithreading"
+harness = false
\ No newline at end of file

diff --git a/benches/multithreading.rs b/benches/multithreading.rs
new file mode 100644
index 0000000..84a6014
--- /dev/null
+++ b/benches/multithreading.rs
@@ -0,0 +1,186 @@
//! Benchmarking functionality for [Criterion.rs](https://github.com/bheisler/criterion.rs).
//! This benchmark compares the performance of thread pools launched with different maximum
//! numbers of threads.
//! Each thread calculates a partial dot product of two vectors composed of 1,000,000 64-bit
//! double-precision floating-point values.

use std::{num::NonZeroUsize, sync::Arc};

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use imsearch::multithreading::ThreadPool;

/// Number of elements per vector used to calculate the dot product
const VEC_ELEM_COUNT: usize = 1_000_000;
/// Numbers of threads to test
const THREAD_COUNTS: [usize; 17] = [
    1, 2, 4, 6, 8, 10, 12, 16, 18, 20, 22, 26, 28, 32, 40, 56, 64,
];
/// Seeds used to scramble the values produced by the hash function for each vector.
/// These are just some pseudo-random numbers.
const VEC_SEEDS: [u64; 2] = [0xa3f8347abce16, 0xa273048ca9dea];

/// Compute the dot product of two vectors.
/// # Panics
/// This function assumes both vectors to be of exactly the same length.
/// If this is not the case the function will panic.
fn dot(a: &[f64], b: &[f64]) -> f64 {
    let mut sum = 0.0;

    for i in 0..a.len() {
        sum += a[i] * b[i];
    }

    sum
}
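// Aside (not part of the original patch): an equivalent, iterator-based form of
// `dot` avoids the manual indexing above; `zip` stops at the shorter slice, so it
// also sidesteps the panic described in the doc comment of `dot`.
#[allow(dead_code)]
fn dot_iter(a: &[f64], b: &[f64]) -> f64 {
    a.iter().zip(b).map(|(x, y)| x * y).sum()
}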
/// Computes the dot product using a thread pool with a varying number of threads. Both vectors
/// are split into equally sized slices which are then passed to their own thread to compute a
/// partial dot product. After all threads have finished, the partial dot products are summed
/// to create the final result.
fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
    let mut pool =
        ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(threads).unwrap(), true);

    // number of elements in each vector for each thread
    let steps = a.len() / threads;

    for i in 0..threads {
        // offset of the first element for the thread-local slice
        let chunk = i * steps;
        // create a new strong reference to the vector
        let aa = a.clone();
        let bb = b.clone();
        // launch a new thread
        pool.enqueue(move || {
            let a = &aa[chunk..(chunk + steps)];
            let b = &bb[chunk..(chunk + steps)];
            dot(a, b)
        });
    }

    black_box(
        // wait for the threads to finish
        pool.join_all()
            // iterate over the results and sum the partial dot products together
            .into_iter()
            .map(|r| r.unwrap())
            .reduce(|a, b| a + b),
    );
}
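// Aside (not part of the original patch): `a.len() / threads` truncates, so up to
// `threads - 1` trailing elements are silently skipped whenever the vector length is
// not evenly divisible by the thread count. A remainder-safe split could use `chunks`
// instead; a sketch, assuming non-empty input (each chunk pair would be moved into
// its own pool job in the parallel version):
#[allow(dead_code)]
fn dot_chunked(a: &[f64], b: &[f64], threads: usize) -> f64 {
    // ceiling division so no elements are left over
    let chunk_size = (a.len() + threads - 1) / threads;
    a.chunks(chunk_size)
        .zip(b.chunks(chunk_size))
        .map(|(aa, bb)| dot(aa, bb))
        .sum()
}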
/// Compute a simple hash value for the given input value.
/// This function returns a value in the interval (-1, 1).
#[inline]
fn hash(x: f64) -> f64 {
    ((x * 234.8743 + 3.8274).sin() * 87624.58376).fract()
}

/// Create a vector filled with `size` 64-bit floating-point numbers, each initialized
/// with the function `hash` and the given seed. The vector will be filled with values
/// in the interval (-1, 1).
fn create_vec(size: usize, seed: u64) -> Arc<Vec<f64>> {
    let mut vec = Vec::with_capacity(size);

    for i in 0..size {
        vec.push(hash(i as f64 + seed as f64));
    }

    Arc::new(vec)
}

/// Function for executing the thread pool benchmarks using criterion.rs.
/// It creates two different vectors and benchmarks the single-threaded performance
/// as well as the multithreaded performance for varying numbers of threads.
pub fn bench_threadpool(c: &mut Criterion) {
    let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
    let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);

    let mut group = c.benchmark_group("threadpool with various number of threads");

    for threads in THREAD_COUNTS.iter() {
        group.throughput(Throughput::Bytes(*threads as u64));
        group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
            b.iter(|| {
                dot_parallel(vec_a.clone(), vec_b.clone(), *threads);
            });
        });
    }
    group.finish();
}

/// Benchmark the effects of over- and underusing a thread pool's thread capacity.
/// The thread pool will automatically choose the number of threads to use.
/// We then run a custom number of jobs with that pool that may be smaller or larger
/// than the number of threads the pool can offer.
fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
    // automatically choose the number of threads
    let mut pool = ThreadPool::new();
    // drop the handle used by each thread after it's done
    pool.drop_finished_handles();

    // number of elements in each vector for each thread
    let steps = a.len() / threads;

    for i in 0..threads {
        // offset of the first element for the thread-local slice
        let chunk = i * steps;
        // create a new strong reference to the vector
        let aa = a.clone();
        let bb = b.clone();
        // launch a new thread
        pool.enqueue(move || {
            let a = &aa[chunk..(chunk + steps)];
            let b = &bb[chunk..(chunk + steps)];
            dot(a, b)
        });
    }

    black_box(
        // wait for the threads to finish
        pool.join_all()
            // iterate over the results and sum the partial dot products together
            .into_iter()
            .map(|r| r.unwrap())
            .reduce(|a, b| a + b),
    );
}

/// Benchmark the effects of over- and underusing a thread pool's thread capacity.
/// The thread pool will automatically choose the number of threads to use.
/// We then run a custom number of jobs with that pool that may be smaller or larger
/// than the number of threads the pool can offer.
pub fn bench_overusage(c: &mut Criterion) {
    let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
    let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);

    let mut group = c.benchmark_group("threadpool overusage");

    for threads in THREAD_COUNTS.iter() {
        group.throughput(Throughput::Bytes(*threads as u64));
        group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
            b.iter(|| {
                pool_overusage(vec_a.clone(), vec_b.clone(), *threads);
            });
        });
    }
    group.finish();
}

/// Benchmark the performance of a single thread used to calculate the dot product.
pub fn bench_single_threaded(c: &mut Criterion) {
    let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
    let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);

    c.bench_function("single threaded", |s| {
        s.iter(|| {
            black_box(dot(&vec_a, &vec_b));
        });
    });
}

criterion_group!(
    benches,
    bench_single_threaded,
    bench_threadpool,
    bench_overusage
);
criterion_main!(benches);

diff --git a/src/lib.rs b/src/lib.rs
index 7d12d9a..e0f9a41 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,5 @@
+pub mod multithreading;
+
 pub fn add(left: usize, right: usize) -> usize {
     left + right
 }

diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs
new file mode 100644
index 0000000..00ddc74
--- /dev/null
+++ b/src/multithreading/mod.rs
@@ -0,0 +1,399 @@
//! This module provides the functionality to create a thread pool of fixed capacity.
//! The pool can be used to dispatch functions or closures that will be executed
//! some time in the future, each on its own thread. When dispatching jobs, the pool tests
//! whether threads are available. If so, the pool directly launches a new thread to run the
//! supplied function. In case no threads are available, the job is stalled for execution
//! until a thread is free to run the first stalled job.
//!
//! The pool also keeps track of all the handles that [`std::thread::spawn`] returns. Hence,
//! after executing a job, the pool still holds the result of the function, which can be
//! retrieved any time after submission. After retrieving the result of the function, the
//! handle is discarded and cannot be accessed again through the thread pool.
//!
//! # Threads
//! The maximum number of threads to be used can be specified when creating a new thread pool.
//! Alternatively, the thread pool can be advised to automatically determine the recommended
//! number of threads to use. Note that this has its limitations due to possible side effects
//! of sandboxing, containerization or VMs. For further information see:
//! [`thread::available_parallelism`]
//!
//! # Memory consumption over time
//! The pool stores the handle of every thread launched, which constantly increases memory
//! consumption. Note that the pool won't perform any kind of cleanup of the stored handles
//! by itself, meaning it is recommended to make regular calls to `join_all` or `get_finished`
//! in order to clear the vector of handles and avoid unbounded memory consumption.
//! Alternatively, you can use the function `with_threads_and_drop_handles` to create a pool
//! that discards all thread handles after the threads have finished. This automatically
//! reduces the memory consumption of the pool over time.
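//!
//! # Example
//! A sketch (not part of the original patch) of draining results to keep memory bounded;
//! it assumes the jobs return `i32`:
//! ```rust
//! use imsearch::multithreading::ThreadPool;
//! let mut pool = ThreadPool::new();
//!
//! for i in 0..100 {
//!     pool.enqueue(move || i * i);
//! }
//! // collect the results of already finished jobs without blocking
//! let finished: Vec<i32> = pool.get_finished().into_iter().map(|r| r.unwrap()).collect();
//! // block until every remaining job has run
//! let rest: Vec<i32> = pool.join_all().into_iter().map(|r| r.unwrap()).collect();
//! ```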
//!
//! # Portability
//! This implementation is not fully platform independent. This is due to the usage of
//! [`std::sync::atomic::AtomicUsize`], which is used to remove some locks from an otherwise
//! necessary [`std::sync::Mutex`] wrapping a [`usize`]. Note that atomic primitives are not
//! available on all platforms but "can generally be relied upon existing" (see the
//! [`std::sync::atomic`] module documentation).
//! Additionally, this implementation relies on using the `load` and `store` operations
//! instead of more convenient ones like `fetch_add`. Replacing the mutex with an atomic
//! avoids unnecessary calls to `unwrap` or `expect` on a [`std::sync::MutexGuard`].
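//!
//! For comparison, a read-modify-write like `fetch_add` performs the whole counter update
//! in one atomic step, while a separate `load`/`store` pair leaves a window in which
//! another thread may interleave. A minimal sketch (not part of the original patch):
//! ```rust
//! use std::sync::atomic::{AtomicUsize, Ordering};
//! let threads = AtomicUsize::new(0);
//! // two separate steps: another thread may run in between
//! let n = threads.load(Ordering::Acquire);
//! threads.store(n + 1, Ordering::Release);
//! // one atomic step
//! threads.fetch_add(1, Ordering::AcqRel);
//! ```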

use std::{
    any::Any,
    collections::VecDeque,
    num::NonZeroUsize,
    sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        Arc, Mutex,
    },
    thread::{self, JoinHandle},
};

/// Number of threads to be used by the thread pool in case all methods
/// of determining a recommended number fail
#[allow(unused)]
pub const FALLBACK_THREADS: usize = 1;

/// Returns the number of threads to be used by the thread pool by default.
/// This function tries to fetch a recommended number by calling [`thread::available_parallelism`].
/// In case this fails, [`FALLBACK_THREADS`] will be returned.
fn get_default_thread_count() -> usize {
    // number of threads to fall back to
    let fallback_threads =
        NonZeroUsize::new(FALLBACK_THREADS).expect("fallback_threads must be nonzero");
    // determine the maximum recommended number of threads to use
    // most of the time this is going to be the number of CPUs
    thread::available_parallelism()
        .unwrap_or(fallback_threads)
        .get()
}

/// This struct manages a pool of threads with a fixed maximum number.
/// Any time a closure is passed to `enqueue`, the pool checks whether it can
/// directly launch a new thread to execute the closure. If the maximum number
/// of threads is reached, the closure is staged and will get executed by the next
/// thread to become available.
/// The pool also keeps track of every `JoinHandle` created by running every closure on
/// its own thread. The results can be obtained by either calling `join_all` or `get_finished`.
/// # Example
/// ```rust
/// use imsearch::multithreading::ThreadPool;
/// let mut pool = ThreadPool::new();
///
/// // launch some work in parallel
/// for i in 0..10 {
///     pool.enqueue(move || {
///         println!("I am multithreaded and have id: {i}");
///     });
/// }
/// // wait for threads to finish
/// pool.join_all();
/// ```
/// # Portability
/// This implementation is not fully platform independent. This is due to the usage of
/// [`std::sync::atomic::AtomicUsize`], which is used to remove some locks from an otherwise
/// necessary [`std::sync::Mutex`] wrapping a [`usize`]. Note that atomic primitives are not
/// available on all platforms but "can generally be relied upon existing" (see the
/// [`std::sync::atomic`] module documentation).
/// Additionally, this implementation relies on using the `load` and `store` operations
/// instead of more convenient ones like `fetch_add`. Replacing the mutex with an atomic
/// avoids unnecessary calls to `unwrap` or `expect` on a [`std::sync::MutexGuard`].
///
/// # Memory consumption over time
/// The pool stores the handle of every thread launched, which constantly increases memory
/// consumption. Note that the pool won't perform any kind of cleanup of the stored handles
/// by itself, meaning it is recommended to make regular calls to `join_all` or `get_finished`
/// in order to clear the vector of handles and avoid unbounded memory consumption.
/// Alternatively, you can use the function `with_threads_and_drop_handles` to create a pool
/// that discards all thread handles after the threads have finished. This automatically
/// reduces the memory consumption of the pool over time.
#[allow(dead_code)]
#[derive(Debug)]
pub struct ThreadPool<F, T>
where
    F: Send + FnOnce() -> T,
{
    /// maximum number of threads to launch at once
    max_thread_count: usize,
    /// handles for launched threads
    handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
    /// functions to be executed when threads are ready
    queue: Arc<Mutex<VecDeque<F>>>,
    /// number of currently running threads.
    /// This implementation relies on atomic primitives to avoid locking and possible
    /// guard errors. Note that atomic primitives are not available on all platforms but
    /// "can generally be relied upon existing" (see the [`std::sync::atomic`] module
    /// documentation). Also, this implementation relies on using the `load` and `store`
    /// operations instead of more convenient ones like `fetch_add`.
    threads: Arc<AtomicUsize>,
    /// whether to discard the thread handles after the function has returned
    drop_handles: Arc<AtomicBool>,
}

impl<F, T> Default for ThreadPool<F, T>
where
    F: Send + FnOnce() -> T,
{
    fn default() -> Self {
        Self {
            max_thread_count: get_default_thread_count(),
            handles: Default::default(),
            queue: Default::default(),
            // will be initialized to 0
            threads: Arc::new(AtomicUsize::new(0)),
            // do not drop handles by default
            drop_handles: Arc::new(AtomicBool::new(false)),
        }
    }
}

#[allow(dead_code)]
impl<F, T> ThreadPool<F, T>
where
    F: Send + FnOnce() -> T + 'static,
    T: Send + 'static,
{
    /// Create a new empty thread pool with the maximum number of threads set to the recommended
    /// number supplied by [`std::thread::available_parallelism`], or [`FALLBACK_THREADS`] in
    /// case that function fails.
    /// # Limitations
    /// This function may assume the wrong number of threads due to the nature of
    /// [`std::thread::available_parallelism`]. That can happen if the program runs inside
    /// a container or VM with poorly configured parallelism.
    pub fn new() -> Self {
        Self {
            max_thread_count: get_default_thread_count(),
            ..Default::default()
        }
    }

    /// Create a new empty thread pool with the maximum number of threads set to the specified
    /// number.
    /// # Overusage
    /// Supplying too large a number of threads may negatively impact performance, as the
    /// system may not be able to fulfill the resulting resource needs.
    pub fn with_threads(max_thread_count: NonZeroUsize) -> Self {
        Self {
            max_thread_count: max_thread_count.get(),
            ..Default::default()
        }
    }

    /// Create a new empty thread pool with the maximum number of threads set to the specified
    /// number, and also set the flag to drop the handles of finished threads instead of
    /// storing them until either `join_all` or `get_finished` is called.
    /// # Overusage
    /// Supplying too large a number of threads may negatively impact performance, as the
    /// system may not be able to fulfill the resulting resource needs.
    /// # Memory usage
    /// If `drop_handles` is set to `false` the pool will continue to store the handles of
    /// launched threads. This causes memory consumption to rise over time as more and more
    /// threads are launched.
    pub fn with_threads_and_drop_handles(
        max_thread_count: NonZeroUsize,
        drop_handles: bool,
    ) -> Self {
        Self {
            max_thread_count: max_thread_count.get(),
            drop_handles: Arc::new(AtomicBool::new(drop_handles)),
            ..Default::default()
        }
    }
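    // Aside (not part of the original patch): a sketch of creating a pool capped at
    // four threads that prunes finished handles automatically, then submitting a job:
    //
    //     use std::num::NonZeroUsize;
    //     let mut pool = ThreadPool::with_threads_and_drop_handles(
    //         NonZeroUsize::new(4).unwrap(),
    //         true,
    //     );
    //     pool.enqueue(|| 42);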
    /// Pass a new closure to be executed as soon as a thread is available.
    /// This function executes the supplied closure immediately when the number of running
    /// threads is lower than the maximum number of threads. Otherwise the closure will be
    /// executed at some undetermined time in the future, provided the program does not exit
    /// before then. If `join_all` is called and the closure hasn't been executed yet,
    /// `join_all` will wait for all stalled closures to be executed.
    pub fn enqueue(&mut self, closure: F) {
        // read the used-thread counter; Acquire pairs with the Release stores below
        let used_threads = self.threads.load(Ordering::Acquire);
        // test if we can launch a new thread
        if used_threads < self.max_thread_count {
            // we can launch a new thread, so increment the thread count
            self.threads
                .store(used_threads.saturating_add(1), Ordering::Release);
            // run the closure on a new thread
            execute(
                self.queue.clone(),
                self.handles.clone(),
                self.threads.clone(),
                self.drop_handles.clone(),
                closure,
            );
        } else {
            // all threads are being used,
            // enqueue the closure to be launched when a thread is ready
            self.queue.lock().unwrap().push_back(closure);
        }
    }

    /// Removes all closures stalled for execution.
    /// All closures still waiting to be executed will be dropped by the pool and
    /// won't get executed. Useful if an old set of closures hasn't run yet but is outdated
    /// and resources are required immediately for updated closures.
    pub fn discard_stalled(&mut self) {
        self.queue.lock().unwrap().clear();
    }

    /// Waits for all currently running threads and all stalled closures to be executed.
    /// If any closure hasn't been executed yet, `join_all` will wait until the queue holding
    /// all unexecuted closures is empty. It returns the results that every `join` yields,
    /// collected into a vector. If the vector is of length zero, no threads were joined and
    /// the thread pool didn't do anything. All handles of threads will be removed after this
    /// call.
    pub fn join_all(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
        let mut results = Vec::new();
        loop {
            // lock the handles, pop the last one off and unlock the handles again
            // to allow running threads to proceed
            let handle = self.handles.lock().unwrap().pop();

            // if we got a handle, join it; otherwise no handles are left and we leave the loop
            if let Some(handle) = handle {
                results.push(handle.join());
                continue;
            }
            break;
        }

        results
    }
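    // Aside (not part of the original patch): a job that panics surfaces in `join_all`
    // as an `Err` carrying the panic payload; a sketch of telling the two cases apart,
    // assuming `pool` is a `ThreadPool` with submitted jobs:
    //
    //     for result in pool.join_all() {
    //         match result {
    //             Ok(value) => { /* use the returned value */ }
    //             Err(payload) => match payload.downcast_ref::<&str>() {
    //                 Some(msg) => eprintln!("job panicked: {msg}"),
    //                 None => eprintln!("job panicked"),
    //             },
    //         }
    //     }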
    /// Returns the results of every thread that has finished until now.
    /// Threads still running won't be waited for, nor will any closure stalled for execution
    /// in the future. If the vector is of length zero, no threads have finished and the
    /// thread pool is either idle or busy. All handles of finished threads will be removed
    /// after this call.
    pub fn get_finished(&mut self) -> Vec<Result<T, Box<dyn Any + Send>>> {
        let mut results = Vec::new();

        let mut handles = self.handles.lock().unwrap();

        // loop through the handles and remove all finished handles;
        // join the finished handles, which will be quick as they are finished!
        let mut idx = 0;
        while idx < handles.len() {
            if handles[idx].is_finished() {
                // thread is finished, yield its result
                results.push(handles.remove(idx).join());
            } else {
                // thread isn't done, continue to the next one
                idx += 1;
            }
        }

        results
    }

    /// Set the flag to indicate that thread handles will be dropped after the thread is
    /// finished executing. All threads that have finished until now but haven't been removed
    /// will get dropped after the next thread finishes.
    pub fn drop_finished_handles(&self) {
        self.drop_handles.store(true, Ordering::Release);
    }

    /// Set the flag to indicate that thread handles will be kept after the thread is finished
    /// executing, until either `join_all` or `get_finished` is called.
    /// Only thread handles created after this call will be kept.
    pub fn keep_future_handles(&self) {
        self.drop_handles.store(false, Ordering::Release);
    }
}

/// Removes all thread handles which have finished, but only if they can be locked at
/// the current time. This function will not block execution when the lock cannot be acquired.
fn try_prune<T>(handles: Arc<Mutex<Vec<JoinHandle<T>>>>) {
    if let Ok(mut handles) = handles.try_lock() {
        // keep unfinished elements
        handles.retain(|handle| !handle.is_finished());
    }
}

/// Execute the supplied closure on a new thread
/// and store the thread's handle into `handles`. When the thread has finished executing the
/// closure it will look for any closures left in `queue` and recursively execute the next one
/// on a new thread. This function updates `threads` in order to keep track of the number of
/// active threads.
fn execute<F, T>(
    queue: Arc<Mutex<VecDeque<F>>>,
    handles: Arc<Mutex<Vec<JoinHandle<T>>>>,
    threads: Arc<AtomicUsize>,
    drop: Arc<AtomicBool>,
    closure: F,
) where
    T: Send + 'static,
    F: Send + FnOnce() -> T + 'static,
{
    let handles_copy = handles.clone();

    handles.lock().unwrap().push(thread::spawn(move || {
        // run the closure (the actual work)
        let result = closure();

        // take the next closure stalled for execution
        let next = queue.lock().unwrap().pop_front();
        if let Some(next_closure) = next {
            // if we have something to execute, spawn a new thread
            execute(
                queue,
                handles_copy.clone(),
                threads,
                drop.clone(),
                next_closure,
            );
        } else {
            // nothing left to execute, this thread will run out without any work to do;
            // decrement the number of used threads
            threads.store(
                threads.load(Ordering::Acquire).saturating_sub(1),
                Ordering::Release,
            )
        }

        // try to drop all finished thread handles if necessary;
        // this is a non-blocking operation
        if drop.load(Ordering::Acquire) {
            try_prune(handles_copy);
        }

        result
    }));
}
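// Aside (not part of the original patch): `execute` spawns a fresh OS thread for every
// queued closure. A common alternative design keeps long-lived workers that loop over
// the queue and reuse their thread; a minimal sketch of that shape:
#[allow(dead_code)]
fn worker_loop<F, T>(queue: Arc<Mutex<VecDeque<F>>>)
where
    F: FnOnce() -> T,
{
    loop {
        // pop one job at a time; the lock guard is released before the job runs
        let job = queue.lock().unwrap().pop_front();
        match job {
            Some(job) => {
                job();
            }
            None => break, // queue drained: let the worker thread end
        }
    }
}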
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    #[test]
    fn test_thread_pool() {
        // automatically determine the number of threads to use
        let mut pool = ThreadPool::new();

        // launch 4 jobs to run on our pool
        for i in 0..4 {
            pool.enqueue(move || (0..=i).sum::<i32>());
        }

        // wait for the threads to finish and sum their results
        let sum = pool
            .join_all()
            .into_iter()
            .map(|r| r.unwrap())
            .sum::<i32>();

        assert_eq!(sum, 10);
    }

    #[test]
    fn test_drop_stalled() {
        // use a pool with a single thread so the second job has to be queued
        let mut pool = ThreadPool::with_threads(NonZeroUsize::new(1).unwrap());

        // launch 2 jobs: the first blocks the only thread for a few seconds,
        // the second is queued behind it and will be discarded before it can run
        for i in 0..2 {
            pool.enqueue(move || {
                thread::sleep(Duration::from_secs(((1 - i) * 5) as u64));
                i
            });
        }

        // give the first job time to start
        thread::sleep(Duration::from_secs(2));
        // discard the job that is still queued
        pool.discard_stalled();

        // wait for the threads to finish and sum their results;
        // only the first job (returning 0) has run
        let sum = pool.join_all().into_iter().map(|r| r.unwrap()).sum::<i32>();

        assert_eq!(sum, 0);
    }
}

From 334093ad877f6aae65c9474a91003f67a30e4332 Mon Sep 17 00:00:00 2001
From: teridax <72654954+Servostar@users.noreply.github.com>
Date: Mon, 5 Jun 2023 17:40:00 +0000
Subject: [PATCH 2/2] Update README.md

Added reminder for good programming practices for this repo (to be removed later on)
---
 README.md | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/README.md b/README.md
index 90d7ca9..f05ce35 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,12 @@
 # Programmentwurf
 The description of the assignment can be found at [Programmentwurf.md](https://github.com/programmieren-mit-rust/programmentwurf/blob/main/Programmentwurf.md). This `Readme.md` is to be replaced with something sensible.
+
+# IMPORTANT!
+A small reminder: when you push things to the repo that you consider finished (e.g. for a pull request!), please check for errors/warnings with the following commands:
+- `cargo fmt` for formatting
+- `cargo clippy` for warnings
+- `cargo test --doc` for documentation tests
+optional:
+- `cargo test` for module tests
+- `cargo bench` for benchmarks