diff --git a/.gitignore b/.gitignore index 4c790d0..98f2979 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target /Cargo.lock .DS_Store +/.vscode \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index a6d8ad6..69cd0ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,10 @@ authors = ["Sven Vogel", "Felix L. Müller", "Elias Alexander", "Elias Schmidt"] png = "0.17.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" + +[dev-dependencies] +criterion = "0.5.1" + +[[bench]] +name = "multithreading" +harness = false \ No newline at end of file diff --git a/README.md b/README.md index 90d7ca9..f05ce35 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,12 @@ # Programmentwurf Die Beschreibung der Aufgabenstellung ist unter [Programmentwurf.md](https://github.com/programmieren-mit-rust/programmentwurf/blob/main/Programmentwurf.md) zu finden. Diese `Readme.md` ist durch etwas Sinnvolles zu ersetzen. + +# WICHTIG! +Kleiner Reminder: Wenn ihr Sachen in das Repo pusht, die eurer Ansicht nach fertig sind (z. B. für einen Pull-Request!), bitte mit den folgenden Commands auf Fehler/Warnings überprüfen: +- `cargo fmt` für Formatierung +- `cargo clippy` für Warnings +- `cargo test --doc` für Documentation-Tests +optional: +- `cargo test` für Modul-Tests +- `cargo bench` für Benchmarks diff --git a/benches/multithreading.rs b/benches/multithreading.rs new file mode 100644 index 0000000..20ce75e --- /dev/null +++ b/benches/multithreading.rs @@ -0,0 +1,174 @@ +//! Benchmarking functionality for [Criterion.rs](https://github.com/bheisler/criterion.rs) +//! This benchmark will compare the performance of various thread pools launched with different amounts of +//! maximum threads. +//! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit +//! double precision floating point values. 
+ +use std::sync::Arc; + +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use imsearch::multithreading::ThreadPool; + +/// Amount of elements per vector used to calculate the dot product +const VEC_ELEM_COUNT: usize = 1_000_000; +/// Number of threads to test +const THREAD_COUNTS: [usize; 17] = [ + 1, 2, 4, 6, 8, 10, 12, 16, 18, 20, 22, 26, 28, 32, 40, 56, 64, +]; +/// seeds used to scramble up the values produced by the hash function for each vector +/// these are just some pseudo random numbers +const VEC_SEEDS: [u64; 2] = [0xa3f8347abce16, 0xa273048ca9dea]; + +/// Compute the dot product of two vectors +/// # Panics +/// this function assumes both vectors to be of exactly the same length. +/// If this is not the case the function will panic. +fn dot(a: &[f64], b: &[f64]) -> f64 { + let mut sum = 0.0; + + for i in 0..a.len() { + sum += a[i] * b[i]; + } + + sum +} + +/// Computes the dot product using a thread pool with varying number of threads. The vectors will be both splitted into equally +/// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have +/// finished the partial dot products will be summed to create the final result. +fn dot_parallel(a: Arc>, b: Arc>, threads: usize) { + + let mut pool = ThreadPool::with_limit(threads); + + // number of elements in each vector for each thread + let steps = a.len() / threads; + + for i in 0..threads { + // offset of the first element for the thread local vec + let chunk = i * steps; + // create a new strong reference to the vector + let aa = a.clone(); + let bb = b.clone(); + // launch a new thread + pool.enqueue(move || { + let a = &aa[chunk..(chunk + steps)]; + let b = &bb[chunk..(chunk + steps)]; + dot(a, b) + }); + } + + pool.join_all(); + + black_box(pool.get_results().iter().sum::()); +} + +/// Compute a simple hash value for the given index value. +/// This function will return a value between [0, 1]. 
+#[inline] +fn hash(x: f64) -> f64 { + ((x * 234.8743 + 3.8274).sin() * 87624.58376).fract() +} + +/// Create a vector filled with `size` elements of 64-bit floating point numbers +/// each initialized with the function `hash` and the given seed. The vector will +/// be filled with values between [0, 1]. +fn create_vec(size: usize, seed: u64) -> Arc> { + let mut vec = Vec::with_capacity(size); + + for i in 0..size { + vec.push(hash(i as f64 + seed as f64)); + } + + Arc::new(vec) +} + +/// Function for executing the thread pool benchmarks using criterion.rs. +/// It will create two different vectors and benchmark the single thread performance +/// as well as the multi threadded performance for varying amounts of threads. +pub fn bench_threadpool(c: &mut Criterion) { + let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]); + let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]); + + let mut group = c.benchmark_group("threadpool with various number of threads"); + + for threads in THREAD_COUNTS.iter() { + group.throughput(Throughput::Bytes(*threads as u64)); + group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| { + b.iter(|| { + dot_parallel(vec_a.clone(), vec_b.clone(), *threads); + }); + }); + } + group.finish(); +} + +/// Benchmark the effects of over and underusing a thread pools thread capacity. +/// The thread pool will automatically choose the number of threads to use. +/// We will then run a custom number of jobs with that pool that may be smaller or larger +/// than the amount of threads the pool can offer. 
+fn pool_overusage(a: Arc>, b: Arc>, threads: usize) { + // automatically choose the number of threads + let mut pool = ThreadPool::new(); + + // number of elements in each vector for each thread + let steps = a.len() / threads; + + for i in 0..threads { + // offset of the first element for the thread local vec + let chunk = i * steps; + // create a new strong reference to the vector + let aa = a.clone(); + let bb = b.clone(); + // launch a new thread + pool.enqueue(move || { + let a = &aa[chunk..(chunk + steps)]; + let b = &bb[chunk..(chunk + steps)]; + dot(a, b) + }); + } + + pool.join_all(); + + black_box(pool.get_results().iter().sum::()); +} + +/// Benchmark the effects of over and underusing a thread pools thread capacity. +/// The thread pool will automatically choose the number of threads to use. +/// We will then run a custom number of jobs with that pool that may be smaller or larger +/// than the amount of threads the pool can offer. +pub fn bench_overusage(c: &mut Criterion) { + let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]); + let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]); + + let mut group = c.benchmark_group("threadpool overusage"); + + for threads in THREAD_COUNTS.iter() { + group.throughput(Throughput::Bytes(*threads as u64)); + group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| { + b.iter(|| { + pool_overusage(vec_a.clone(), vec_b.clone(), *threads); + }); + }); + } + group.finish(); +} + +/// Benchmark the performance of a single thread used to calculate the dot product. 
+pub fn bench_single_threaded(c: &mut Criterion) { + let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]); + let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]); + + c.bench_function("single threaded", |s| { + s.iter(|| { + black_box(dot(&vec_a, &vec_b)); + }); + }); +} + +criterion_group!( + benches, + bench_single_threaded, + bench_threadpool, + bench_overusage +); +criterion_main!(benches); diff --git a/src/lib.rs b/src/lib.rs index 517875c..35a42fb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,8 @@ + extern crate core; pub mod image; +pub mod multithreading; pub fn add(left: usize, right: usize) -> usize { left + right diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs new file mode 100644 index 0000000..d20347a --- /dev/null +++ b/src/multithreading/mod.rs @@ -0,0 +1,265 @@ +//! This module provides the functionality to create a thread pool to execute tasks in parallel. +//! The maximum number of threads to be used can be regulated by using `ThreadPool::with_limit`. +//! This implementation is aimed to be of low runtime cost with minimal synchronisation due to blocking. +//! Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job +//! a new thread will be launched until the maximum number is reached. From then on every launched thread will +//! be reused to process the remaining elements of the queue. If no jobs are left to be executed +//! all threads will finish and die. This means that if nothing is done, no threads will idle in the background. +//! # Example +//! ```rust +//! # use imsearch::multithreading::ThreadPool; +//! let mut pool = ThreadPool::with_limit(2); +//! +//! for i in 0..10 { +//! pool.enqueue(move || i); +//! } +//! +//! pool.join_all(); +//! assert_eq!(pool.get_results().iter().sum::<i32>(), 45); +//! 
//! ```

use std::{
    collections::VecDeque,
    num::NonZeroUsize,
    sync::{
        mpsc::{channel, Receiver, Sender},
        Arc, Mutex,
    },
    thread::{self, JoinHandle},
};

/// Default number of threads to be used in case [`std::thread::available_parallelism`] fails.
pub const DEFAULT_THREAD_POOL_SIZE: usize = 1;

/// Indicates the priority level of functions or closures which get supplied to the pool.
/// Use [`Priority::High`] to have the closure executed before all closures that are already waiting.
/// Use [`Priority::Low`] to have the closure executed after all closures that are already waiting.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    /// Indicate that the closure or function supplied to the thread pool
    /// has higher priority than any other given to the pool until now.
    /// The item will get enqueued at the start of the waiting-queue.
    High,
    /// Indicate that the closure or function supplied to the thread pool
    /// has lower priority than the already supplied ones in this pool.
    /// The item will get enqueued at the end of the waiting-queue.
    Low,
}

/// Jobs are functions which are executed by the thread pool. They can be stalled when no threads are
/// free to execute them directly. They are meant to be executed only once and be done.
pub trait Job<T>: Send + 'static + FnOnce() -> T
where
    T: Send,
{
}

/// Every sendable `'static` closure or function returning a sendable value is a [`Job`].
impl<U, T> Job<T> for U
where
    U: Send + 'static + FnOnce() -> T,
    T: Send + 'static,
{
}

/// State shared between the pool and its worker threads.
///
/// Both fields are only ever accessed while holding the surrounding mutex. This makes
/// "a worker found the queue empty and retires" and "a job was pushed and no worker is left to
/// run it" mutually exclusive, so a job can never be stranded in the queue.
#[derive(Debug)]
struct PoolState<F> {
    /// jobs waiting to be executed, front first
    jobs: VecDeque<F>,
    /// number of workers that have been started and have not yet decided to retire
    workers: usize,
}

/// Thread pool which can be used to execute functions or closures in parallel.
/// The maximum number of threads to be used can be regulated by using `ThreadPool::with_limit`.
/// This implementation is aimed to be of low runtime cost with minimal synchronisation due to blocking.
/// Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
/// a new thread will be launched until the maximum number is reached. From then on every launched thread will
/// be reused to process the remaining elements of the queue. If no jobs are left to be executed
/// all threads will finish and die. This means that if nothing is done, no threads will idle in the background.
/// # Example
/// ```rust
/// # use imsearch::multithreading::ThreadPool;
/// let mut pool = ThreadPool::with_limit(2);
///
/// for i in 0..10 {
///     pool.enqueue(move || i);
/// }
///
/// pool.join_all();
/// assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
/// ```
#[derive(Debug)]
pub struct ThreadPool<T, F>
where
    T: Send,
    F: Job<T>,
{
    /// queue of the jobs to be executed together with the number of active workers
    queue: Arc<Mutex<PoolState<F>>>,
    /// handles of the worker threads that were started and not yet observed as finished
    handles: Vec<JoinHandle<()>>,
    /// receiver end for channel based communication between threads
    receiver: Receiver<T>,
    /// sender end for channel based communication between threads
    sender: Sender<T>,
    /// maximum amount of threads to be used in parallel
    limit: NonZeroUsize,
}

impl<T, F> Default for ThreadPool<T, F>
where
    T: Send + 'static,
    F: Job<T>,
{
    /// Creates an empty pool whose thread limit is [`std::thread::available_parallelism`], or
    /// [`DEFAULT_THREAD_POOL_SIZE`] if that value cannot be determined.
    fn default() -> Self {
        let (sender, receiver) = channel::<T>();

        // determine default thread count to use based on the system
        let fallback =
            NonZeroUsize::new(DEFAULT_THREAD_POOL_SIZE).expect("Thread limit must be non-zero");
        let limit = thread::available_parallelism().unwrap_or(fallback);

        Self {
            queue: Arc::new(Mutex::new(PoolState {
                jobs: VecDeque::new(),
                workers: 0,
            })),
            handles: Vec::new(),
            receiver,
            sender,
            limit,
        }
    }
}

/// Body of a worker thread: run queued jobs until the queue is observed empty.
///
/// The queue lock is only held while taking a job out of the queue and is released *before* the
/// job is executed. (Writing this as `while let Some(job) = state.lock().unwrap().pop_front()`
/// would keep the guard alive for the whole loop body and thereby serialize all jobs.)
/// When no job is left the worker deregisters itself under the same lock and returns.
///
/// # Panics
/// Panics if the result cannot be sent because the receiving pool has been dropped, or if the
/// job itself panics.
fn run_worker<T, F>(state: &Mutex<PoolState<F>>, results: &Sender<T>)
where
    T: Send + 'static,
    F: Job<T>,
{
    loop {
        let next = {
            let mut guard = state.lock().unwrap();
            let next = guard.jobs.pop_front();
            if next.is_none() {
                // retire atomically with the "queue is empty" observation
                guard.workers -= 1;
            }
            next
        };

        match next {
            Some(job) => results.send(job()).expect("cannot send result"),
            None => return,
        }
    }
}

impl<T, F> ThreadPool<T, F>
where
    T: Send + 'static,
    F: Job<T>,
{
    /// Creates a new thread pool with default thread count determined by either
    /// [`std::thread::available_parallelism`] or [`DEFAULT_THREAD_POOL_SIZE`] in case it fails.
    /// No threads will be launched until jobs are enqueued.
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates a new thread pool with the given thread count. The pool will continue to launch new threads even if
    /// the system does not allow for that count of parallelism.
    /// No threads will be launched until jobs are enqueued.
    /// # Panics
    /// This function will panic if `max_threads` is zero.
    pub fn with_limit(max_threads: usize) -> Self {
        Self {
            limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"),
            ..Default::default()
        }
    }

    /// Put a new job into the queue to be executed by a thread in the future.
    /// The priority of the job will determine if the job will be put at the start or end of the queue.
    /// See [`crate::multithreading::Priority`].
    /// This function will create a new thread if the maximum number of threads is not reached.
    /// In case the maximum number of threads is already used, the job is stalled and will get executed
    /// when a thread is ready and it is at the start of the queue.
    ///
    /// This function does not wait for running jobs: the queue is only locked for the time it
    /// takes to insert the job.
    pub fn enqueue_priorize(&mut self, func: F, priority: Priority) {
        // Insert the job and reserve a worker slot in one critical section. The lock is
        // released at the end of this block, before any thread is spawned.
        let needs_worker = {
            let mut state = self.queue.lock().unwrap();

            // insert new job into queue depending on its priority
            match priority {
                Priority::High => state.jobs.push_front(func),
                Priority::Low => state.jobs.push_back(func),
            }

            let below_limit = state.workers < self.limit.get();
            if below_limit {
                state.workers += 1;
            }
            below_limit
        };

        // forget the handles of workers that have already terminated
        self.handles.retain(|handle| !handle.is_finished());

        if needs_worker {
            // we can still launch threads to run in parallel
            let results = self.sender.clone();
            let state = Arc::clone(&self.queue);

            self.handles
                .push(thread::spawn(move || run_worker(&state, &results)));
        }
    }

    /// Put a new job into the queue to be executed by a thread in the future.
    /// The priority of the job is automatically set to [`crate::multithreading::Priority::Low`].
    /// This function will create a new thread if the maximum number of threads is not reached.
    /// In case the maximum number of threads is already used, the job is stalled and will get executed
    /// when a thread is ready and it is at the start of the queue.
    pub fn enqueue(&mut self, func: F) {
        self.enqueue_priorize(func, Priority::Low);
    }

    /// Wait for all threads to finish executing. This means that by the time all threads have finished
    /// every task will have been executed too. In other words the threads finish when the queue of jobs is empty.
    /// This function will block the caller thread.
    /// # Panics
    /// Panics if one of the joined worker threads panicked.
    pub fn join_all(&mut self) {
        while let Some(handle) = self.handles.pop() {
            handle.join().unwrap();
        }
    }

    /// Returns all results that have been returned by the threads until now
    /// and haven't been consumed yet.
    /// All results retrieved from this call won't be returned on a second call.
    /// This function is non blocking.
    pub fn try_get_results(&mut self) -> Vec<T> {
        self.receiver.try_iter().collect()
    }

    /// Returns all results that have been returned by the threads until now
    /// and haven't been consumed yet. The function will also wait for all threads to finish executing (empty the queue).
    /// All results retrieved from this call won't be returned on a second call.
    /// This function will block the caller thread.
    pub fn get_results(&mut self) -> Vec<T> {
        self.join_all();
        self.try_get_results()
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use std::sync::Barrier;

    #[test]
    fn test_default() {
        let mut pool = ThreadPool::default();

        for i in 0..10 {
            pool.enqueue_priorize(move || i, Priority::High);
        }

        pool.join_all();

        assert_eq!(pool.try_get_results().iter().sum::<i32>(), 45);
    }

    #[test]
    fn test_limit() {
        // Every job has to pass this gate. While the test holds it no worker can finish,
        // which makes the number of live workers deterministic.
        let gate = Arc::new(Mutex::new(()));
        let mut pool = ThreadPool::with_limit(2);

        let closed = gate.lock().unwrap();
        for i in 0..10 {
            let gate = Arc::clone(&gate);
            pool.enqueue(move || {
                let _pass = gate.lock().unwrap();
                i
            });
        }

        assert_eq!(pool.handles.len(), 2);
        assert_eq!(pool.limit.get(), 2);

        drop(closed);
        pool.join_all();

        assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
    }

    #[test]
    fn test_multiple() {
        // the same pool must be reusable after all of its workers have retired
        let mut pool = ThreadPool::with_limit(2);

        for round in 1..=3 {
            for i in 0..10 {
                pool.enqueue(move || i * round);
            }

            assert_eq!(pool.limit.get(), 2);
            assert_eq!(pool.get_results().iter().sum::<i32>(), 45 * round);
        }
    }

    #[test]
    fn test_parallel() {
        // both jobs meet at the barrier, which is only possible if they run at the same time
        let barrier = Arc::new(Barrier::new(2));
        let mut pool = ThreadPool::with_limit(2);

        for i in 0..2 {
            let barrier = Arc::clone(&barrier);
            pool.enqueue(move || {
                barrier.wait();
                i
            });
        }

        assert_eq!(pool.get_results().iter().sum::<i32>(), 1);
    }
}