From c732864a741f2474e79d315cc3f5e835379051f2 Mon Sep 17 00:00:00 2001
From: teridax
Date: Wed, 31 May 2023 09:00:49 +0200
Subject: [PATCH 1/2] added benchmark for `threadpool` using `criterion`.

---
 .gitignore                |  1 +
 Cargo.toml                |  7 ++++
 benches/multithreading.rs | 74 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 82 insertions(+)
 create mode 100644 benches/multithreading.rs

diff --git a/.gitignore b/.gitignore
index 4c790d0..98f2979 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 /target
 /Cargo.lock
 .DS_Store
+/.vscode
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
index a6d8ad6..69cd0ba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,3 +10,10 @@ authors = ["Sven Vogel", "Felix L. Müller", "Elias Alexander", "Elias Schmidt"]
 png = "0.17.8"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
+
+[dev-dependencies]
+criterion = "0.5.1"
+
+[[bench]]
+name = "multithreading"
+harness = false
\ No newline at end of file
diff --git a/benches/multithreading.rs b/benches/multithreading.rs
new file mode 100644
index 0000000..ffd36d8
--- /dev/null
+++ b/benches/multithreading.rs
@@ -0,0 +1,74 @@
+use std::sync::Arc;
+
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use imsearch::multithreading::ThreadPool;
+
+fn dot(a: &[f64], b: &[f64]) -> f64 {
+    let mut sum = 0.0;
+
+    for i in 0..a.len() {
+        sum += a[i] * b[i];
+    }
+
+    sum
+}
+
+fn bench_single_threaded(a: &Vec<f64>, b: &Vec<f64>) {
+    black_box(dot(a, b));
+}
+
+fn bench_threadpool(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>) {
+    let mut pool = ThreadPool::new();
+
+    const CHUNKS: usize = 100;
+
+    let steps = a.len() / CHUNKS;
+
+    for i in 0..CHUNKS {
+        let chunk = i * steps;
+        let aa = a.clone();
+        let bb = b.clone();
+        pool.enqueue(move || {
+            let a = &aa[chunk..(chunk + steps)];
+            let b = &bb[chunk..(chunk + steps)];
+            dot(a, b)
+        });
+    }
+
+    black_box(
+        pool.join_all()
+            .into_iter()
+            .map(|r| r.unwrap())
+            .reduce(|a, b| a + b),
+    );
+}
+
+#[inline]
+fn hash(x: f64) -> f64 {
+    ((x * 234.8743 + 3.8274).sin() * 87624.58376).fract()
+}
+
+fn create_vec(size: usize) -> Arc<Vec<f64>> {
+    let mut vec = Vec::with_capacity(size);
+
+    for i in 0..size {
+        vec.push(hash(i as f64));
+    }
+
+    Arc::new(vec)
+}
+
+pub fn benchmark_threadpool(c: &mut Criterion) {
+    let vec_a = create_vec(1_000_000);
+    let vec_b = create_vec(1_000_000);
+
+    c.bench_function("single threaded", |b| {
+        b.iter(|| bench_single_threaded(&vec_a, &vec_b))
+    });
+    c.bench_function("multi threaded", |b| {
+        b.iter(|| bench_threadpool(vec_a.clone(), vec_b.clone()))
+    });
+}
+
+criterion_group!(benches, benchmark_threadpool);
+criterion_main!(benches);

From e16a38aeefb5fb6ae04d6c59c7cb9f549bde7af8 Mon Sep 17 00:00:00 2001
From: teridax
Date: Wed, 31 May 2023 17:09:44 +0200
Subject: [PATCH 2/2] finished benchmark for threadpool and fixed documentation
 for threadpool

---
 benches/multithreading.rs | 158 ++++++++++++++++++++++++++++++++------
 src/multithreading/mod.rs |   8 +-
 2 files changed, 139 insertions(+), 27 deletions(-)

diff --git a/benches/multithreading.rs b/benches/multithreading.rs
index ffd36d8..84a6014 100644
--- a/benches/multithreading.rs
+++ b/benches/multithreading.rs
@@ -1,8 +1,28 @@
-use std::sync::Arc;
+//! Benchmarking functionality for [Criterion.rs](https://github.com/bheisler/criterion.rs)
+//! This benchmark will compare the performance of various thread pools launched with different amounts of
+//! maximum threads.
+//! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit
+//! double precision floating point values.
 
-use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use std::{num::NonZeroUsize, sync::Arc};
+
+use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
 use imsearch::multithreading::ThreadPool;
 
+/// Amount of elements per vector used to calculate the dot product
+const VEC_ELEM_COUNT: usize = 1_000_000;
+/// Number of threads to test
+const THREAD_COUNTS: [usize; 17] = [
+    1, 2, 4, 6, 8, 10, 12, 16, 18, 20, 22, 26, 28, 32, 40, 56, 64,
+];
+/// seeds used to scramble up the values produced by the hash function for each vector
+/// these are just some pseudo random numbers
+const VEC_SEEDS: [u64; 2] = [0xa3f8347abce16, 0xa273048ca9dea];
+
+/// Compute the dot product of two vectors
+/// # Panics
+/// this function assumes both vectors to be of exactly the same length.
+/// If this is not the case the function will panic.
 fn dot(a: &[f64], b: &[f64]) -> f64 {
     let mut sum = 0.0;
 
@@ -13,21 +33,23 @@ fn dot(a: &[f64], b: &[f64]) -> f64 {
     sum
 }
 
-fn bench_single_threaded(a: &Vec<f64>, b: &Vec<f64>) {
-    black_box(dot(a, b));
-}
+/// Computes the dot product using a thread pool with varying number of threads. The vectors will be both split into equally
+/// sized slices which then get passed to their own thread to compute the partial dot product. After all threads have
+/// finished the partial dot products will be summed to create the final result.
+fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
+    let mut pool =
+        ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(threads).unwrap(), true);
 
-fn bench_threadpool(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>) {
-    let mut pool = ThreadPool::new();
+    // number of elements in each vector for each thread
+    let steps = a.len() / threads;
 
-    const CHUNKS: usize = 100;
-
-    let steps = a.len() / CHUNKS;
-
-    for i in 0..CHUNKS {
+    for i in 0..threads {
+        // offset of the first element for the thread local vec
         let chunk = i * steps;
+        // create a new strong reference to the vector
         let aa = a.clone();
         let bb = b.clone();
+        // launch a new thread
         pool.enqueue(move || {
             let a = &aa[chunk..(chunk + steps)];
             let b = &bb[chunk..(chunk + steps)];
@@ -36,39 +58,129 @@ fn bench_threadpool(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>) {
     }
 
     black_box(
+        // wait for the threads to finish
         pool.join_all()
+            // iterate over the results and sum the partial dot products together
            .into_iter()
            .map(|r| r.unwrap())
            .reduce(|a, b| a + b),
     );
 }
 
+/// Compute a simple hash value for the given index value.
+/// This function will return a value between [0, 1].
 #[inline]
 fn hash(x: f64) -> f64 {
     ((x * 234.8743 + 3.8274).sin() * 87624.58376).fract()
 }
 
-fn create_vec(size: usize) -> Arc<Vec<f64>> {
+/// Create a vector filled with `size` elements of 64-bit floating point numbers
+/// each initialized with the function `hash` and the given seed. The vector will
+/// be filled with values between [0, 1].
+fn create_vec(size: usize, seed: u64) -> Arc<Vec<f64>> {
     let mut vec = Vec::with_capacity(size);
 
     for i in 0..size {
-        vec.push(hash(i as f64));
+        vec.push(hash(i as f64 + seed as f64));
     }
 
     Arc::new(vec)
 }
 
-pub fn benchmark_threadpool(c: &mut Criterion) {
-    let vec_a = create_vec(1_000_000);
-    let vec_b = create_vec(1_000_000);
+/// Function for executing the thread pool benchmarks using criterion.rs.
+/// It will create two different vectors and benchmark the single thread performance
+/// as well as the multi threaded performance for varying amounts of threads.
+pub fn bench_threadpool(c: &mut Criterion) {
+    let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
+    let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
 
-    c.bench_function("single threaded", |b| {
-        b.iter(|| bench_single_threaded(&vec_a, &vec_b))
-    });
-    c.bench_function("multi threaded", |b| {
-        b.iter(|| bench_threadpool(vec_a.clone(), vec_b.clone()))
+    let mut group = c.benchmark_group("threadpool with various number of threads");
+
+    for threads in THREAD_COUNTS.iter() {
+        group.throughput(Throughput::Bytes(*threads as u64));
+        group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
+            b.iter(|| {
+                dot_parallel(vec_a.clone(), vec_b.clone(), *threads);
+            });
+        });
+    }
+    group.finish();
+}
+
+/// Benchmark the effects of over and underusing a thread pool's thread capacity.
+/// The thread pool will automatically choose the number of threads to use.
+/// We will then run a custom number of jobs with that pool that may be smaller or larger
+/// than the amount of threads the pool can offer.
+fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
+    // automatically choose the number of threads
+    let mut pool = ThreadPool::new();
+    // drop the handles used by each thread after it's done
+    pool.drop_finished_handles();
+
+    // number of elements in each vector for each thread
+    let steps = a.len() / threads;
+
+    for i in 0..threads {
+        // offset of the first element for the thread local vec
+        let chunk = i * steps;
+        // create a new strong reference to the vector
+        let aa = a.clone();
+        let bb = b.clone();
+        // launch a new thread
+        pool.enqueue(move || {
+            let a = &aa[chunk..(chunk + steps)];
+            let b = &bb[chunk..(chunk + steps)];
+            dot(a, b)
+        });
+    }
+
+    black_box(
+        // wait for the threads to finish
+        pool.join_all()
+            // iterate over the results and sum the partial dot products together
+            .into_iter()
+            .map(|r| r.unwrap())
+            .reduce(|a, b| a + b),
+    );
+}
+
+/// Benchmark the effects of over and underusing a thread pool's thread capacity.
+/// The thread pool will automatically choose the number of threads to use.
+/// We will then run a custom number of jobs with that pool that may be smaller or larger
+/// than the amount of threads the pool can offer.
+pub fn bench_overusage(c: &mut Criterion) {
+    let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
+    let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
+
+    let mut group = c.benchmark_group("threadpool overusage");
+
+    for threads in THREAD_COUNTS.iter() {
+        group.throughput(Throughput::Bytes(*threads as u64));
+        group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
+            b.iter(|| {
+                pool_overusage(vec_a.clone(), vec_b.clone(), *threads);
+            });
+        });
+    }
+    group.finish();
+}
+
+/// Benchmark the performance of a single thread used to calculate the dot product.
+pub fn bench_single_threaded(c: &mut Criterion) {
+    let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
+    let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
+
+    c.bench_function("single threaded", |s| {
+        s.iter(|| {
+            black_box(dot(&vec_a, &vec_b));
+        });
     });
 }
 
-criterion_group!(benches, benchmark_threadpool);
+criterion_group!(
+    benches,
+    bench_single_threaded,
+    bench_threadpool,
+    bench_overusage
+);
 criterion_main!(benches);
diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs
index 71dabb6..647ea48 100644
--- a/src/multithreading/mod.rs
+++ b/src/multithreading/mod.rs
@@ -26,7 +26,7 @@
 //! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
 //! This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
 //! Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
-//! (see: https://doc.rust-lang.org/std/sync/atomic/index.html).
+//! (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
 //! Additionally this implementation relies on using the `load` and `store` operations
 //! instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls
 //! to `unwrap` or `expected` from [`std::sync::MutexGuard`].
@@ -86,7 +86,7 @@ fn get_default_thread_count() -> usize {
 /// This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
 /// This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
 /// Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
-/// (see: https://doc.rust-lang.org/std/sync/atomic/index.html).
+/// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
 /// Additionally this implementation relies on using the `load` and `store` operations
 /// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls
 /// to `unwrap` or `expected` from [`std::sync::MutexGuard`].
@@ -112,7 +112,7 @@ where
     /// number of currently running threads
     /// new implementation relies on atomic primitives to avoid locking and possible
     /// guard errors. Note that atomic primitives are not available on all platforms "can generally be relied upon existing"
-    /// (see: https://doc.rust-lang.org/std/sync/atomic/index.html).
+    /// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
     /// Also this implementation relies on using the `load` and `store` operations
     /// instead of using more comfortable one like `fetch_add`
     threads: Arc<AtomicUsize>,
@@ -173,7 +173,7 @@
     /// supplying a number of threads to great may negatively impact performance as the system may not
     /// be able to full fill the required needs
     /// # Memory usage
-    /// if `drop_handles` is set to [`Bool::false`] the pool will continue to store the handles of
+    /// if `drop_handles` is set to `false` the pool will continue to store the handles of
     /// launched threads. This causes memory consumption to rise over time as more and more
     /// threads are launched.
     pub fn with_threads_and_drop_handles(