diff --git a/.gitignore b/.gitignore index 98f2979..7cd589f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /target /Cargo.lock .DS_Store +.idea /.vscode \ No newline at end of file diff --git a/benches/multithreading.rs b/benches/multithreading.rs index 20ce75e..6e2eac9 100644 --- a/benches/multithreading.rs +++ b/benches/multithreading.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; -use imsearch::multithreading::ThreadPool; +use imsearch::multithreading::{Task, ThreadPool}; /// Amount of elements per vector used to calculate the dot product const VEC_ELEM_COUNT: usize = 1_000_000; @@ -37,7 +37,6 @@ fn dot(a: &[f64], b: &[f64]) -> f64 { /// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have /// finished the partial dot products will be summed to create the final result. fn dot_parallel(a: Arc>, b: Arc>, threads: usize) { - let mut pool = ThreadPool::with_limit(threads); // number of elements in each vector for each thread @@ -46,17 +45,17 @@ fn dot_parallel(a: Arc>, b: Arc>, threads: usize) { for i in 0..threads { // offset of the first element for the thread local vec let chunk = i * steps; - // create a new strong reference to the vector - let aa = a.clone(); - let bb = b.clone(); // launch a new thread - pool.enqueue(move || { - let a = &aa[chunk..(chunk + steps)]; - let b = &bb[chunk..(chunk + steps)]; - dot(a, b) - }); + pool.enqueue(Task::new( + (chunk, steps, a.clone(), b.clone()), + |(block, inc, a, b)| { + let a = &a[block..(block + inc)]; + let b = &b[block..(block + inc)]; + dot(a, b) + }, + )); } - + pool.join_all(); black_box(pool.get_results().iter().sum::()); @@ -116,15 +115,15 @@ fn pool_overusage(a: Arc>, b: Arc>, threads: usize) { for i in 0..threads { // offset of the first element for the thread local vec let chunk = i * steps; - // create a new strong reference to the vector - let aa = a.clone(); - let bb = b.clone(); // launch a new thread - pool.enqueue(move || { - let a = &aa[chunk..(chunk + steps)]; - let b = &bb[chunk..(chunk + steps)]; - dot(a, b) - }); + pool.enqueue(Task::new( + (chunk, steps, a.clone(), b.clone()), + |(block, inc, a, b)| { + let a = &a[block..(block + inc)]; + let b = &b[block..(block + inc)]; + dot(a, b) + }, + )); } pool.join_all(); diff --git a/src/image/mod.rs b/src/image/mod.rs index 9414d03..685524b 100644 --- a/src/image/mod.rs +++ b/src/image/mod.rs @@ -21,22 +21,25 @@ //! ``` //! //! ``` -//! # use imsearch::image::Image; +//! # use std::path::{Path, PathBuf}; +//! use imsearch::image::Image; //! let vec:Vec<(u8,u8,u8,u8)> = vec![(135,32,255,79),(1,79,255,1),(79,1,32,1), //! (255,1,135,32),(79,32,255,1),(1,135,135,1), //! (1,1,1,255),(1,79,135,79),(32,1,79,1)]; -//! let mut image:Image = Image::new(3,3,vec); +//! let mut image:Image = Image::new(3,3,vec, PathBuf::new()); //! ``` //! use std::ops::{Index, IndexMut}; +use std::path::PathBuf; +use std::slice::{Iter, IterMut}; use std::vec::IntoIter; #[allow(unused)] #[derive(Default)] pub struct Image where - T: Into + PartialEq + Default + Copy, + T: Into + PartialEq + Default + Copy + From + PartialOrd, { ///the width of the Picture in px width: usize, @@ -44,16 +47,18 @@ where height: usize, ///the raw RGBA data of the Picture where the RGBA values of an pixel is one tuple pixels: Vec<(T, T, T, T)>, + ///the absolute path where the picture is located + path: PathBuf, } #[allow(unused)] impl Image where - T: Into + PartialEq + Default + Copy, + T: Into + PartialEq + Default + Copy + From + PartialOrd, { ///gives an Image with specified values if the Vec matches the width times the height of the Image /// if the width and height dont make sense for the Image then will this function panic. - pub fn new(width: usize, height: usize, pixels: Vec<(T, T, T, T)>) -> Self { + pub fn new(width: usize, height: usize, pixels: Vec<(T, T, T, T)>, path: PathBuf) -> Self { if width * height != pixels.len() { panic!("The Image does not have the same number of pixel as width and height implies") } else { @@ -61,27 +66,66 @@ where width, height, pixels, + path, } } } - /// Gives back the width of the image + /// Returns the width of the image pub fn width(&self) -> usize { self.width } - /// Gives back the height of the image + /// Returns the height of the image pub fn height(&self) -> usize { self.height } - /// Gives back a specified pixel of the image + /// Returns a specified pixel of the image pub fn pixel(&self, index: usize) -> (T, T, T, T) { *self.index(index) } + /// Returns the path of the image + pub fn path(&self) -> &PathBuf { + &self.path + } + + /// Returns the iterator of the pixels vector + pub fn iter(&self) -> Iter<'_, (T, T, T, T)> { + self.pixels.iter() + } + /// Returns the mutable iterator of the pixels vector + pub fn iter_mut(&mut self) -> IterMut<'_, (T, T, T, T)> { + self.pixels.iter_mut() + } + /// validates if every pixel of the Picture is between 0 and 255 using clamp(). + /// if not then the Value gets changed and the result returns true. + pub fn validate(&mut self) -> bool { + let mut result = false; + + for pixel in self.iter_mut() { + Image::clamp(&mut pixel.0, 255.into(), 0.into(), &mut result); + Image::clamp(&mut pixel.1, 255.into(), 0.into(), &mut result); + Image::clamp(&mut pixel.2, 255.into(), 0.into(), &mut result); + Image::clamp(&mut pixel.3, 255.into(), 0.into(), &mut result); + } + result + } + + /// validates if the given Value is between given min and max and changes it to min/ max if not. + ///result will be true if something is changed + fn clamp(pixel_color: &mut T, max: T, min: T, result: &mut bool) { + if *pixel_color > max { + *pixel_color = max; + *result = true; + } else if *pixel_color < min { + *pixel_color = min; + *result = true; + } + } } impl Index for Image where - T: Into + PartialEq + Default + Copy, + T: Into + PartialEq + Default + Copy + From + PartialOrd, { type Output = (T, T, T, T); fn index(&self, index: usize) -> &Self::Output { @@ -91,7 +135,7 @@ where impl IndexMut for Image where - T: Into + PartialEq + Default + Copy, + T: Into + PartialEq + Default + Copy + From + PartialOrd, { fn index_mut(&mut self, index: usize) -> &mut Self::Output { &mut self.pixels[index] @@ -100,7 +144,7 @@ where impl IntoIterator for Image where - T: Into + PartialEq + Default + Copy, + T: Into + PartialEq + Default + Copy + From + PartialOrd, { type Item = (T, T, T, T); type IntoIter = IntoIter; @@ -112,7 +156,6 @@ where #[cfg(test)] mod tests { - use super::*; #[test] @@ -135,7 +178,7 @@ mod tests { (1, 79, 135, 79), (32, 1, 79, 1), ]; - let mut image: Image = Image::new(3, 3, vec); + let mut image: Image = Image::new(3, 3, vec, PathBuf::new()); assert_eq!(*image.index(4), (79, 32, 255, 1)); let result = std::panic::catch_unwind(|| { @@ -162,5 +205,15 @@ mod tests { (32, 1, 79, 1) ] ); + + let vec: Vec<(f32, f32, f32, f32)> = vec![ + (-33.0, 7732.0, 2564355.0, -79.0), + (1.0, 79.0, 255.0, 1.05), + (300.0, 300.0, 300.0, 300.0), + ]; + let mut image: Image = Image::new(1, 3, vec, PathBuf::new()); + + assert!(image.validate()); + println!("{:?}", image.pixel(0)); } } diff --git a/src/lib.rs b/src/lib.rs index 35a42fb..2b26d05 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ - extern crate core; pub mod image; diff --git a/src/multithreading/mod.rs b/src/multithreading/mod.rs index d20347a..f099cf5 100644 --- a/src/multithreading/mod.rs +++ b/src/multithreading/mod.rs @@ -8,10 +8,12 @@ //! # Example //! ```rust //! # use imsearch::multithreading::ThreadPool; +//! # use imsearch::multithreading::Task; //! let mut pool = ThreadPool::with_limit(2); //! //! for i in 0..10 { -//! pool.enqueue(move || i); +//! pool.enqueue(Task::new(i, |i| i)); +//! // ^^^^^^ closure or static function //! } //! //! pool.join_all(); @@ -19,6 +21,7 @@ //! ``` use std::{ + any::Any, collections::VecDeque, num::NonZeroUsize, sync::{ @@ -46,19 +49,31 @@ pub enum Priority { Low, } -/// Jobs are functions which are executed by the thread pool. They can be stalled when no threads are -/// free to execute them directly. They are meant to be executed only once and be done. -pub trait Job: Send + 'static + FnOnce() -> T +/// Traits a return value has to implement when being given back by a function or closure. +pub trait Sendable: Any + Send + 'static + std::panic::UnwindSafe {} + +impl Sendable for T where T: Any + Send + 'static + std::panic::UnwindSafe {} + +/// A task that will be executed at some point in the future by the thread pool +/// At the heart of this struct is the function to be executed. This may be a closure. +#[derive(Debug, Copy, Clone)] +pub struct Task where - T: Send, + I: Sendable, + T: Sendable, { + job: fn(I) -> T, + param: I, } -impl Job for U +impl Task where - U: Send + 'static + FnOnce() -> T, - T: Send + 'static, + I: Sendable, + T: Sendable, { + pub fn new(param: I, job: fn(I) -> T) -> Self { + Self { job, param } + } } /// Thread pool which can be used to execute functions or closures in parallel. @@ -71,23 +86,33 @@ where /// # Example /// ```rust /// # use imsearch::multithreading::ThreadPool; +/// # use imsearch::multithreading::Task; /// let mut pool = ThreadPool::with_limit(2); /// /// for i in 0..10 { -/// pool.enqueue(move || i); +/// pool.enqueue(Task::new(i, |i| i)); /// } /// /// pool.join_all(); /// assert_eq!(pool.get_results().iter().sum::(), 45); /// ``` +/// # Drop +/// This struct implements the `Drop` trait. Upon being dropped the pool will wait for all threads +/// to finsish. This may take up an arbitrary amount of time. +/// # Panics in the thread +/// When a function or closure panics, the executing thread will detect the unwind performed by `panic` causing the +/// thread to print a message on stderr. The thread itself captures panics and won't terminate execution but continue with +/// the next task in the queue. +/// Its not recommend to use this pool with custom panic hooks or special functions which abort the process. +/// Also panicking code from external program written in C++ or others in undefinied behavior according to [`std::panic::catch_unwind`] #[derive(Debug)] -pub struct ThreadPool +pub struct ThreadPool where - T: Send, - F: Job, + I: Sendable, + T: Sendable, { /// queue for storing the jobs to be executed - queue: Arc>>, + queue: Arc>>>, /// handles for all threads currently running and processing jobs handles: Vec>, /// reciver end for channel based communication between threads @@ -98,10 +123,10 @@ where limit: NonZeroUsize, } -impl Default for ThreadPool +impl Default for ThreadPool where - T: Send + 'static, - F: Job, + I: Sendable, + T: Sendable, { fn default() -> Self { let (sender, receiver) = channel::(); @@ -121,10 +146,10 @@ where } } -impl ThreadPool +impl ThreadPool where - T: Send + 'static, - F: Job, + I: Sendable, + T: Sendable, { /// Creates a new thread pool with default thread count determined by either /// [`std::thread::available_parallelism`] or [`DEFAULT_THREAD_POOL_SIZE`] in case it fails. @@ -139,9 +164,13 @@ where /// # Panic /// This function will fails if `max_threads` is zero. pub fn with_limit(max_threads: usize) -> Self { + let (sender, receiver) = channel::(); Self { limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"), - ..Default::default() + queue: Arc::new(Mutex::new(VecDeque::new())), + handles: Vec::new(), + sender, + receiver, } } @@ -151,7 +180,7 @@ where /// This function will create a new thread if the maximum number of threads in not reached. /// In case the maximum number of threads is already used, the job is stalled and will get executed /// when a thread is ready and its at the start of the queue. - pub fn enqueue_priorize(&mut self, func: F, priority: Priority) { + pub fn enqueue_priorize(&mut self, func: Task, priority: Priority) { // put job into queue let mut queue = self.queue.lock().unwrap(); @@ -169,8 +198,14 @@ where let queue = self.queue.clone(); self.handles.push(thread::spawn(move || { - while let Some(job) = queue.lock().unwrap().pop_front() { - tx.send(job()).expect("cannot send result"); + while let Some(task) = queue.lock().unwrap().pop_front() { + // basically try catch + if let Err(e) = std::panic::catch_unwind(|| { + tx.send((task.job)(task.param)) + .expect("unable to send result over channel"); + }) { + eprintln!("thread paniced: {:?}", e); + } } })); } @@ -183,7 +218,7 @@ where /// This function will create a new thread if the maximum number of threads in not reached. /// In case the maximum number of threads is already used, the job is stalled and will get executed /// when a thread is ready and its at the start of the queue. - pub fn enqueue(&mut self, func: F) { + pub fn enqueue(&mut self, func: Task) { self.enqueue_priorize(func, Priority::Low); } @@ -196,15 +231,15 @@ where } } - /// Returns all results that have been returned by the threads until now + /// Sendables all results that have been Sendableed by the threads until now /// and haven't been consumed yet. - /// All results retrieved from this call won't be returned on a second call. + /// All results retrieved from this call won't be Sendableed on a second call. /// This function is non blocking. pub fn try_get_results(&mut self) -> Vec { self.receiver.try_iter().collect() } - /// Returns all results that have been returned by the threads until now + /// Sendables all results that have been returned by the threads until now /// and haven't been consumed yet. The function will also wait for all threads to finish executing (empty the queue). /// All results retrieved from this call won't be returned on a second call. /// This function will block the caller thread. @@ -214,8 +249,20 @@ where } } +impl Drop for ThreadPool +where + I: Sendable, + T: Sendable, +{ + fn drop(&mut self) { + self.join_all(); + } +} + #[cfg(test)] mod test { + use std::panic::UnwindSafe; + use super::*; #[test] @@ -223,7 +270,7 @@ mod test { let mut pool = ThreadPool::default(); for i in 0..10 { - pool.enqueue_priorize(move || i, Priority::High); + pool.enqueue_priorize(Task::new(i, |i| i), Priority::High); } pool.join_all(); @@ -236,7 +283,7 @@ mod test { let mut pool = ThreadPool::with_limit(2); for i in 0..10 { - pool.enqueue(move || i); + pool.enqueue(Task::new(i, |i| i)); } assert_eq!(pool.handles.len(), 2); @@ -247,19 +294,62 @@ mod test { assert_eq!(pool.get_results().iter().sum::(), 45); } + trait Object: Send + UnwindSafe { + fn get(&mut self) -> i32; + } + + #[derive(Default)] + struct Test1 { + _int: i32, + } + + impl Object for Test1 { + fn get(&mut self) -> i32 { + 0 + } + } + + #[derive(Default)] + struct Test2 { + _c: char, + } + + impl Object for Test2 { + fn get(&mut self) -> i32 { + 0 + } + } + + #[derive(Default)] + struct Test3 { + _s: String, + } + + impl Object for Test3 { + fn get(&mut self) -> i32 { + 0 + } + } + #[test] - fn test_multiple() { + fn test_1() { let mut pool = ThreadPool::with_limit(2); - for i in 0..10 { - pool.enqueue(move || i); - } + let feats: Vec> = vec![ + Box::new(Test1::default()), + Box::new(Test2::default()), + Box::new(Test3::default()), + ]; - assert_eq!(pool.handles.len(), 2); - assert_eq!(pool.limit.get(), 2); + for feat in feats { + pool.enqueue(Task::new(feat, |mut i| { + let _ = i.get(); + i + })); + } pool.join_all(); - assert_eq!(pool.get_results().iter().sum::(), 45); + let _feats: Vec> = pool.get_results(); } }