finished benchmark for threadpool

and fixed documentation for threadpool
This commit is contained in:
Sven Vogel 2023-05-31 17:09:44 +02:00
parent c732864a74
commit e16a38aeef
2 changed files with 139 additions and 27 deletions

View File

@ -1,8 +1,28 @@
use std::sync::Arc; //! Benachmarking funcitonality for [Criterion.rs](https://github.com/bheisler/criterion.rs)
//! This benchmark will compare the performance of various thread pools launched with different amounts of
//! maximum threads.
//! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit
//! double precision floating point values.
use criterion::{black_box, criterion_group, criterion_main, Criterion}; use std::{num::NonZeroUsize, sync::Arc};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use imsearch::multithreading::ThreadPool; use imsearch::multithreading::ThreadPool;
/// Amount of elements per vector used to calculate the dot product
const VEC_ELEM_COUNT: usize = 1_000_000;
/// Number of threads to test
const THREAD_COUNTS: [usize; 17] = [
1, 2, 4, 6, 8, 10, 12, 16, 18, 20, 22, 26, 28, 32, 40, 56, 64,
];
/// seeds used to scramble up the values produced by the hash function for each vector
/// these are just some pseudo random numbers
const VEC_SEEDS: [u64; 2] = [0xa3f8347abce16, 0xa273048ca9dea];
/// Compute the dot product of two vectors
/// # Panics
/// this function assumes both vectors to be of exactly the same length.
/// If this is not the case the function will panic.
fn dot(a: &[f64], b: &[f64]) -> f64 { fn dot(a: &[f64], b: &[f64]) -> f64 {
let mut sum = 0.0; let mut sum = 0.0;
@ -13,21 +33,23 @@ fn dot(a: &[f64], b: &[f64]) -> f64 {
sum sum
} }
fn bench_single_threaded(a: &Vec<f64>, b: &Vec<f64>) { /// Computes the dot product using a thread pool with varying number of threads. The vectors will be both splitted into equally
black_box(dot(a, b)); /// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have
} /// finished the partial dot products will be summed to create the final result.
fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
let mut pool =
ThreadPool::with_threads_and_drop_handles(NonZeroUsize::new(threads).unwrap(), true);
fn bench_threadpool(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>) { // number of elements in each vector for each thread
let mut pool = ThreadPool::new(); let steps = a.len() / threads;
const CHUNKS: usize = 100; for i in 0..threads {
// offset of the first element for the thread local vec
let steps = a.len() / CHUNKS;
for i in 0..CHUNKS {
let chunk = i * steps; let chunk = i * steps;
// create a new strong reference to the vector
let aa = a.clone(); let aa = a.clone();
let bb = b.clone(); let bb = b.clone();
// launch a new thread
pool.enqueue(move || { pool.enqueue(move || {
let a = &aa[chunk..(chunk + steps)]; let a = &aa[chunk..(chunk + steps)];
let b = &bb[chunk..(chunk + steps)]; let b = &bb[chunk..(chunk + steps)];
@ -36,39 +58,129 @@ fn bench_threadpool(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>) {
} }
black_box( black_box(
// wait for the threads to finish
pool.join_all() pool.join_all()
// iterate over the results and sum the parital dot products together
.into_iter() .into_iter()
.map(|r| r.unwrap()) .map(|r| r.unwrap())
.reduce(|a, b| a + b), .reduce(|a, b| a + b),
); );
} }
/// Compute a simple hash value for the given index value.
/// This function will return a value between [0, 1].
#[inline] #[inline]
fn hash(x: f64) -> f64 { fn hash(x: f64) -> f64 {
((x * 234.8743 + 3.8274).sin() * 87624.58376).fract() ((x * 234.8743 + 3.8274).sin() * 87624.58376).fract()
} }
fn create_vec(size: usize) -> Arc<Vec<f64>> { /// Create a vector filled with `size` elements of 64-bit floating point numbers
/// each initialized with the function `hash` and the given seed. The vector will
/// be filled with values between [0, 1].
fn create_vec(size: usize, seed: u64) -> Arc<Vec<f64>> {
let mut vec = Vec::with_capacity(size); let mut vec = Vec::with_capacity(size);
for i in 0..size { for i in 0..size {
vec.push(hash(i as f64)); vec.push(hash(i as f64 + seed as f64));
} }
Arc::new(vec) Arc::new(vec)
} }
pub fn benchmark_threadpool(c: &mut Criterion) { /// Function for executing the thread pool benchmarks using criterion.rs.
let vec_a = create_vec(1_000_000); /// It will create two different vectors and benchmark the single thread performance
let vec_b = create_vec(1_000_000); /// as well as the multi threadded performance for varying amounts of threads.
pub fn bench_threadpool(c: &mut Criterion) {
let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
c.bench_function("single threaded", |b| { let mut group = c.benchmark_group("threadpool with various number of threads");
b.iter(|| bench_single_threaded(&vec_a, &vec_b))
}); for threads in THREAD_COUNTS.iter() {
c.bench_function("multi threaded", |b| { group.throughput(Throughput::Bytes(*threads as u64));
b.iter(|| bench_threadpool(vec_a.clone(), vec_b.clone())) group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
b.iter(|| {
dot_parallel(vec_a.clone(), vec_b.clone(), *threads);
});
});
}
group.finish();
}
/// Benchmark the effects of over and underusing a thread pools thread capacity.
/// The thread pool will automatically choose the number of threads to use.
/// We will then run a custom number of jobs with that pool that may be smaller or larger
/// than the amount of threads the pool can offer.
fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
// automatically choose the number of threads
let mut pool = ThreadPool::new();
// drop the handles used by each thread after its done
pool.drop_finished_handles();
// number of elements in each vector for each thread
let steps = a.len() / threads;
for i in 0..threads {
// offset of the first element for the thread local vec
let chunk = i * steps;
// create a new strong reference to the vector
let aa = a.clone();
let bb = b.clone();
// launch a new thread
pool.enqueue(move || {
let a = &aa[chunk..(chunk + steps)];
let b = &bb[chunk..(chunk + steps)];
dot(a, b)
});
}
black_box(
// wait for the threads to finish
pool.join_all()
// iterate over the results and sum the parital dot products together
.into_iter()
.map(|r| r.unwrap())
.reduce(|a, b| a + b),
);
}
/// Benchmark the effects of over and underusing a thread pools thread capacity.
/// The thread pool will automatically choose the number of threads to use.
/// We will then run a custom number of jobs with that pool that may be smaller or larger
/// than the amount of threads the pool can offer.
pub fn bench_overusage(c: &mut Criterion) {
let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
let mut group = c.benchmark_group("threadpool overusage");
for threads in THREAD_COUNTS.iter() {
group.throughput(Throughput::Bytes(*threads as u64));
group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
b.iter(|| {
pool_overusage(vec_a.clone(), vec_b.clone(), *threads);
});
});
}
group.finish();
}
/// Benchmark the performance of a single thread used to calculate the dot product.
pub fn bench_single_threaded(c: &mut Criterion) {
let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
c.bench_function("single threaded", |s| {
s.iter(|| {
black_box(dot(&vec_a, &vec_b));
});
}); });
} }
criterion_group!(benches, benchmark_threadpool); criterion_group!(
benches,
bench_single_threaded,
bench_threadpool,
bench_overusage
);
criterion_main!(benches); criterion_main!(benches);

View File

@ -26,7 +26,7 @@
//! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. //! This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
//! This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`]. //! This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
//! Note that atomic primitives are not available on all platforms but "can generally be relied upon existing" //! Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
//! (see: https://doc.rust-lang.org/std/sync/atomic/index.html). //! (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
//! Additionally this implementation relies on using the `load` and `store` operations //! Additionally this implementation relies on using the `load` and `store` operations
//! instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls //! instead of using more comfortable ones like `fetch_add` in order to avoid unnecessary calls
//! to `unwrap` or `expected` from [`std::sync::MutexGuard`]. //! to `unwrap` or `expected` from [`std::sync::MutexGuard`].
@ -86,7 +86,7 @@ fn get_default_thread_count() -> usize {
/// This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`]. /// This implementation is not fully platform independent. This is due to the usage of [`std::sync::atomic::AtomicUsize`].
/// This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`]. /// This type is used to remove some locks from otherwise used [`std::sync::Mutex`] wrapping a [`usize`].
/// Note that atomic primitives are not available on all platforms but "can generally be relied upon existing" /// Note that atomic primitives are not available on all platforms but "can generally be relied upon existing"
/// (see: https://doc.rust-lang.org/std/sync/atomic/index.html). /// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
/// Additionally this implementation relies on using the `load` and `store` operations /// Additionally this implementation relies on using the `load` and `store` operations
/// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls /// instead of using more comfortable one like `fetch_add` in order to avoid unnecessary calls
/// to `unwrap` or `expected` from [`std::sync::MutexGuard`]. /// to `unwrap` or `expected` from [`std::sync::MutexGuard`].
@ -112,7 +112,7 @@ where
/// number of currently running threads /// number of currently running threads
/// new implementation relies on atomic primitives to avoid locking and possible /// new implementation relies on atomic primitives to avoid locking and possible
/// guard errors. Note that atomic primitives are not available on all platforms "can generally be relied upon existing" /// guard errors. Note that atomic primitives are not available on all platforms "can generally be relied upon existing"
/// (see: https://doc.rust-lang.org/std/sync/atomic/index.html). /// (see: <https://doc.rust-lang.org/std/sync/atomic/index.html>).
/// Also this implementation relies on using the `load` and `store` operations /// Also this implementation relies on using the `load` and `store` operations
/// instead of using more comfortable one like `fetch_add` /// instead of using more comfortable one like `fetch_add`
threads: Arc<AtomicUsize>, threads: Arc<AtomicUsize>,
@ -173,7 +173,7 @@ where
/// supplying a number of threads to great may negatively impact performance as the system may not /// supplying a number of threads to great may negatively impact performance as the system may not
/// be able to full fill the required needs /// be able to full fill the required needs
/// # Memory usage /// # Memory usage
/// if `drop_handles` is set to [`Bool::false`] the pool will continue to store the handles of /// if `drop_handles` is set to `false` the pool will continue to store the handles of
/// launched threads. This causes memory consumption to rise over time as more and more /// launched threads. This causes memory consumption to rise over time as more and more
/// threads are launched. /// threads are launched.
pub fn with_threads_and_drop_handles( pub fn with_threads_and_drop_handles(