diff --git a/benches/multithreading.rs b/benches/multithreading.rs index 0591bab..0119b93 100644 --- a/benches/multithreading.rs +++ b/benches/multithreading.rs @@ -1,350 +1,179 @@ -//! This module provides the functionality to create thread pool to execute tasks in parallel. -//! The amount of threads to be used at maximum can be regulated by using `ThreadPool::with_limit`. -//! This implementation is aimed to be of low runtime cost with minimal sychronisation due to blocking. -//! Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job -//! a new thread will be launched until the maximum number is reached. By then every launched thread will -//! be reused to process the remaining elements of the queue. If no jobs are left to be executed -//! all threads will finish and die. This means that if nothing is done, no threads will run in idle in the background. -//! # Example -//! ```rust -//! # use imsearch::multithreading::ThreadPool; -//! # use imsearch::multithreading::Task; -//! let mut pool = ThreadPool::with_limit(2); -//! -//! for i in 0..10 { -//! pool.enqueue(Task::new(i, |i| i)); -//! // ^^^^^^ closure or static function -//! } -//! -//! pool.join_all(); -//! assert_eq!(pool.get_results().iter().sum::(), 45); -//! ``` +//! Benachmarking funcitonality for [Criterion.rs](https://github.com/bheisler/criterion.rs) +//! This benchmark will compare the performance of various thread pools launched with different amounts of +//! maximum threads. +//! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit +//! double precision floating point values. -use std::{ - any::Any, - collections::VecDeque, - num::NonZeroUsize, - sync::{ - mpsc::{channel, Receiver, Sender}, - Arc, Mutex, - }, - thread::{self, JoinHandle}, -}; +use std::sync::Arc; -/// Default number if threads to be used in case [`std::thread::available_parallelism`] fails. -pub const DEFAULT_THREAD_POOL_SIZE: usize = 1; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use imsearch::multithreading::{Task, ThreadPool}; -/// Indicates the priority level of functions or closures which get supplied to the pool. -/// Use [`Priority::High`] to ensure the closue to be executed before all closures that are already supplied -/// Use [`Priority::Low`] to ensure the closue to be executed after all closures that are already supplied -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub enum Priority { - /// Indicate that the closure or function supplied to the thread - /// has higher priority than any other given to the pool until now. - /// The item will get enqueued at the start of the waiting-queue. - High, - /// Indicate that the closure or function supplied to the thread pool - /// has lower priority than the already supplied ones in this pool. - /// The item will get enqueued at the end of the waiting-queue. - Low, +/// Amount of elements per vector used to calculate the dot product +const VEC_ELEM_COUNT: usize = 1_000_000; +/// Number of threads to test +const THREAD_COUNTS: [usize; 17] = [ + 1, 2, 4, 6, 8, 10, 12, 16, 18, 20, 22, 26, 28, 32, 40, 56, 64, +]; +/// seeds used to scramble up the values produced by the hash function for each vector +/// these are just some pseudo random numbers +const VEC_SEEDS: [u64; 2] = [0xa3f8347abce16, 0xa273048ca9dea]; + +/// Compute the dot product of two vectors +/// # Panics +/// this function assumes both vectors to be of exactly the same length. +/// If this is not the case the function will panic. +fn dot(a: &[f64], b: &[f64]) -> f64 { + let mut sum = 0.0; + + for i in 0..a.len() { + sum += a[i] * b[i]; + } + + sum } -/// Traits a return value has to implement when being given back by a function or closure. -pub trait Sendable: Any + Send + 'static {} +/// Computes the dot product using a thread pool with varying number of threads. The vectors will be both splitted into equally +/// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have +/// finished the partial dot products will be summed to create the final result. +fn dot_parallel(a: Arc>, b: Arc>, threads: usize) { + let mut pool = ThreadPool::with_limit(threads); -impl Sendable for T where T: Any + Send + 'static {} + // number of elements in each vector for each thread + let steps = a.len() / threads; -/// A task that will be executed at some point in the future by the thread pool -/// At the heart of this struct is the function to be executed. This may be a closure. -#[derive(Debug, Copy, Clone)] -pub struct Task -where - I: Sendable, - T: Sendable, -{ - job: fn(I) -> T, - param: I, + for i in 0..threads { + // offset of the first element for the thread local vec + let chunk = i * steps; + // create a new strong reference to the vector + let aa = a.clone(); + let bb = b.clone(); + // launch a new thread + pool.enqueue(Task::new( + (aa, bb, chunk, steps), + |(aa, bb, chunk, steps)| { + let a = &aa[chunk..(chunk + steps)]; + let b = &bb[chunk..(chunk + steps)]; + dot(a, b) + }, + )); + } + + pool.join_all(); + + black_box(pool.get_results().iter().sum::()); } -impl Task -where - I: Sendable, - T: Sendable, -{ - pub fn new(param: I, job: fn(I) -> T) -> Self { - Self { job, param } - } +/// Compute a simple hash value for the given index value. +/// This function will return a value between [0, 1]. +#[inline] +fn hash(x: f64) -> f64 { + ((x * 234.8743 + 3.8274).sin() * 87624.58376).fract() } -/// Thread pool which can be used to execute functions or closures in parallel. -/// The amount of threads to be used at maximum can be regulated by using `ThreadPool::with_limit`. -/// This implementation is aimed to be of low runtime cost with minimal sychronisation due to blocking. -/// Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job -/// a new thread will be launched until the maximum number is reached. By then every launched thread will -/// be reused to process the remaining elements of the queue. If no jobs are left to be executed -/// all threads will finish and die. This means that if nothing is done, no threads will run in idle in the background. -/// # Example -/// ```rust -/// # use imsearch::multithreading::ThreadPool; -/// # use imsearch::multithreading::Task; -/// let mut pool = ThreadPool::with_limit(2); -/// -/// for i in 0..10 { -/// pool.enqueue(Task::new(i, |i| i)); -/// } -/// -/// pool.join_all(); -/// assert_eq!(pool.get_results().iter().sum::(), 45); -/// ``` -/// # Drop -/// This struct implements the `Drop` trait. Upon being dropped the pool will wait for all threads -/// to finsish. This may take up an arbitrary amount of time. -/// # Panics in the thread -/// When a function or closure panics, the executing thread will detect the unwind performed by `panic` causing the -/// thread to print a message on stderr. The thread itself captures panics and won't terminate execution but continue with -/// the next task in the queue. -/// Its not recommend to use this pool with custom panic hooks or special functions which abort the process. -/// Also panicking code from external program written in C++ or others in undefinied behavior according to [`std::panic::catch_unwind`] -#[derive(Debug)] -pub struct ThreadPool -where - I: Sendable, - T: Sendable, -{ - /// queue for storing the jobs to be executed - queue: Arc>>>, - /// handles for all threads currently running and processing jobs - handles: Vec>, - /// reciver end for channel based communication between threads - receiver: Receiver, - /// sender end for channel based communication between threads - sender: Sender, - /// maximum amount of threads to be used in parallel - limit: NonZeroUsize, +/// Create a vector filled with `size` elements of 64-bit floating point numbers +/// each initialized with the function `hash` and the given seed. The vector will +/// be filled with values between [0, 1]. +fn create_vec(size: usize, seed: u64) -> Arc> { + let mut vec = Vec::with_capacity(size); + + for i in 0..size { + vec.push(hash(i as f64 + seed as f64)); + } + + Arc::new(vec) } -impl Default for ThreadPool -where - I: Sendable, - T: Sendable, -{ - fn default() -> Self { - let (sender, receiver) = channel::(); +/// Function for executing the thread pool benchmarks using criterion.rs. +/// It will create two different vectors and benchmark the single thread performance +/// as well as the multi threadded performance for varying amounts of threads. +pub fn bench_threadpool(c: &mut Criterion) { + let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]); + let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]); - // determine default thread count to use based on the system - let default = - NonZeroUsize::new(DEFAULT_THREAD_POOL_SIZE).expect("Thread limit must be non-zero"); - let limit = thread::available_parallelism().unwrap_or(default); + let mut group = c.benchmark_group("threadpool with various number of threads"); - Self { - queue: Arc::new(Mutex::new(VecDeque::new())), - handles: Vec::new(), - receiver, - sender, - limit, - } + for threads in THREAD_COUNTS.iter() { + group.throughput(Throughput::Bytes(*threads as u64)); + group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| { + b.iter(|| { + dot_parallel(vec_a.clone(), vec_b.clone(), *threads); + }); + }); } + group.finish(); } -impl ThreadPool -where - I: Sendable, - T: Sendable, -{ - /// Creates a new thread pool with default thread count determined by either - /// [`std::thread::available_parallelism`] or [`DEFAULT_THREAD_POOL_SIZE`] in case it fails. - /// No threads will be lauched until jobs are enqueued. - pub fn new() -> Self { - Default::default() +/// Benchmark the effects of over and underusing a thread pools thread capacity. +/// The thread pool will automatically choose the number of threads to use. +/// We will then run a custom number of jobs with that pool that may be smaller or larger +/// than the amount of threads the pool can offer. +fn pool_overusage(a: Arc>, b: Arc>, threads: usize) { + // automatically choose the number of threads + let mut pool = ThreadPool::new(); + + // number of elements in each vector for each thread + let steps = a.len() / threads; + + for i in 0..threads { + // offset of the first element for the thread local vec + let chunk = i * steps; + // create a new strong reference to the vector + let aa = a.clone(); + let bb = b.clone(); + // launch a new thread + pool.enqueue(Task::new( + (aa, bb, chunk, steps), + |(aa, bb, chunk, steps)| { + let a = &aa[chunk..(chunk + steps)]; + let b = &bb[chunk..(chunk + steps)]; + dot(a, b) + }, + )); } - /// Creates a new thread pool with the given thread count. The pool will continue to launch new threads even if - /// the system does not allow for that count of parallelism. - /// No threads will be lauched until jobs are enqueued. - /// # Panic - /// This function will fails if `max_threads` is zero. - pub fn with_limit(max_threads: usize) -> Self { - let (sender, receiver) = channel::(); - Self { - limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"), - queue: Arc::new(Mutex::new(VecDeque::new())), - handles: Vec::new(), - sender, - receiver, - } - } + pool.join_all(); - /// Put a new job into the queue to be executed by a thread in the future. - /// The priority of the job will determine if the job will be put at the start or end of the queue. - /// See [`crate::multithreading::Priority`]. - /// This function will create a new thread if the maximum number of threads in not reached. - /// In case the maximum number of threads is already used, the job is stalled and will get executed - /// when a thread is ready and its at the start of the queue. - pub fn enqueue_priorize(&mut self, func: Task, priority: Priority) { - // put job into queue - let mut queue = self.queue.lock().unwrap(); - - // insert new job into queue depending on its priority - match priority { - Priority::High => queue.push_front(func), - Priority::Low => queue.push_back(func), - } - - if self.handles.len() < self.limit.get() { - // we can still launch threads to run in parallel - - // clone the sender - let tx = self.sender.clone(); - let queue = self.queue.clone(); - - self.handles.push(thread::spawn(move || { - while let Some(task) = queue.lock().unwrap().pop_front() { - tx.send((task.job)(task.param)) - .expect("unable to send result over channel"); - } - })); - } - - self.handles.retain(|h| !h.is_finished()); - } - - /// Put a new job into the queue to be executed by a thread in the future. - /// The priority of the job is automatically set to [`crate::multithreading::Priority::Low`]. - /// This function will create a new thread if the maximum number of threads in not reached. - /// In case the maximum number of threads is already used, the job is stalled and will get executed - /// when a thread is ready and its at the start of the queue. - pub fn enqueue(&mut self, func: Task) { - self.enqueue_priorize(func, Priority::Low); - } - - /// Wait for all threads to finish executing. This means that by the time all threads have finished - /// every task will have been executed too. In other words the threads finsish when the queue of jobs is empty. - /// This function will block the caller thread. - pub fn join_all(&mut self) { - while let Some(handle) = self.handles.pop() { - handle.join().unwrap(); - } - } - - /// Sendables all results that have been Sendableed by the threads until now - /// and haven't been consumed yet. - /// All results retrieved from this call won't be Sendableed on a second call. - /// This function is non blocking. - pub fn try_get_results(&mut self) -> Vec { - self.receiver.try_iter().collect() - } - - /// Sendables all results that have been returned by the threads until now - /// and haven't been consumed yet. The function will also wait for all threads to finish executing (empty the queue). - /// All results retrieved from this call won't be returned on a second call. - /// This function will block the caller thread. - pub fn get_results(&mut self) -> Vec { - self.join_all(); - self.try_get_results() - } + black_box(pool.get_results().iter().sum::()); } -impl Drop for ThreadPool -where - I: Sendable, - T: Sendable, -{ - fn drop(&mut self) { - self.join_all(); +/// Benchmark the effects of over and underusing a thread pools thread capacity. +/// The thread pool will automatically choose the number of threads to use. +/// We will then run a custom number of jobs with that pool that may be smaller or larger +/// than the amount of threads the pool can offer. +pub fn bench_overusage(c: &mut Criterion) { + let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]); + let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]); + + let mut group = c.benchmark_group("threadpool overusage"); + + for threads in THREAD_COUNTS.iter() { + group.throughput(Throughput::Bytes(*threads as u64)); + group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| { + b.iter(|| { + pool_overusage(vec_a.clone(), vec_b.clone(), *threads); + }); + }); } + group.finish(); } -#[cfg(test)] -mod test { - use std::panic::UnwindSafe; +/// Benchmark the performance of a single thread used to calculate the dot product. +pub fn bench_single_threaded(c: &mut Criterion) { + let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]); + let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]); - use super::*; - - #[test] - fn test_default() { - let mut pool = ThreadPool::default(); - - for i in 0..10 { - pool.enqueue_priorize(Task::new(i, |i| i), Priority::High); - } - - pool.join_all(); - - assert_eq!(pool.try_get_results().iter().sum::(), 45); - } - - #[test] - fn test_limit() { - let mut pool = ThreadPool::with_limit(2); - - for i in 0..10 { - pool.enqueue(Task::new(i, |i| i)); - } - - assert_eq!(pool.handles.len(), 2); - assert_eq!(pool.limit.get(), 2); - - pool.join_all(); - - assert_eq!(pool.get_results().iter().sum::(), 45); - } - - trait Object: Send + UnwindSafe { - fn get(&mut self) -> i32; - } - - #[derive(Default)] - struct Test1 { - _int: i32, - } - - impl Object for Test1 { - fn get(&mut self) -> i32 { - 0 - } - } - - #[derive(Default)] - struct Test2 { - _c: char, - } - - impl Object for Test2 { - fn get(&mut self) -> i32 { - 0 - } - } - - #[derive(Default)] - struct Test3 { - _s: String, - } - - impl Object for Test3 { - fn get(&mut self) -> i32 { - 0 - } - } - - #[test] - fn test_1() { - let mut pool = ThreadPool::with_limit(2); - - let feats: Vec> = vec![ - Box::new(Test1::default()), - Box::new(Test2::default()), - Box::new(Test3::default()), - ]; - - for feat in feats { - pool.enqueue(Task::new(feat, |mut i| { - let _ = i.get(); - i - })); - } - - pool.join_all(); - - let _feats: Vec> = pool.get_results(); - } + c.bench_function("single threaded", |s| { + s.iter(|| { + black_box(dot(&vec_a, &vec_b)); + }); + }); } + +criterion_group!( + benches, + bench_single_threaded, + bench_threadpool, + bench_overusage +); +criterion_main!(benches); diff --git a/src/image/mod.rs b/src/image/mod.rs index 0626faa..ee6880d 100644 --- a/src/image/mod.rs +++ b/src/image/mod.rs @@ -88,16 +88,13 @@ where *self.index(index) } /// Returns all pixel of the image - pub fn pixels(&self, index: usize) -> &Vec<(T, T, T, T)> { + pub fn pixels(&self) -> &Vec<(T, T, T, T)> { &self.pixels } /// Returns the path of the image pub fn path(&self) -> &PathBuf { &self.path } - pub fn pixels(&self) -> &Vec<(T, T, T, T)> { - &self.pixels - } /// Returns the iterator of the pixels vector pub fn iter(&self) -> Iter<'_, (T, T, T, T)> { diff --git a/src/image_loader/mod.rs b/src/image_loader/mod.rs index c3625bd..829d589 100644 --- a/src/image_loader/mod.rs +++ b/src/image_loader/mod.rs @@ -41,14 +41,9 @@ pub fn image_loader(path: &Path) -> Result, &'static str> { let color_type = reader.info().color_type; let width = reader.info().width; let height = reader.info().height; - let palette = &reader.info().palette; let idat = &buf[..info.buffer_size()]; - println!("idat {:?} idat ", idat); - println!("palette {:?} palette", palette); - println!("depth {:?} depth", bit_depth); - let pixel_vec = match color_type { png::ColorType::Grayscale => grayscale_to_rgba(idat, bit_depth), png::ColorType::GrayscaleAlpha => grayscale_alpha_to_rgba(idat, bit_depth), @@ -57,12 +52,7 @@ pub fn image_loader(path: &Path) -> Result, &'static str> { _ => panic!("Unsupported color type or bit depth"), }?; - let image: Image = Image::new( - width as usize, - height as usize, - pixel_vec, - path.to_path_buf(), - ); + let image: Image = Image::new(width, height, pixel_vec, path.to_path_buf()); Ok(image) } diff --git a/src/lib.rs b/src/lib.rs index b8f0d44..178f861 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,18 +4,3 @@ pub mod image; pub mod image_loader; pub mod multithreading; pub mod search_index; - -pub fn add(left: usize, right: usize) -> usize { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} diff --git a/src/search_index/mod.rs b/src/search_index/mod.rs index e5a08af..e9f0586 100644 --- a/src/search_index/mod.rs +++ b/src/search_index/mod.rs @@ -8,7 +8,7 @@ //! You also need a Vector of Feature generator functions that generates the feature of every image //! //! -//!``` +//!```rust ignore //! # use std::path::{Path, PathBuf}; //! # use imsearch::image::Image; //! # use imsearch::search_index; @@ -17,8 +17,8 @@ //! let path: Vec = Vec::new(); //! let features: Vec = Vec::new(); //! -//! let mut database = search_index::Database::new(&path, features ); -//! database.add_image(Path::new("testpath")); +//! let mut database = search_index::Database::new(&path, features).unwrap(); +//! database.add_image(Path::new("testpath")).unwrap(); //! ``` //! //! @@ -266,23 +266,34 @@ impl Database { ///This function search the Database after the Similarity to a given Image in a specific feature. /// It returns a Vector of all images and a f32 value which represents the Similarity in percent. /// - pub fn search(&self, imagepath: &Path, feature: FeatureGenerator) -> Vec<(PathBuf, f32)> { + pub fn search( + &self, + imagepath: &Path, + feature: FeatureGenerator, + ) -> Result, &'static str> { self.images.search(imagepath, feature) } ///the new function generates a new Database out of a vector of the Paths of the Images and a Vector of features - pub fn new(imagepaths: &Vec, features: Vec) -> Self { + pub fn new( + imagepaths: &Vec, + features: Vec, + ) -> Result { let mut threadpool = ThreadPool::new(); - Self { - images: IndexedImages::new(imagepaths, &features, &mut threadpool), + let images = match IndexedImages::new(imagepaths, &features, &mut threadpool) { + Ok(images) => images, + Err(e) => return Err(e), + }; + Ok(Self { + images, generators: features, threadpool, - } + }) } /// with add_image you can add images in a existing database. /// databases from a file are read only. - pub fn add_image(&mut self, path: &Path) { + pub fn add_image(&mut self, path: &Path) -> Result<(), &'static str> { if !self.generators.is_empty() { self.images .add_image(path, &self.generators, &mut self.threadpool) @@ -317,11 +328,14 @@ impl IndexedImages { imagepaths: &Vec, features: &[FeatureGenerator], threadpool: &mut ThreadPool>, (String, FeatureResult)>, - ) -> Self { + ) -> Result { let mut images_with_feats = HashMap::new(); for path in imagepaths { - let image: Arc> = Arc::new(Image::default()); //todo!("Image reader function") + let image = match crate::image_loader::image_loader(path) { + Ok(image) => Arc::new(image), + Err(desc) => return Err(desc), + }; let mut feats = HashMap::new(); for generator in features.iter() { @@ -336,16 +350,23 @@ impl IndexedImages { images_with_feats.insert(image.path().clone(), feats); } - Self { + Ok(Self { images: images_with_feats, - } + }) } ///This function search the Database after the Similarity to a given Image in a specific feature. /// It returns a Vector of all images and a f32 value which represents the Similarity in percent. /// - fn search(&self, imagepath: &Path, feature: FeatureGenerator) -> Vec<(PathBuf, f32)> { - let image: Arc> = Arc::new(Image::default()); //todo!("Image reader function") + fn search( + &self, + imagepath: &Path, + feature: FeatureGenerator, + ) -> Result, &'static str> { + let image = match crate::image_loader::image_loader(imagepath) { + Ok(image) => Arc::new(image), + Err(desc) => return Err(desc), + }; let search_feat = feature(image); let mut result: Vec<(PathBuf, f32)> = Vec::new(); @@ -356,7 +377,7 @@ impl IndexedImages { } } } - result + Ok(result) } ///this function lets you add images to the Indexed Image struct @@ -365,8 +386,11 @@ impl IndexedImages { path: &Path, generator: &Vec, threadpool: &mut ThreadPool>, (String, FeatureResult)>, - ) { - let image: Arc> = Arc::new(Image::default()); //todo!("Image reader function") + ) -> Result<(), &'static str> { + let image = match crate::image_loader::image_loader(path) { + Ok(image) => Arc::new(image), + Err(desc) => return Err(desc), + }; let mut feats = HashMap::new(); for gen in generator { @@ -378,13 +402,9 @@ impl IndexedImages { feats.insert(name, result); } self.images.insert(image.path().clone(), feats); - } -} -/// example feature implementation -#[allow(dead_code)] -fn average_luminance(image: Arc>) -> (String, FeatureResult) { - (String::from("average-brightness"), FeatureResult::F32(0.0)) + Ok(()) + } } #[cfg(test)]