commit
43ab902387
|
@ -1,4 +1,5 @@
|
|||
/target
|
||||
/Cargo.lock
|
||||
.DS_Store
|
||||
.idea
|
||||
/.vscode
|
|
@ -7,7 +7,7 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
|
||||
use imsearch::multithreading::ThreadPool;
|
||||
use imsearch::multithreading::{Task, ThreadPool};
|
||||
|
||||
/// Amount of elements per vector used to calculate the dot product
|
||||
const VEC_ELEM_COUNT: usize = 1_000_000;
|
||||
|
@ -37,7 +37,6 @@ fn dot(a: &[f64], b: &[f64]) -> f64 {
|
|||
/// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have
|
||||
/// finished the partial dot products will be summed to create the final result.
|
||||
fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
||||
|
||||
let mut pool = ThreadPool::with_limit(threads);
|
||||
|
||||
// number of elements in each vector for each thread
|
||||
|
@ -46,15 +45,15 @@ fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
|||
for i in 0..threads {
|
||||
// offset of the first element for the thread local vec
|
||||
let chunk = i * steps;
|
||||
// create a new strong reference to the vector
|
||||
let aa = a.clone();
|
||||
let bb = b.clone();
|
||||
// launch a new thread
|
||||
pool.enqueue(move || {
|
||||
let a = &aa[chunk..(chunk + steps)];
|
||||
let b = &bb[chunk..(chunk + steps)];
|
||||
dot(a, b)
|
||||
});
|
||||
pool.enqueue(Task::new(
|
||||
(chunk, steps, a.clone(), b.clone()),
|
||||
|(block, inc, a, b)| {
|
||||
let a = &a[block..(block + inc)];
|
||||
let b = &b[block..(block + inc)];
|
||||
dot(a, b)
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
pool.join_all();
|
||||
|
@ -116,15 +115,15 @@ fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
|||
for i in 0..threads {
|
||||
// offset of the first element for the thread local vec
|
||||
let chunk = i * steps;
|
||||
// create a new strong reference to the vector
|
||||
let aa = a.clone();
|
||||
let bb = b.clone();
|
||||
// launch a new thread
|
||||
pool.enqueue(move || {
|
||||
let a = &aa[chunk..(chunk + steps)];
|
||||
let b = &bb[chunk..(chunk + steps)];
|
||||
dot(a, b)
|
||||
});
|
||||
pool.enqueue(Task::new(
|
||||
(chunk, steps, a.clone(), b.clone()),
|
||||
|(block, inc, a, b)| {
|
||||
let a = &a[block..(block + inc)];
|
||||
let b = &b[block..(block + inc)];
|
||||
dot(a, b)
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
pool.join_all();
|
||||
|
|
|
@ -21,22 +21,25 @@
|
|||
//! ```
|
||||
//!
|
||||
//! ```
|
||||
//! # use imsearch::image::Image;
|
||||
//! # use std::path::{Path, PathBuf};
|
||||
//! use imsearch::image::Image;
|
||||
//! let vec:Vec<(u8,u8,u8,u8)> = vec![(135,32,255,79),(1,79,255,1),(79,1,32,1),
|
||||
//! (255,1,135,32),(79,32,255,1),(1,135,135,1),
|
||||
//! (1,1,1,255),(1,79,135,79),(32,1,79,1)];
|
||||
//! let mut image:Image<u8> = Image::new(3,3,vec);
|
||||
//! let mut image:Image<u8> = Image::new(3,3,vec, PathBuf::new());
|
||||
//! ```
|
||||
//!
|
||||
|
||||
use std::ops::{Index, IndexMut};
|
||||
use std::path::PathBuf;
|
||||
use std::slice::{Iter, IterMut};
|
||||
use std::vec::IntoIter;
|
||||
|
||||
#[allow(unused)]
|
||||
#[derive(Default)]
|
||||
pub struct Image<T>
|
||||
where
|
||||
T: Into<f32> + PartialEq + Default + Copy,
|
||||
T: Into<f32> + PartialEq + Default + Copy + From<u8> + PartialOrd,
|
||||
{
|
||||
///the width of the Picture in px
|
||||
width: usize,
|
||||
|
@ -44,16 +47,18 @@ where
|
|||
height: usize,
|
||||
///the raw RGBA data of the Picture where the RGBA values of an pixel is one tuple
|
||||
pixels: Vec<(T, T, T, T)>,
|
||||
///the absolute path where the picture is located
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
impl<T> Image<T>
|
||||
where
|
||||
T: Into<f32> + PartialEq + Default + Copy,
|
||||
T: Into<f32> + PartialEq + Default + Copy + From<u8> + PartialOrd,
|
||||
{
|
||||
///gives an Image with specified values if the Vec matches the width times the height of the Image
|
||||
/// if the width and height dont make sense for the Image then will this function panic.
|
||||
pub fn new(width: usize, height: usize, pixels: Vec<(T, T, T, T)>) -> Self {
|
||||
pub fn new(width: usize, height: usize, pixels: Vec<(T, T, T, T)>, path: PathBuf) -> Self {
|
||||
if width * height != pixels.len() {
|
||||
panic!("The Image does not have the same number of pixel as width and height implies")
|
||||
} else {
|
||||
|
@ -61,27 +66,66 @@ where
|
|||
width,
|
||||
height,
|
||||
pixels,
|
||||
path,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Gives back the width of the image
|
||||
/// Returns the width of the image
|
||||
pub fn width(&self) -> usize {
|
||||
self.width
|
||||
}
|
||||
/// Gives back the height of the image
|
||||
/// Returns the height of the image
|
||||
pub fn height(&self) -> usize {
|
||||
self.height
|
||||
}
|
||||
/// Gives back a specified pixel of the image
|
||||
/// Returns a specified pixel of the image
|
||||
pub fn pixel(&self, index: usize) -> (T, T, T, T) {
|
||||
*self.index(index)
|
||||
}
|
||||
/// Returns the path of the image
|
||||
pub fn path(&self) -> &PathBuf {
|
||||
&self.path
|
||||
}
|
||||
|
||||
/// Returns the iterator of the pixels vector
|
||||
pub fn iter(&self) -> Iter<'_, (T, T, T, T)> {
|
||||
self.pixels.iter()
|
||||
}
|
||||
/// Returns the mutable iterator of the pixels vector
|
||||
pub fn iter_mut(&mut self) -> IterMut<'_, (T, T, T, T)> {
|
||||
self.pixels.iter_mut()
|
||||
}
|
||||
/// validates if every pixel of the Picture is between 0 and 255 using clamp().
|
||||
/// if not then the Value gets changed and the result returns true.
|
||||
pub fn validate(&mut self) -> bool {
|
||||
let mut result = false;
|
||||
|
||||
for pixel in self.iter_mut() {
|
||||
Image::clamp(&mut pixel.0, 255.into(), 0.into(), &mut result);
|
||||
Image::clamp(&mut pixel.1, 255.into(), 0.into(), &mut result);
|
||||
Image::clamp(&mut pixel.2, 255.into(), 0.into(), &mut result);
|
||||
Image::clamp(&mut pixel.3, 255.into(), 0.into(), &mut result);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// validates if the given Value is between given min and max and changes it to min/ max if not.
|
||||
///result will be true if something is changed
|
||||
fn clamp(pixel_color: &mut T, max: T, min: T, result: &mut bool) {
|
||||
if *pixel_color > max {
|
||||
*pixel_color = max;
|
||||
*result = true;
|
||||
} else if *pixel_color < min {
|
||||
*pixel_color = min;
|
||||
*result = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Index<usize> for Image<T>
|
||||
where
|
||||
T: Into<f32> + PartialEq + Default + Copy,
|
||||
T: Into<f32> + PartialEq + Default + Copy + From<u8> + PartialOrd,
|
||||
{
|
||||
type Output = (T, T, T, T);
|
||||
fn index(&self, index: usize) -> &Self::Output {
|
||||
|
@ -91,7 +135,7 @@ where
|
|||
|
||||
impl<T> IndexMut<usize> for Image<T>
|
||||
where
|
||||
T: Into<f32> + PartialEq + Default + Copy,
|
||||
T: Into<f32> + PartialEq + Default + Copy + From<u8> + PartialOrd,
|
||||
{
|
||||
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
|
||||
&mut self.pixels[index]
|
||||
|
@ -100,7 +144,7 @@ where
|
|||
|
||||
impl<T> IntoIterator for Image<T>
|
||||
where
|
||||
T: Into<f32> + PartialEq + Default + Copy,
|
||||
T: Into<f32> + PartialEq + Default + Copy + From<u8> + PartialOrd,
|
||||
{
|
||||
type Item = (T, T, T, T);
|
||||
type IntoIter = IntoIter<Self::Item>;
|
||||
|
@ -112,7 +156,6 @@ where
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
|
@ -135,7 +178,7 @@ mod tests {
|
|||
(1, 79, 135, 79),
|
||||
(32, 1, 79, 1),
|
||||
];
|
||||
let mut image: Image<u8> = Image::new(3, 3, vec);
|
||||
let mut image: Image<u8> = Image::new(3, 3, vec, PathBuf::new());
|
||||
assert_eq!(*image.index(4), (79, 32, 255, 1));
|
||||
|
||||
let result = std::panic::catch_unwind(|| {
|
||||
|
@ -162,5 +205,15 @@ mod tests {
|
|||
(32, 1, 79, 1)
|
||||
]
|
||||
);
|
||||
|
||||
let vec: Vec<(f32, f32, f32, f32)> = vec![
|
||||
(-33.0, 7732.0, 2564355.0, -79.0),
|
||||
(1.0, 79.0, 255.0, 1.05),
|
||||
(300.0, 300.0, 300.0, 300.0),
|
||||
];
|
||||
let mut image: Image<f32> = Image::new(1, 3, vec, PathBuf::new());
|
||||
|
||||
assert!(image.validate());
|
||||
println!("{:?}", image.pixel(0));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
|
||||
extern crate core;
|
||||
|
||||
pub mod image;
|
||||
|
|
|
@ -8,10 +8,12 @@
|
|||
//! # Example
|
||||
//! ```rust
|
||||
//! # use imsearch::multithreading::ThreadPool;
|
||||
//! # use imsearch::multithreading::Task;
|
||||
//! let mut pool = ThreadPool::with_limit(2);
|
||||
//!
|
||||
//! for i in 0..10 {
|
||||
//! pool.enqueue(move || i);
|
||||
//! pool.enqueue(Task::new(i, |i| i));
|
||||
//! // ^^^^^^ closure or static function
|
||||
//! }
|
||||
//!
|
||||
//! pool.join_all();
|
||||
|
@ -19,6 +21,7 @@
|
|||
//! ```
|
||||
|
||||
use std::{
|
||||
any::Any,
|
||||
collections::VecDeque,
|
||||
num::NonZeroUsize,
|
||||
sync::{
|
||||
|
@ -46,19 +49,31 @@ pub enum Priority {
|
|||
Low,
|
||||
}
|
||||
|
||||
/// Jobs are functions which are executed by the thread pool. They can be stalled when no threads are
|
||||
/// free to execute them directly. They are meant to be executed only once and be done.
|
||||
pub trait Job<T>: Send + 'static + FnOnce() -> T
|
||||
/// Traits a return value has to implement when being given back by a function or closure.
|
||||
pub trait Sendable: Any + Send + 'static + std::panic::UnwindSafe {}
|
||||
|
||||
impl<T> Sendable for T where T: Any + Send + 'static + std::panic::UnwindSafe {}
|
||||
|
||||
/// A task that will be executed at some point in the future by the thread pool
|
||||
/// At the heart of this struct is the function to be executed. This may be a closure.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct Task<I, T>
|
||||
where
|
||||
T: Send,
|
||||
I: Sendable,
|
||||
T: Sendable,
|
||||
{
|
||||
job: fn(I) -> T,
|
||||
param: I,
|
||||
}
|
||||
|
||||
impl<U, T> Job<T> for U
|
||||
impl<I, T> Task<I, T>
|
||||
where
|
||||
U: Send + 'static + FnOnce() -> T,
|
||||
T: Send + 'static,
|
||||
I: Sendable,
|
||||
T: Sendable,
|
||||
{
|
||||
pub fn new(param: I, job: fn(I) -> T) -> Self {
|
||||
Self { job, param }
|
||||
}
|
||||
}
|
||||
|
||||
/// Thread pool which can be used to execute functions or closures in parallel.
|
||||
|
@ -71,23 +86,33 @@ where
|
|||
/// # Example
|
||||
/// ```rust
|
||||
/// # use imsearch::multithreading::ThreadPool;
|
||||
/// # use imsearch::multithreading::Task;
|
||||
/// let mut pool = ThreadPool::with_limit(2);
|
||||
///
|
||||
/// for i in 0..10 {
|
||||
/// pool.enqueue(move || i);
|
||||
/// pool.enqueue(Task::new(i, |i| i));
|
||||
/// }
|
||||
///
|
||||
/// pool.join_all();
|
||||
/// assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
||||
/// ```
|
||||
/// # Drop
|
||||
/// This struct implements the `Drop` trait. Upon being dropped the pool will wait for all threads
|
||||
/// to finsish. This may take up an arbitrary amount of time.
|
||||
/// # Panics in the thread
|
||||
/// When a function or closure panics, the executing thread will detect the unwind performed by `panic` causing the
|
||||
/// thread to print a message on stderr. The thread itself captures panics and won't terminate execution but continue with
|
||||
/// the next task in the queue.
|
||||
/// Its not recommend to use this pool with custom panic hooks or special functions which abort the process.
|
||||
/// Also panicking code from external program written in C++ or others in undefinied behavior according to [`std::panic::catch_unwind`]
|
||||
#[derive(Debug)]
|
||||
pub struct ThreadPool<T, F>
|
||||
pub struct ThreadPool<I, T>
|
||||
where
|
||||
T: Send,
|
||||
F: Job<T>,
|
||||
I: Sendable,
|
||||
T: Sendable,
|
||||
{
|
||||
/// queue for storing the jobs to be executed
|
||||
queue: Arc<Mutex<VecDeque<F>>>,
|
||||
queue: Arc<Mutex<VecDeque<Task<I, T>>>>,
|
||||
/// handles for all threads currently running and processing jobs
|
||||
handles: Vec<JoinHandle<()>>,
|
||||
/// reciver end for channel based communication between threads
|
||||
|
@ -98,10 +123,10 @@ where
|
|||
limit: NonZeroUsize,
|
||||
}
|
||||
|
||||
impl<T, F> Default for ThreadPool<T, F>
|
||||
impl<I, T> Default for ThreadPool<I, T>
|
||||
where
|
||||
T: Send + 'static,
|
||||
F: Job<T>,
|
||||
I: Sendable,
|
||||
T: Sendable,
|
||||
{
|
||||
fn default() -> Self {
|
||||
let (sender, receiver) = channel::<T>();
|
||||
|
@ -121,10 +146,10 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<T, F> ThreadPool<T, F>
|
||||
impl<I, T> ThreadPool<I, T>
|
||||
where
|
||||
T: Send + 'static,
|
||||
F: Job<T>,
|
||||
I: Sendable,
|
||||
T: Sendable,
|
||||
{
|
||||
/// Creates a new thread pool with default thread count determined by either
|
||||
/// [`std::thread::available_parallelism`] or [`DEFAULT_THREAD_POOL_SIZE`] in case it fails.
|
||||
|
@ -139,9 +164,13 @@ where
|
|||
/// # Panic
|
||||
/// This function will fails if `max_threads` is zero.
|
||||
pub fn with_limit(max_threads: usize) -> Self {
|
||||
let (sender, receiver) = channel::<T>();
|
||||
Self {
|
||||
limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"),
|
||||
..Default::default()
|
||||
queue: Arc::new(Mutex::new(VecDeque::new())),
|
||||
handles: Vec::new(),
|
||||
sender,
|
||||
receiver,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -151,7 +180,7 @@ where
|
|||
/// This function will create a new thread if the maximum number of threads in not reached.
|
||||
/// In case the maximum number of threads is already used, the job is stalled and will get executed
|
||||
/// when a thread is ready and its at the start of the queue.
|
||||
pub fn enqueue_priorize(&mut self, func: F, priority: Priority) {
|
||||
pub fn enqueue_priorize(&mut self, func: Task<I, T>, priority: Priority) {
|
||||
// put job into queue
|
||||
let mut queue = self.queue.lock().unwrap();
|
||||
|
||||
|
@ -169,8 +198,14 @@ where
|
|||
let queue = self.queue.clone();
|
||||
|
||||
self.handles.push(thread::spawn(move || {
|
||||
while let Some(job) = queue.lock().unwrap().pop_front() {
|
||||
tx.send(job()).expect("cannot send result");
|
||||
while let Some(task) = queue.lock().unwrap().pop_front() {
|
||||
// basically try catch
|
||||
if let Err(e) = std::panic::catch_unwind(|| {
|
||||
tx.send((task.job)(task.param))
|
||||
.expect("unable to send result over channel");
|
||||
}) {
|
||||
eprintln!("thread paniced: {:?}", e);
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
@ -183,7 +218,7 @@ where
|
|||
/// This function will create a new thread if the maximum number of threads in not reached.
|
||||
/// In case the maximum number of threads is already used, the job is stalled and will get executed
|
||||
/// when a thread is ready and its at the start of the queue.
|
||||
pub fn enqueue(&mut self, func: F) {
|
||||
pub fn enqueue(&mut self, func: Task<I, T>) {
|
||||
self.enqueue_priorize(func, Priority::Low);
|
||||
}
|
||||
|
||||
|
@ -196,15 +231,15 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns all results that have been returned by the threads until now
|
||||
/// Sendables all results that have been Sendableed by the threads until now
|
||||
/// and haven't been consumed yet.
|
||||
/// All results retrieved from this call won't be returned on a second call.
|
||||
/// All results retrieved from this call won't be Sendableed on a second call.
|
||||
/// This function is non blocking.
|
||||
pub fn try_get_results(&mut self) -> Vec<T> {
|
||||
self.receiver.try_iter().collect()
|
||||
}
|
||||
|
||||
/// Returns all results that have been returned by the threads until now
|
||||
/// Sendables all results that have been returned by the threads until now
|
||||
/// and haven't been consumed yet. The function will also wait for all threads to finish executing (empty the queue).
|
||||
/// All results retrieved from this call won't be returned on a second call.
|
||||
/// This function will block the caller thread.
|
||||
|
@ -214,8 +249,20 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<I, T> Drop for ThreadPool<I, T>
|
||||
where
|
||||
I: Sendable,
|
||||
T: Sendable,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
self.join_all();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::panic::UnwindSafe;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
|
@ -223,7 +270,7 @@ mod test {
|
|||
let mut pool = ThreadPool::default();
|
||||
|
||||
for i in 0..10 {
|
||||
pool.enqueue_priorize(move || i, Priority::High);
|
||||
pool.enqueue_priorize(Task::new(i, |i| i), Priority::High);
|
||||
}
|
||||
|
||||
pool.join_all();
|
||||
|
@ -236,7 +283,7 @@ mod test {
|
|||
let mut pool = ThreadPool::with_limit(2);
|
||||
|
||||
for i in 0..10 {
|
||||
pool.enqueue(move || i);
|
||||
pool.enqueue(Task::new(i, |i| i));
|
||||
}
|
||||
|
||||
assert_eq!(pool.handles.len(), 2);
|
||||
|
@ -247,19 +294,62 @@ mod test {
|
|||
assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
||||
}
|
||||
|
||||
trait Object: Send + UnwindSafe {
|
||||
fn get(&mut self) -> i32;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Test1 {
|
||||
_int: i32,
|
||||
}
|
||||
|
||||
impl Object for Test1 {
|
||||
fn get(&mut self) -> i32 {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Test2 {
|
||||
_c: char,
|
||||
}
|
||||
|
||||
impl Object for Test2 {
|
||||
fn get(&mut self) -> i32 {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Test3 {
|
||||
_s: String,
|
||||
}
|
||||
|
||||
impl Object for Test3 {
|
||||
fn get(&mut self) -> i32 {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multiple() {
|
||||
fn test_1() {
|
||||
let mut pool = ThreadPool::with_limit(2);
|
||||
|
||||
for i in 0..10 {
|
||||
pool.enqueue(move || i);
|
||||
}
|
||||
let feats: Vec<Box<dyn Object>> = vec![
|
||||
Box::new(Test1::default()),
|
||||
Box::new(Test2::default()),
|
||||
Box::new(Test3::default()),
|
||||
];
|
||||
|
||||
assert_eq!(pool.handles.len(), 2);
|
||||
assert_eq!(pool.limit.get(), 2);
|
||||
for feat in feats {
|
||||
pool.enqueue(Task::new(feat, |mut i| {
|
||||
let _ = i.get();
|
||||
i
|
||||
}));
|
||||
}
|
||||
|
||||
pool.join_all();
|
||||
|
||||
assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
||||
let _feats: Vec<Box<dyn Object>> = pool.get_results();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue