Merge branch 'main' into Image-Structs
This commit is contained in:
commit
c889c13aa4
|
@ -1,3 +1,4 @@
|
|||
/target
|
||||
/Cargo.lock
|
||||
.DS_Store
|
||||
/.vscode
|
|
@ -10,3 +10,10 @@ authors = ["Sven Vogel", "Felix L. Müller", "Elias Alexander", "Elias Schmidt"]
|
|||
png = "0.17.8"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = "0.5.1"
|
||||
|
||||
[[bench]]
|
||||
name = "multithreading"
|
||||
harness = false
|
|
@ -1,3 +1,12 @@
|
|||
# Programmentwurf
|
||||
|
||||
Die Beschreibung der Aufgabenstellung ist unter [Programmentwurf.md](https://github.com/programmieren-mit-rust/programmentwurf/blob/main/Programmentwurf.md) zu finden. Diese `Readme.md` ist durch etwas Sinnvolles zu ersetzen.
|
||||
|
||||
# WICHTIG!
|
||||
Kleiner reminder, wenn ihr Sachen pushed in das repo, die eurer Anischt nach fertig sind (z.B für einen Pull-Request!), bitte mit den folgenden Commands auf Fehler/Warnings überprüfen:
|
||||
- `cargo fmt` für formattierung
|
||||
- `cargo clippy` für warnings
|
||||
- `cargo test doc` für documentation tests
|
||||
optional:
|
||||
- `cargo test` für module tests
|
||||
- `cargo bench` für benchmarks
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
//! Benachmarking funcitonality for [Criterion.rs](https://github.com/bheisler/criterion.rs)
|
||||
//! This benchmark will compare the performance of various thread pools launched with different amounts of
|
||||
//! maximum threads.
|
||||
//! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit
|
||||
//! double precision floating point values.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
|
||||
use imsearch::multithreading::ThreadPool;
|
||||
|
||||
/// Amount of elements per vector used to calculate the dot product
|
||||
const VEC_ELEM_COUNT: usize = 1_000_000;
|
||||
/// Number of threads to test
|
||||
const THREAD_COUNTS: [usize; 17] = [
|
||||
1, 2, 4, 6, 8, 10, 12, 16, 18, 20, 22, 26, 28, 32, 40, 56, 64,
|
||||
];
|
||||
/// seeds used to scramble up the values produced by the hash function for each vector
|
||||
/// these are just some pseudo random numbers
|
||||
const VEC_SEEDS: [u64; 2] = [0xa3f8347abce16, 0xa273048ca9dea];
|
||||
|
||||
/// Compute the dot product of two vectors
|
||||
/// # Panics
|
||||
/// this function assumes both vectors to be of exactly the same length.
|
||||
/// If this is not the case the function will panic.
|
||||
fn dot(a: &[f64], b: &[f64]) -> f64 {
|
||||
let mut sum = 0.0;
|
||||
|
||||
for i in 0..a.len() {
|
||||
sum += a[i] * b[i];
|
||||
}
|
||||
|
||||
sum
|
||||
}
|
||||
|
||||
/// Computes the dot product using a thread pool with varying number of threads. The vectors will be both splitted into equally
|
||||
/// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have
|
||||
/// finished the partial dot products will be summed to create the final result.
|
||||
fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
||||
|
||||
let mut pool = ThreadPool::with_limit(threads);
|
||||
|
||||
// number of elements in each vector for each thread
|
||||
let steps = a.len() / threads;
|
||||
|
||||
for i in 0..threads {
|
||||
// offset of the first element for the thread local vec
|
||||
let chunk = i * steps;
|
||||
// create a new strong reference to the vector
|
||||
let aa = a.clone();
|
||||
let bb = b.clone();
|
||||
// launch a new thread
|
||||
pool.enqueue(move || {
|
||||
let a = &aa[chunk..(chunk + steps)];
|
||||
let b = &bb[chunk..(chunk + steps)];
|
||||
dot(a, b)
|
||||
});
|
||||
}
|
||||
|
||||
pool.join_all();
|
||||
|
||||
black_box(pool.get_results().iter().sum::<f64>());
|
||||
}
|
||||
|
||||
/// Compute a simple hash value for the given index value.
|
||||
/// This function will return a value between [0, 1].
|
||||
#[inline]
|
||||
fn hash(x: f64) -> f64 {
|
||||
((x * 234.8743 + 3.8274).sin() * 87624.58376).fract()
|
||||
}
|
||||
|
||||
/// Create a vector filled with `size` elements of 64-bit floating point numbers
|
||||
/// each initialized with the function `hash` and the given seed. The vector will
|
||||
/// be filled with values between [0, 1].
|
||||
fn create_vec(size: usize, seed: u64) -> Arc<Vec<f64>> {
|
||||
let mut vec = Vec::with_capacity(size);
|
||||
|
||||
for i in 0..size {
|
||||
vec.push(hash(i as f64 + seed as f64));
|
||||
}
|
||||
|
||||
Arc::new(vec)
|
||||
}
|
||||
|
||||
/// Function for executing the thread pool benchmarks using criterion.rs.
|
||||
/// It will create two different vectors and benchmark the single thread performance
|
||||
/// as well as the multi threadded performance for varying amounts of threads.
|
||||
pub fn bench_threadpool(c: &mut Criterion) {
|
||||
let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
|
||||
let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
|
||||
|
||||
let mut group = c.benchmark_group("threadpool with various number of threads");
|
||||
|
||||
for threads in THREAD_COUNTS.iter() {
|
||||
group.throughput(Throughput::Bytes(*threads as u64));
|
||||
group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
|
||||
b.iter(|| {
|
||||
dot_parallel(vec_a.clone(), vec_b.clone(), *threads);
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
/// Benchmark the effects of over and underusing a thread pools thread capacity.
|
||||
/// The thread pool will automatically choose the number of threads to use.
|
||||
/// We will then run a custom number of jobs with that pool that may be smaller or larger
|
||||
/// than the amount of threads the pool can offer.
|
||||
fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
||||
// automatically choose the number of threads
|
||||
let mut pool = ThreadPool::new();
|
||||
|
||||
// number of elements in each vector for each thread
|
||||
let steps = a.len() / threads;
|
||||
|
||||
for i in 0..threads {
|
||||
// offset of the first element for the thread local vec
|
||||
let chunk = i * steps;
|
||||
// create a new strong reference to the vector
|
||||
let aa = a.clone();
|
||||
let bb = b.clone();
|
||||
// launch a new thread
|
||||
pool.enqueue(move || {
|
||||
let a = &aa[chunk..(chunk + steps)];
|
||||
let b = &bb[chunk..(chunk + steps)];
|
||||
dot(a, b)
|
||||
});
|
||||
}
|
||||
|
||||
pool.join_all();
|
||||
|
||||
black_box(pool.get_results().iter().sum::<f64>());
|
||||
}
|
||||
|
||||
/// Benchmark the effects of over and underusing a thread pools thread capacity.
|
||||
/// The thread pool will automatically choose the number of threads to use.
|
||||
/// We will then run a custom number of jobs with that pool that may be smaller or larger
|
||||
/// than the amount of threads the pool can offer.
|
||||
pub fn bench_overusage(c: &mut Criterion) {
|
||||
let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
|
||||
let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
|
||||
|
||||
let mut group = c.benchmark_group("threadpool overusage");
|
||||
|
||||
for threads in THREAD_COUNTS.iter() {
|
||||
group.throughput(Throughput::Bytes(*threads as u64));
|
||||
group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
|
||||
b.iter(|| {
|
||||
pool_overusage(vec_a.clone(), vec_b.clone(), *threads);
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
/// Benchmark the performance of a single thread used to calculate the dot product.
|
||||
pub fn bench_single_threaded(c: &mut Criterion) {
|
||||
let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
|
||||
let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
|
||||
|
||||
c.bench_function("single threaded", |s| {
|
||||
s.iter(|| {
|
||||
black_box(dot(&vec_a, &vec_b));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(
|
||||
benches,
|
||||
bench_single_threaded,
|
||||
bench_threadpool,
|
||||
bench_overusage
|
||||
);
|
||||
criterion_main!(benches);
|
|
@ -1,6 +1,8 @@
|
|||
|
||||
extern crate core;
|
||||
|
||||
pub mod image;
|
||||
pub mod multithreading;
|
||||
|
||||
pub fn add(left: usize, right: usize) -> usize {
|
||||
left + right
|
||||
|
|
|
@ -0,0 +1,265 @@
|
|||
//! This module provides the functionality to create thread pool to execute tasks in parallel.
|
||||
//! The amount of threads to be used at maximum can be regulated by using `ThreadPool::with_limit`.
|
||||
//! This implementation is aimed to be of low runtime cost with minimal sychronisation due to blocking.
|
||||
//! Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
|
||||
//! a new thread will be launched until the maximum number is reached. By then every launched thread will
|
||||
//! be reused to process the remaining elements of the queue. If no jobs are left to be executed
|
||||
//! all threads will finish and die. This means that if nothing is done, no threads will run in idle in the background.
|
||||
//! # Example
|
||||
//! ```rust
|
||||
//! # use imsearch::multithreading::ThreadPool;
|
||||
//! let mut pool = ThreadPool::with_limit(2);
|
||||
//!
|
||||
//! for i in 0..10 {
|
||||
//! pool.enqueue(move || i);
|
||||
//! }
|
||||
//!
|
||||
//! pool.join_all();
|
||||
//! assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
||||
//! ```
|
||||
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
num::NonZeroUsize,
|
||||
sync::{
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
Arc, Mutex,
|
||||
},
|
||||
thread::{self, JoinHandle},
|
||||
};
|
||||
|
||||
/// Default number if threads to be used in case [`std::thread::available_parallelism`] fails.
|
||||
pub const DEFAULT_THREAD_POOL_SIZE: usize = 1;
|
||||
|
||||
/// Indicates the priority level of functions or closures which get supplied to the pool.
|
||||
/// Use [`Priority::High`] to ensure the closue to be executed before all closures that are already supplied
|
||||
/// Use [`Priority::Low`] to ensure the closue to be executed after all closures that are already supplied
|
||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub enum Priority {
|
||||
/// Indicate that the closure or function supplied to the thread
|
||||
/// has higher priority than any other given to the pool until now.
|
||||
/// The item will get enqueued at the start of the waiting-queue.
|
||||
High,
|
||||
/// Indicate that the closure or function supplied to the thread pool
|
||||
/// has lower priority than the already supplied ones in this pool.
|
||||
/// The item will get enqueued at the end of the waiting-queue.
|
||||
Low,
|
||||
}
|
||||
|
||||
/// Jobs are functions which are executed by the thread pool. They can be stalled when no threads are
|
||||
/// free to execute them directly. They are meant to be executed only once and be done.
|
||||
pub trait Job<T>: Send + 'static + FnOnce() -> T
|
||||
where
|
||||
T: Send,
|
||||
{
|
||||
}
|
||||
|
||||
impl<U, T> Job<T> for U
|
||||
where
|
||||
U: Send + 'static + FnOnce() -> T,
|
||||
T: Send + 'static,
|
||||
{
|
||||
}
|
||||
|
||||
/// Thread pool which can be used to execute functions or closures in parallel.
|
||||
/// The amount of threads to be used at maximum can be regulated by using `ThreadPool::with_limit`.
|
||||
/// This implementation is aimed to be of low runtime cost with minimal sychronisation due to blocking.
|
||||
/// Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
|
||||
/// a new thread will be launched until the maximum number is reached. By then every launched thread will
|
||||
/// be reused to process the remaining elements of the queue. If no jobs are left to be executed
|
||||
/// all threads will finish and die. This means that if nothing is done, no threads will run in idle in the background.
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// # use imsearch::multithreading::ThreadPool;
|
||||
/// let mut pool = ThreadPool::with_limit(2);
|
||||
///
|
||||
/// for i in 0..10 {
|
||||
/// pool.enqueue(move || i);
|
||||
/// }
|
||||
///
|
||||
/// pool.join_all();
|
||||
/// assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct ThreadPool<T, F>
|
||||
where
|
||||
T: Send,
|
||||
F: Job<T>,
|
||||
{
|
||||
/// queue for storing the jobs to be executed
|
||||
queue: Arc<Mutex<VecDeque<F>>>,
|
||||
/// handles for all threads currently running and processing jobs
|
||||
handles: Vec<JoinHandle<()>>,
|
||||
/// reciver end for channel based communication between threads
|
||||
receiver: Receiver<T>,
|
||||
/// sender end for channel based communication between threads
|
||||
sender: Sender<T>,
|
||||
/// maximum amount of threads to be used in parallel
|
||||
limit: NonZeroUsize,
|
||||
}
|
||||
|
||||
impl<T, F> Default for ThreadPool<T, F>
|
||||
where
|
||||
T: Send + 'static,
|
||||
F: Job<T>,
|
||||
{
|
||||
fn default() -> Self {
|
||||
let (sender, receiver) = channel::<T>();
|
||||
|
||||
// determine default thread count to use based on the system
|
||||
let default =
|
||||
NonZeroUsize::new(DEFAULT_THREAD_POOL_SIZE).expect("Thread limit must be non-zero");
|
||||
let limit = thread::available_parallelism().unwrap_or(default);
|
||||
|
||||
Self {
|
||||
queue: Arc::new(Mutex::new(VecDeque::new())),
|
||||
handles: Vec::new(),
|
||||
receiver,
|
||||
sender,
|
||||
limit,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, F> ThreadPool<T, F>
|
||||
where
|
||||
T: Send + 'static,
|
||||
F: Job<T>,
|
||||
{
|
||||
/// Creates a new thread pool with default thread count determined by either
|
||||
/// [`std::thread::available_parallelism`] or [`DEFAULT_THREAD_POOL_SIZE`] in case it fails.
|
||||
/// No threads will be lauched until jobs are enqueued.
|
||||
pub fn new() -> Self {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
/// Creates a new thread pool with the given thread count. The pool will continue to launch new threads even if
|
||||
/// the system does not allow for that count of parallelism.
|
||||
/// No threads will be lauched until jobs are enqueued.
|
||||
/// # Panic
|
||||
/// This function will fails if `max_threads` is zero.
|
||||
pub fn with_limit(max_threads: usize) -> Self {
|
||||
Self {
|
||||
limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Put a new job into the queue to be executed by a thread in the future.
|
||||
/// The priority of the job will determine if the job will be put at the start or end of the queue.
|
||||
/// See [`crate::multithreading::Priority`].
|
||||
/// This function will create a new thread if the maximum number of threads in not reached.
|
||||
/// In case the maximum number of threads is already used, the job is stalled and will get executed
|
||||
/// when a thread is ready and its at the start of the queue.
|
||||
pub fn enqueue_priorize(&mut self, func: F, priority: Priority) {
|
||||
// put job into queue
|
||||
let mut queue = self.queue.lock().unwrap();
|
||||
|
||||
// insert new job into queue depending on its priority
|
||||
match priority {
|
||||
Priority::High => queue.push_front(func),
|
||||
Priority::Low => queue.push_back(func),
|
||||
}
|
||||
|
||||
if self.handles.len() < self.limit.get() {
|
||||
// we can still launch threads to run in parallel
|
||||
|
||||
// clone the sender
|
||||
let tx = self.sender.clone();
|
||||
let queue = self.queue.clone();
|
||||
|
||||
self.handles.push(thread::spawn(move || {
|
||||
while let Some(job) = queue.lock().unwrap().pop_front() {
|
||||
tx.send(job()).expect("cannot send result");
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
self.handles.retain(|h| !h.is_finished());
|
||||
}
|
||||
|
||||
/// Put a new job into the queue to be executed by a thread in the future.
|
||||
/// The priority of the job is automatically set to [`crate::multithreading::Priority::Low`].
|
||||
/// This function will create a new thread if the maximum number of threads in not reached.
|
||||
/// In case the maximum number of threads is already used, the job is stalled and will get executed
|
||||
/// when a thread is ready and its at the start of the queue.
|
||||
pub fn enqueue(&mut self, func: F) {
|
||||
self.enqueue_priorize(func, Priority::Low);
|
||||
}
|
||||
|
||||
/// Wait for all threads to finish executing. This means that by the time all threads have finished
|
||||
/// every task will have been executed too. In other words the threads finsish when the queue of jobs is empty.
|
||||
/// This function will block the caller thread.
|
||||
pub fn join_all(&mut self) {
|
||||
while let Some(handle) = self.handles.pop() {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns all results that have been returned by the threads until now
|
||||
/// and haven't been consumed yet.
|
||||
/// All results retrieved from this call won't be returned on a second call.
|
||||
/// This function is non blocking.
|
||||
pub fn try_get_results(&mut self) -> Vec<T> {
|
||||
self.receiver.try_iter().collect()
|
||||
}
|
||||
|
||||
/// Returns all results that have been returned by the threads until now
|
||||
/// and haven't been consumed yet. The function will also wait for all threads to finish executing (empty the queue).
|
||||
/// All results retrieved from this call won't be returned on a second call.
|
||||
/// This function will block the caller thread.
|
||||
pub fn get_results(&mut self) -> Vec<T> {
|
||||
self.join_all();
|
||||
self.try_get_results()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_default() {
|
||||
let mut pool = ThreadPool::default();
|
||||
|
||||
for i in 0..10 {
|
||||
pool.enqueue_priorize(move || i, Priority::High);
|
||||
}
|
||||
|
||||
pool.join_all();
|
||||
|
||||
assert_eq!(pool.try_get_results().iter().sum::<i32>(), 45);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_limit() {
|
||||
let mut pool = ThreadPool::with_limit(2);
|
||||
|
||||
for i in 0..10 {
|
||||
pool.enqueue(move || i);
|
||||
}
|
||||
|
||||
assert_eq!(pool.handles.len(), 2);
|
||||
assert_eq!(pool.limit.get(), 2);
|
||||
|
||||
pool.join_all();
|
||||
|
||||
assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multiple() {
|
||||
let mut pool = ThreadPool::with_limit(2);
|
||||
|
||||
for i in 0..10 {
|
||||
pool.enqueue(move || i);
|
||||
}
|
||||
|
||||
assert_eq!(pool.handles.len(), 2);
|
||||
assert_eq!(pool.limit.get(), 2);
|
||||
|
||||
pool.join_all();
|
||||
|
||||
assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue