diff --git a/benches/multithreading.rs b/benches/multithreading.rs
index 6e2eac9..0591bab 100644
--- a/benches/multithreading.rs
+++ b/benches/multithreading.rs
@@ -1,173 +1,350 @@
-//! Benachmarking funcitonality for [Criterion.rs](https://github.com/bheisler/criterion.rs)
-//! This benchmark will compare the performance of various thread pools launched with different amounts of
-//! maximum threads.
-//! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit
-//! double precision floating point values.
+//! This module provides the functionality to create a thread pool that executes tasks in parallel.
+//! The maximum number of threads to be used can be regulated with `ThreadPool::with_limit`.
+//! This implementation aims for low runtime overhead with minimal blocking synchronisation.
+//! Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
+//! a new thread will be launched until the maximum number is reached. From then on every launched thread will
+//! be reused to process the remaining elements of the queue. If no jobs are left to be executed,
+//! all threads will finish and die. This means that as long as nothing is enqueued, no threads idle in the background.
+//! # Example
+//! ```rust
+//! # use imsearch::multithreading::ThreadPool;
+//! # use imsearch::multithreading::Task;
+//! let mut pool = ThreadPool::with_limit(2);
+//!
+//! for i in 0..10 {
+//!     pool.enqueue(Task::new(i, |i| i));
+//!     //                        ^^^^^^ closure or static function
+//! }
+//!
+//! pool.join_all();
+//! assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
+//! ```
-use std::sync::Arc;
+use std::{
+    any::Any,
+    collections::VecDeque,
+    num::NonZeroUsize,
+    sync::{
+        mpsc::{channel, Receiver, Sender},
+        Arc, Mutex,
+    },
+    thread::{self, JoinHandle},
+};
-use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
-use imsearch::multithreading::{Task, ThreadPool};
+/// Default number of threads to be used in case [`std::thread::available_parallelism`] fails.
+pub const DEFAULT_THREAD_POOL_SIZE: usize = 1;
-/// Amount of elements per vector used to calculate the dot product
-const VEC_ELEM_COUNT: usize = 1_000_000;
-/// Number of threads to test
-const THREAD_COUNTS: [usize; 17] = [
-    1, 2, 4, 6, 8, 10, 12, 16, 18, 20, 22, 26, 28, 32, 40, 56, 64,
-];
-/// seeds used to scramble up the values produced by the hash function for each vector
-/// these are just some pseudo random numbers
-const VEC_SEEDS: [u64; 2] = [0xa3f8347abce16, 0xa273048ca9dea];
+/// Indicates the priority level of functions or closures which get supplied to the pool.
+/// Use [`Priority::High`] to ensure the closure is executed before all closures that were already supplied.
+/// Use [`Priority::Low`] to ensure the closure is executed after all closures that were already supplied.
+#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
+pub enum Priority {
+    /// Indicate that the closure or function supplied to the thread pool
+    /// has higher priority than any other given to the pool until now.
+    /// The item will get enqueued at the start of the waiting-queue.
+    High,
+    /// Indicate that the closure or function supplied to the thread pool
+    /// has lower priority than the already supplied ones in this pool.
+    /// The item will get enqueued at the end of the waiting-queue.
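+    /// # Example
+    /// `ThreadPool::enqueue` uses this level by default, so the two calls below are
+    /// equivalent (a minimal sketch of the queueing behaviour, using only types from this module):
+    /// ```rust
+    /// # use imsearch::multithreading::{Priority, Task, ThreadPool};
+    /// let mut pool = ThreadPool::with_limit(1);
+    /// pool.enqueue(Task::new(1, |x| x));
+    /// pool.enqueue_priorize(Task::new(2, |x| x), Priority::Low);
+    /// pool.join_all();
+    /// assert_eq!(pool.get_results().iter().sum::<i32>(), 3);
+    /// ```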
+    Low,
+}
-/// Compute the dot product of two vectors
-/// # Panics
-/// this function assumes both vectors to be of exactly the same length.
-/// If this is not the case the function will panic.
-fn dot(a: &[f64], b: &[f64]) -> f64 {
-    let mut sum = 0.0;
+/// Trait that a return value has to implement when being returned by a function or closure.
+pub trait Sendable: Any + Send + 'static {}
-    for i in 0..a.len() {
-        sum += a[i] * b[i];
+impl<T> Sendable for T where T: Any + Send + 'static {}
+
+/// A task that will be executed at some point in the future by the thread pool.
+/// At the heart of this struct is the function to be executed. This may be a closure.
+#[derive(Debug, Copy, Clone)]
+pub struct Task<I, T>
+where
+    I: Sendable,
+    T: Sendable,
+{
+    job: fn(I) -> T,
+    param: I,
+}
+
+impl<I, T> Task<I, T>
+where
+    I: Sendable,
+    T: Sendable,
+{
+    pub fn new(param: I, job: fn(I) -> T) -> Self {
+        Self { job, param }
+    }
+}
+
+/// Thread pool which can be used to execute functions or closures in parallel.
+/// The maximum number of threads to be used can be regulated with `ThreadPool::with_limit`.
+/// This implementation aims for low runtime overhead with minimal blocking synchronisation.
+/// Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
+/// a new thread will be launched until the maximum number is reached. From then on every launched thread will
+/// be reused to process the remaining elements of the queue. If no jobs are left to be executed,
+/// all threads will finish and die. This means that as long as nothing is enqueued, no threads idle in the background.
+/// # Example
+/// ```rust
+/// # use imsearch::multithreading::ThreadPool;
+/// # use imsearch::multithreading::Task;
+/// let mut pool = ThreadPool::with_limit(2);
+///
+/// for i in 0..10 {
+///     pool.enqueue(Task::new(i, |i| i));
+/// }
+///
+/// pool.join_all();
+/// assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
+/// ```
+/// # Drop
+/// This struct implements the `Drop` trait. Upon being dropped the pool will wait for all threads
+/// to finish. This may take an arbitrary amount of time.
+/// # Panics in the thread
+/// When a function or closure panics, the executing thread will detect the unwind performed by `panic`, causing the
+/// thread to print a message on stderr. The thread itself captures panics and won't terminate execution but continues with
+/// the next task in the queue.
+/// It is not recommended to use this pool with custom panic hooks or special functions which abort the process.
+/// Also note that panicking code from external programs, e.g. written in C++, is undefined behavior according to [`std::panic::catch_unwind`].
+#[derive(Debug)]
+pub struct ThreadPool<I, T>
+where
+    I: Sendable,
+    T: Sendable,
+{
+    /// queue for storing the jobs to be executed
+    queue: Arc<Mutex<VecDeque<Task<I, T>>>>,
+    /// handles for all threads currently running and processing jobs
+    handles: Vec<JoinHandle<()>>,
+    /// receiver end for channel based communication between threads
+    receiver: Receiver<T>,
+    /// sender end for channel based communication between threads
+    sender: Sender<T>,
+    /// maximum amount of threads to be used in parallel
+    limit: NonZeroUsize,
+}
+
+impl<I, T> Default for ThreadPool<I, T>
+where
+    I: Sendable,
+    T: Sendable,
+{
+    fn default() -> Self {
+        let (sender, receiver) = channel::<T>();
+
+        // determine default thread count to use based on the system
+        let default =
+            NonZeroUsize::new(DEFAULT_THREAD_POOL_SIZE).expect("Thread limit must be non-zero");
+        let limit = thread::available_parallelism().unwrap_or(default);
+
+        Self {
+            queue: Arc::new(Mutex::new(VecDeque::new())),
+            handles: Vec::new(),
+            receiver,
+            sender,
+            limit,
+        }
+    }
+}
+
+impl<I, T> ThreadPool<I, T>
+where
+    I: Sendable,
+    T: Sendable,
+{
+    /// Creates a new thread pool with its default thread count determined by
+    /// [`std::thread::available_parallelism`], falling back to [`DEFAULT_THREAD_POOL_SIZE`] if that fails.
+    /// No threads will be launched until jobs are enqueued.
+    pub fn new() -> Self {
+        Default::default()
     }
-    sum
-}
-
-/// Computes the dot product using a thread pool with varying number of threads. The vectors will be both splitted into equally
-/// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have
-/// finished the partial dot products will be summed to create the final result.
-fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
-    let mut pool = ThreadPool::with_limit(threads);
-
-    // number of elements in each vector for each thread
-    let steps = a.len() / threads;
-
-    for i in 0..threads {
-        // offset of the first element for the thread local vec
-        let chunk = i * steps;
-        // launch a new thread
-        pool.enqueue(Task::new(
-            (chunk, steps, a.clone(), b.clone()),
-            |(block, inc, a, b)| {
-                let a = &a[block..(block + inc)];
-                let b = &b[block..(block + inc)];
-                dot(a, b)
-            },
-        ));
+    /// Creates a new thread pool with the given thread count. The pool will continue to launch new threads even if
+    /// the system does not allow for that degree of parallelism.
+    /// No threads will be launched until jobs are enqueued.
+    /// # Panics
+    /// This function panics if `max_threads` is zero.
+    pub fn with_limit(max_threads: usize) -> Self {
+        let (sender, receiver) = channel::<T>();
+        Self {
+            limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"),
+            queue: Arc::new(Mutex::new(VecDeque::new())),
+            handles: Vec::new(),
+            sender,
+            receiver,
+        }
+    }
-    pool.join_all();
+    /// Put a new job into the queue to be executed by a thread in the future.
+    /// The priority of the job determines whether it is put at the front or the back of the queue.
+    /// See [`crate::multithreading::Priority`].
+    /// This function will create a new thread if the maximum number of threads is not yet reached.
+    /// In case the maximum number of threads is already in use, the job is stalled and will get executed
+    /// when a thread is ready and the job is at the front of the queue.
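+    /// # Example
+    /// A minimal sketch: jobs enqueued with [`Priority::High`] are placed at the front of the
+    /// waiting queue, ahead of jobs that were already supplied.
+    /// ```rust
+    /// # use imsearch::multithreading::{Priority, Task, ThreadPool};
+    /// let mut pool = ThreadPool::with_limit(1);
+    /// pool.enqueue(Task::new(3, |x| x));
+    /// pool.enqueue_priorize(Task::new(7, |x| x), Priority::High);
+    /// pool.join_all();
+    /// assert_eq!(pool.get_results().iter().sum::<i32>(), 10);
+    /// ```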
+    pub fn enqueue_priorize(&mut self, func: Task<I, T>, priority: Priority) {
+        // lock the queue before inserting the new job
+        let mut queue = self.queue.lock().unwrap();
-    black_box(pool.get_results().iter().sum::<f64>());
-}
+        // insert the new job into the queue depending on its priority
+        match priority {
+            Priority::High => queue.push_front(func),
+            Priority::Low => queue.push_back(func),
+        }
-/// Compute a simple hash value for the given index value.
-/// This function will return a value between [0, 1].
-#[inline]
-fn hash(x: f64) -> f64 {
-    ((x * 234.8743 + 3.8274).sin() * 87624.58376).fract()
-}
+        if self.handles.len() < self.limit.get() {
+            // we can still launch threads to run in parallel
-/// Create a vector filled with `size` elements of 64-bit floating point numbers
-/// each initialized with the function `hash` and the given seed. The vector will
-/// be filled with values between [0, 1].
-fn create_vec(size: usize, seed: u64) -> Arc<Vec<f64>> {
-    let mut vec = Vec::with_capacity(size);
+            // clone the sender
+            let tx = self.sender.clone();
+            let queue = self.queue.clone();
-    for i in 0..size {
-        vec.push(hash(i as f64 + seed as f64));
+            self.handles.push(thread::spawn(move || {
+                while let Some(task) = queue.lock().unwrap().pop_front() {
+                    tx.send((task.job)(task.param))
+                        .expect("unable to send result over channel");
+                }
+            }));
+        }
+
+        self.handles.retain(|h| !h.is_finished());
     }
-    Arc::new(vec)
-}
-
-/// Function for executing the thread pool benchmarks using criterion.rs.
-/// It will create two different vectors and benchmark the single thread performance
-/// as well as the multi threadded performance for varying amounts of threads.
-pub fn bench_threadpool(c: &mut Criterion) {
-    let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
-    let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
-
-    let mut group = c.benchmark_group("threadpool with various number of threads");
-
-    for threads in THREAD_COUNTS.iter() {
-        group.throughput(Throughput::Bytes(*threads as u64));
-        group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
-            b.iter(|| {
-                dot_parallel(vec_a.clone(), vec_b.clone(), *threads);
-            });
-        });
-    }
-    group.finish();
-}
-
-/// Benchmark the effects of over and underusing a thread pools thread capacity.
-/// The thread pool will automatically choose the number of threads to use.
-/// We will then run a custom number of jobs with that pool that may be smaller or larger
-/// than the amount of threads the pool can offer.
-fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
-    // automatically choose the number of threads
-    let mut pool = ThreadPool::new();
-
-    // number of elements in each vector for each thread
-    let steps = a.len() / threads;
-
-    for i in 0..threads {
-        // offset of the first element for the thread local vec
-        let chunk = i * steps;
-        // launch a new thread
-        pool.enqueue(Task::new(
-            (chunk, steps, a.clone(), b.clone()),
-            |(block, inc, a, b)| {
-                let a = &a[block..(block + inc)];
-                let b = &b[block..(block + inc)];
-                dot(a, b)
-            },
-        ));
+    /// Put a new job into the queue to be executed by a thread in the future.
+    /// The priority of the job is automatically set to [`crate::multithreading::Priority::Low`].
+    /// This function will create a new thread if the maximum number of threads is not yet reached.
+    /// In case the maximum number of threads is already in use, the job is stalled and will get executed
+    /// when a thread is ready and the job is at the front of the queue.
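+    /// # Example
+    /// A minimal sketch queueing a few closures and summing their results:
+    /// ```rust
+    /// # use imsearch::multithreading::{Task, ThreadPool};
+    /// let mut pool = ThreadPool::with_limit(4);
+    /// for i in 0..4 {
+    ///     pool.enqueue(Task::new(i, |i| i * 2));
+    /// }
+    /// pool.join_all();
+    /// assert_eq!(pool.get_results().iter().sum::<i32>(), 12);
+    /// ```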
+    pub fn enqueue(&mut self, func: Task<I, T>) {
+        self.enqueue_priorize(func, Priority::Low);
     }
-    pool.join_all();
-
-    black_box(pool.get_results().iter().sum::<f64>());
-}
-
-/// Benchmark the effects of over and underusing a thread pools thread capacity.
-/// The thread pool will automatically choose the number of threads to use.
-/// We will then run a custom number of jobs with that pool that may be smaller or larger
-/// than the amount of threads the pool can offer.
-pub fn bench_overusage(c: &mut Criterion) {
-    let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
-    let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
-
-    let mut group = c.benchmark_group("threadpool overusage");
-
-    for threads in THREAD_COUNTS.iter() {
-        group.throughput(Throughput::Bytes(*threads as u64));
-        group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
-            b.iter(|| {
-                pool_overusage(vec_a.clone(), vec_b.clone(), *threads);
-            });
-        });
+    /// Wait for all threads to finish executing. This means that by the time all threads have finished
+    /// every task will have been executed too. In other words, the threads finish when the queue of jobs is empty.
+    /// This function will block the caller thread.
+    pub fn join_all(&mut self) {
+        while let Some(handle) = self.handles.pop() {
+            handle.join().unwrap();
+        }
+    }
+
+    /// Returns all results that have been returned by the threads until now
+    /// and haven't been consumed yet.
+    /// All results retrieved from this call won't be returned on a second call.
+    /// This function is non-blocking.
+    pub fn try_get_results(&mut self) -> Vec<T> {
+        self.receiver.try_iter().collect()
+    }
+
+    /// Returns all results that have been returned by the threads until now
+    /// and haven't been consumed yet. The function will also wait for all threads to finish executing (empty the queue).
+    /// All results retrieved from this call won't be returned on a second call.
+    /// This function will block the caller thread.
+    pub fn get_results(&mut self) -> Vec<T> {
+        self.join_all();
+        self.try_get_results()
     }
-    group.finish();
 }
-/// Benchmark the performance of a single thread used to calculate the dot product.
-pub fn bench_single_threaded(c: &mut Criterion) { - let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]); - let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]); - - c.bench_function("single threaded", |s| { - s.iter(|| { - black_box(dot(&vec_a, &vec_b)); - }); - }); +impl Drop for ThreadPool +where + I: Sendable, + T: Sendable, +{ + fn drop(&mut self) { + self.join_all(); + } } -criterion_group!( - benches, - bench_single_threaded, - bench_threadpool, - bench_overusage -); -criterion_main!(benches); +#[cfg(test)] +mod test { + use std::panic::UnwindSafe; + + use super::*; + + #[test] + fn test_default() { + let mut pool = ThreadPool::default(); + + for i in 0..10 { + pool.enqueue_priorize(Task::new(i, |i| i), Priority::High); + } + + pool.join_all(); + + assert_eq!(pool.try_get_results().iter().sum::(), 45); + } + + #[test] + fn test_limit() { + let mut pool = ThreadPool::with_limit(2); + + for i in 0..10 { + pool.enqueue(Task::new(i, |i| i)); + } + + assert_eq!(pool.handles.len(), 2); + assert_eq!(pool.limit.get(), 2); + + pool.join_all(); + + assert_eq!(pool.get_results().iter().sum::(), 45); + } + + trait Object: Send + UnwindSafe { + fn get(&mut self) -> i32; + } + + #[derive(Default)] + struct Test1 { + _int: i32, + } + + impl Object for Test1 { + fn get(&mut self) -> i32 { + 0 + } + } + + #[derive(Default)] + struct Test2 { + _c: char, + } + + impl Object for Test2 { + fn get(&mut self) -> i32 { + 0 + } + } + + #[derive(Default)] + struct Test3 { + _s: String, + } + + impl Object for Test3 { + fn get(&mut self) -> i32 { + 0 + } + } + + #[test] + fn test_1() { + let mut pool = ThreadPool::with_limit(2); + + let feats: Vec> = vec![ + Box::new(Test1::default()), + Box::new(Test2::default()), + Box::new(Test3::default()), + ]; + + for feat in feats { + pool.enqueue(Task::new(feat, |mut i| { + let _ = i.get(); + i + })); + } + + pool.join_all(); + + let _feats: Vec> = pool.get_results(); + } +} diff --git a/src/image/mod.rs b/src/image/mod.rs index e38ce21..486cf15 100644 --- a/src/image/mod.rs +++ b/src/image/mod.rs @@ -40,7 +40,7 @@ pub trait Sample: Into + PartialEq + Default + Copy + From + PartialOrd impl + PartialEq + Default + Copy + From + PartialOrd> Sample for T {} #[allow(unused)] -#[derive(Default)] +#[derive(Default, Debug)] pub struct Image where T: Sample, @@ -87,6 +87,10 @@ where pub fn pixel(&self, index: usize) -> (T, T, T, T) { *self.index(index) } + /// Returns all pixel of the image + pub fn pixels(&self, index: usize) -> &Vec<(T, T, T, T)> { + &self.pixels + } /// Returns the path of the image pub fn path(&self) -> &PathBuf { &self.path diff --git a/src/search_index/mod.rs b/src/search_index/mod.rs index 931fb47..b21f4e6 100644 --- a/src/search_index/mod.rs +++ b/src/search_index/mod.rs @@ -1,8 +1,11 @@ use crate::image::Image; +use crate::multithreading::{Task, ThreadPool}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::default::Default; +use std::fs; use std::path::{Path, PathBuf}; +use std::sync::Arc; trait WeightedCmp { fn weighted(&self, other: &Self) -> f32; @@ -10,7 +13,7 @@ trait WeightedCmp { /// Every feature returns a known and sized type #[derive(Debug, Clone, Serialize, Deserialize)] -enum FeatureResult { +pub enum FeatureResult { /// A boolean. 
Just a boolean Bool(bool), /// Signed 32-bit integer @@ -21,7 +24,7 @@ enum FeatureResult { /// Vector for nested multidimensional Vec(Vec), /// Standard RGBA color - RGBA(f32, f32, f32, f32), + Rgba(f32, f32, f32, f32), /// Indices intended for the usage in histograms Indices(Vec), ///A Character :) @@ -47,7 +50,7 @@ impl PartialEq for FeatureResult { (Self::I32(l0), Self::I32(r0)) => l0 == r0, (Self::F32(l0), Self::F32(r0)) => l0 == r0, (Self::Vec(l0), Self::Vec(r0)) => l0 == r0, - (Self::RGBA(l0, l1, l2, l3), Self::RGBA(r0, r1, r2, r3)) => { + (Self::Rgba(l0, l1, l2, l3), Self::Rgba(r0, r1, r2, r3)) => { l0 == r0 && l1 == r1 && l2 == r2 && l3 == r3 } (Self::Indices(l), Self::Indices(r)) => l == r, @@ -77,41 +80,62 @@ impl WeightedCmp for FeatureResult { } } (Self::F32(l0), Self::F32(r0)) => { - if (l0 - r0).abs() > 0.5 { + if (l0 - r0).abs() < 1e-4 { 1. } else { 0. } } - (Self::Vec(r), Self::Vec(l)) => { - if l == r { - 1. + (Self::Vec(l), Self::Vec(r)) => { + if l.len() == r.len() { + let mut b: f32 = 0.; + for a in l.iter().enumerate() { + b += a.1.weighted(&r[a.0]); + } + b / l.len() as f32 } else { 0. } } - (Self::RGBA(l0, l1, l2, l3), Self::RGBA(r0, r1, r2, r3)) => { - let mut a = 0.; - if l0 == r0 { - a += 0.25; + (Self::Rgba(l0, l1, l2, _), Self::Rgba(r0, r1, r2,_)) => { + let lableft = rgb_to_lab(vec![*l0,*l1,*l2]); + let labright = rgb_to_lab(vec![*r0,*r1,*r2]); + + let mut result = ((lableft[0]-labright[0])*(lableft[0]-labright[0]) + +(lableft[1]-labright[1])*(lableft[1]-labright[1]) + +(lableft[2]-labright[2])*(lableft[2]-labright[2])).sqrt(); //euclidian distance between two colors: Delta E + if result > 100. { + result = 0.; } - if l1 == r1 { - a += 0.25; + else { + result = 1. - result/100.; } - if l2 == r2 { - a += 0.25; - } - if l3 == r3 { - a += 0.25; - } - a + + result } (Self::Indices(l), Self::Indices(r)) => { - l.iter().zip(r.iter()).map(|(a, b)| a * b).sum::() as f32 - / (l.iter().map(|a| a * a).sum::() as f32 - * r.iter().map(|b| b * b).sum::() as f32) - .sqrt() - } //cosines similarity + let mut up = 0_u64; + let mut left = 0_u64; + let mut right = 0_u64; + for (a,b) in l.iter().zip(r.iter()).map(|(a, b)| (a,b)){ + left += a*a; + right += b*b; + up += a*b; + } + + + let mut result = up as f32 / ((left * right) as f32).sqrt();//cosines similarity + + + if result.is_nan() { + if left == right { + result = 1.; + } else { + result = 0. 
+ } + } + result + } (Self::Char(l0), Self::Char(r0)) => { if l0 == r0 { @@ -134,22 +158,122 @@ impl WeightedCmp for FeatureResult { } } -type FeatureGenerator = Box) -> (String, FeatureResult)>; +fn rgb_to_lab(rgb: Vec) -> [f32; 3] { + let r = rgb[0] / 255.0; + let g = rgb[1] / 255.0; + let b = rgb[2] / 255.0; -#[derive(Serialize, Deserialize, Default)] -struct Database { - images: HashMap>, + let r = if r > 0.04045 { ((r + 0.055) / 1.055).powf(2.4) } else { r / 12.92 }; + let g = if g > 0.04045 { ((g + 0.055) / 1.055).powf(2.4) } else { g / 12.92 }; + let b = if b > 0.04045 { ((b + 0.055) / 1.055).powf(2.4) } else { b / 12.92 }; + + let x = r * 0.4124 + g * 0.3576 + b * 0.1805; + let y = r * 0.2126 + g * 0.7152 + b * 0.0722; + let z = r * 0.0193 + g * 0.1192 + b * 0.9505; + + let x = x / 0.95047; + let y = y / 1.0; + let z = z / 1.08883; + + let x = if x > 0.008856 { x.powf(1.0 / 3.0) } else { (7.787 * x) + (16.0 / 116.0) }; + let y = if y > 0.008856 { y.powf(1.0 / 3.0) } else { (7.787 * y) + (16.0 / 116.0) }; + let z = if z > 0.008856 { z.powf(1.0 / 3.0) } else { (7.787 * z) + (16.0 / 116.0) }; + + let l = (116.0 * y) - 16.0; + let a = 500.0 * (x - y); + let b = 200.0 * (y - z); + + [l, a, b] +} + + +pub type FeatureGenerator = fn(Arc>) -> (String, FeatureResult); + +#[derive(Default)] +pub struct Database { + images: IndexedImages, /// keep feature generator for the case when we add a new image /// this field is not serialized and needs to be wrapped in an option - #[serde(skip)] - generators: Option>, + generators: Vec, + threadpool: ThreadPool>, (String, FeatureResult)>, } impl Database { pub fn search(&self, imagepath: &Path, feature: FeatureGenerator) -> Vec<(PathBuf, f32)> { - let image: Image = Image::default(); //todo!("Image reader function") - let search_feat = feature(&image); + self.images.search(imagepath, feature) + } + + ///the new function generates a new Database out of a vector of the Paths of the Images and a Vector of features + pub fn new(imagepaths: &Vec, features: Vec) -> Self { + let mut threadpool = ThreadPool::new(); + Self { + images: IndexedImages::new(imagepaths, &features, &mut threadpool), + generators: features, + threadpool, + } + } + + /// with add_image you can add images in a existing database. 
+ /// databases from a file are read only + pub fn add_image(&mut self, path: &Path) { + if !self.generators.is_empty() { + self.images + .add_image(path, &self.generators, &mut self.threadpool) + } else { + panic!("database without generator functions is immutable") + } + } + + pub fn from_file(path: &Path) -> Self { + let filestring = fs::read_to_string(path).expect("can't read that file"); + let images = serde_json::from_str::(&filestring) + .expect("unable to deserialize the file"); + Self { + images, + generators: Vec::new(), + threadpool: ThreadPool::new(), + } + } +} + +#[derive(Serialize, Deserialize, Default, PartialEq, Debug)] +struct IndexedImages { + images: HashMap>, +} + +impl IndexedImages { + fn new( + imagepaths: &Vec, + features: &[FeatureGenerator], + threadpool: &mut ThreadPool>, (String, FeatureResult)>, + ) -> Self { + let mut images_with_feats = HashMap::new(); + + for path in imagepaths { + let image: Arc> = Arc::new(Image::default()); //todo!("Image reader function") + let mut feats = HashMap::new(); + + for generator in features.iter() { + threadpool.enqueue(Task::new(image.clone(), *generator)); + } + let vec = threadpool.get_results(); + + for (name, result) in vec { + feats.insert(name, result); + } + + images_with_feats.insert(image.path().clone(), feats); + } + + Self { + images: images_with_feats, + } + } + + fn search(&self, imagepath: &Path, feature: FeatureGenerator) -> Vec<(PathBuf, f32)> { + let image: Arc> = Arc::new(Image::default()); //todo!("Image reader function") + let search_feat = feature(image); let mut result: Vec<(PathBuf, f32)> = Vec::new(); for image in &self.images { @@ -162,49 +286,152 @@ impl Database { result } - ///the new function generates a new Database out of a vector of the Paths of the Images and a Vector of features - pub fn new(images: &Vec, features: Option>) -> Self { - let mut images_with_feats = HashMap::new(); + fn add_image( + &mut self, + path: &Path, + generator: &Vec, + threadpool: &mut ThreadPool>, (String, FeatureResult)>, + ) { + let image: Arc> = Arc::new(Image::default()); //todo!("Image reader function") + let mut feats = HashMap::new(); - for path in images { - let image: Image = Image::default(); //todo!("Image reader function") - let mut feats = HashMap::new(); - if let Some(gen) = &features { - for generator in gen { - let (name, result) = generator(&image); - feats.insert(name, result); - } - images_with_feats.insert(image.path().clone(), feats); - } + for gen in generator { + threadpool.enqueue(Task::new(image.clone(), *gen)); } - Self { - images: images_with_feats, - generators: features, - } - } + let vec = threadpool.get_results(); - /// with add_image you can add images in a existing database. 
- /// databases from a file are read only - pub fn add_image(&mut self, path: &Path) { - let image: Image = Image::default(); //todo!("Image reader function") - let mut features = HashMap::new(); - if let Some(gen) = &self.generators { - for generator in gen { - let (name, result) = generator(&image); - features.insert(name, result); - } - self.images.insert(image.path().clone(), features); - } else { - panic!("database without generator functions is immutable") + for (name, result) in vec { + feats.insert(name, result); } + self.images.insert(image.path().clone(), feats); } } - - -#[test] -fn test() { - let data = Database::default(); - - let _as_json = serde_json::to_string(&data); +/// example feature implementation +#[allow(dead_code)] +fn average_luminance(image: Arc>) -> (String, FeatureResult) { + (String::from("average-brightness"), FeatureResult::F32(0.0)) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn conversion() { + + let mut images: HashMap> = HashMap::new(); + let mut feat: HashMap = HashMap::new(); + feat.insert(String::from("average-brightness"), FeatureResult::F32(0.0)); + images.insert(PathBuf::new(), feat); + let data = IndexedImages { images }; + + let _as_json = serde_json::to_string(&data).expect("couldnt convert"); + println!("{:?}", _as_json); + let data_after_conversion = serde_json::from_str::(&_as_json).expect("couldnt convert from string"); + assert_eq!(data, data_after_conversion); + } + + #[test] + fn cosine_similarity(){ + let vec1 = FeatureResult::Indices(vec!{1, 3, 4}); + let vec2 = FeatureResult::Indices(vec!{1, 3, 4}); + + + assert_eq!(1., vec1.weighted(&vec2)); // both are identical + let vec2 = FeatureResult::Indices(vec!{0, 0, 0}); + assert_eq!(0., vec1.weighted(&vec2)); // one is 0 + let vec1 = FeatureResult::Indices(vec!{0, 0, 0}); + assert_eq!(1., vec1.weighted(&vec2)); // both are 0 + + assert_eq!(1., vec2.weighted(&vec1)); // it shouldn't change if the Values are switched + + let vec1 = FeatureResult::Indices(vec!{7, 3, 4}); + let vec2 = FeatureResult::Indices(vec!{1, 5, 2}); + assert_eq!(vec1.weighted(&vec2), vec2.weighted(&vec1)); + println!("{:?}", vec1.weighted(&vec2)); + + + let mut vec1 = vec![5;9999]; + vec1.push( 1); + let vec1 = FeatureResult::Indices(vec1); + let vec2 = FeatureResult::Indices(vec!{7;10000}); + println!("{:?}", vec1.weighted(&vec2)); + + + + } + #[test] + fn weighted() { + + let vec1 = FeatureResult::Vec(vec![FeatureResult::Bool(true), + FeatureResult::Char('c'), + FeatureResult::Vec(vec![FeatureResult::Percent(0.5)]), + FeatureResult::F32(44.543) ]); + + let vec2 = FeatureResult::Vec(vec![FeatureResult::Bool(true), + FeatureResult::Char('c'), + FeatureResult::Vec(vec![FeatureResult::Percent(0.5)]), + FeatureResult::F32(44.543) ]); + assert_eq!(1., vec2.weighted(&vec1)); + + + let vec2 = FeatureResult::Vec(vec![FeatureResult::Bool(true), + FeatureResult::Char('c'), + FeatureResult::F32(44.543) , + FeatureResult::Vec(vec![FeatureResult::Percent(0.5)])]); + assert_eq!(0.5, vec2.weighted(&vec1)); + println!("{:?}", vec1.weighted(&vec2)); + + let value1 = FeatureResult::F32(44.543); + let value2 = FeatureResult::F32(44.543); + assert_eq!(1., value1.weighted(&value2)); + + let value1 = FeatureResult::Bool(true); + let value2 = FeatureResult::Bool(false); + assert_eq!(0., value1.weighted(&value2)); + + let value1 = FeatureResult::String(String::from("Testing")); + let value2 = FeatureResult::String(String::from("notTesting")); + assert_eq!(0., value1.weighted(&value2)); + + + let value2 = 
+            FeatureResult::String(String::from("Testing"));
+        assert_eq!(1., value1.weighted(&value2));
+    }
+
+    #[test]
+    fn weighted_rgba() {
+        let value1 = FeatureResult::Rgba(32.6754, 42.432, 43.87, 255.);
+        let value2 = FeatureResult::Rgba(32.6754, 42.432, 43.87, 255.);
+        assert_eq!(1., value1.weighted(&value2));
+
+        let value1 = FeatureResult::Rgba(255., 255., 0., 255.);
+        let value2 = FeatureResult::Rgba(0., 0., 0., 255.);
+        //assert_eq!(1., value1.weighted(&value2));
+        println!("Yellow to Black: {:?}", value1.weighted(&value2));
+
+        let value1 = FeatureResult::Rgba(255., 255., 0., 255.);
+        let value2 = FeatureResult::Rgba(200., 255., 55., 255.);
+        //assert_eq!(1., value1.weighted(&value2));
+        println!("yellow to light green: {:?}", value1.weighted(&value2));
+
+        let value1 = FeatureResult::Rgba(3., 8., 255., 255.);
+        let value2 = FeatureResult::Rgba(3., 106., 255., 255.);
+        //assert_eq!(1., value1.weighted(&value2));
+        println!("blue to dark blue: {:?}", value1.weighted(&value2));
+
+        let value1 = FeatureResult::Rgba(255., 106., 122., 255.);
+        let value2 = FeatureResult::Rgba(255., 1., 28., 255.);
+        //assert_eq!(1., value1.weighted(&value2));
+        println!("Red to light red: {:?}", value1.weighted(&value2));
+    }
+}
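
Taken together, the patch wires the new thread pool into the feature index. The snippet below is a sketch of how the `Database` API introduced in `src/search_index/mod.rs` might be driven; it assumes the `image` and `search_index` modules (with `Database`, `FeatureGenerator` and `FeatureResult`) are exported from the crate root, that `Image` is instantiated with `f32` samples, and that the image paths exist — none of which is shown in this diff.

```rust
use std::path::{Path, PathBuf};
use std::sync::Arc;

// Assumed re-exports; adjust to the actual module layout of the crate.
use imsearch::image::Image;
use imsearch::search_index::{Database, FeatureGenerator, FeatureResult};

/// A feature generator is a plain `fn` pointer taking the shared image and
/// returning a named feature value (mirroring `average_luminance` in the patch).
fn average_brightness(_image: Arc<Image<f32>>) -> (String, FeatureResult) {
    // Stub value, just like the patch's own example generator; the image
    // reader itself is still marked as a todo in the patch.
    (String::from("average-brightness"), FeatureResult::F32(0.0))
}

fn main() {
    // Placeholder paths for illustration only.
    let paths: Vec<PathBuf> = vec![PathBuf::from("images/a.png"), PathBuf::from("images/b.png")];
    let generators: Vec<FeatureGenerator> = vec![average_brightness];

    // Index the images; every generator runs as a `Task` on the internal thread pool.
    let mut db = Database::new(&paths, generators);

    // Databases built with generators stay mutable and accept new images.
    db.add_image(Path::new("images/c.png"));

    // Rank all indexed images against a query image using one generator.
    let ranking: Vec<(PathBuf, f32)> =
        db.search(Path::new("images/query.png"), average_brightness);
    for (path, score) in ranking {
        println!("{} -> {score}", path.display());
    }
}
```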