fixed multihtreading benchmark missing
removed default code from lib.rs removed println statements from image_loader added image loading feature to database
This commit is contained in:
parent
3bb064870d
commit
b498aee8a7
|
@ -1,350 +1,179 @@
|
||||||
//! This module provides the functionality to create thread pool to execute tasks in parallel.
|
//! Benachmarking funcitonality for [Criterion.rs](https://github.com/bheisler/criterion.rs)
|
||||||
//! The amount of threads to be used at maximum can be regulated by using `ThreadPool::with_limit`.
|
//! This benchmark will compare the performance of various thread pools launched with different amounts of
|
||||||
//! This implementation is aimed to be of low runtime cost with minimal sychronisation due to blocking.
|
//! maximum threads.
|
||||||
//! Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
|
//! Each thread will calculate a partial dot product of two different vectors composed of 1,000,000 64-bit
|
||||||
//! a new thread will be launched until the maximum number is reached. By then every launched thread will
|
//! double precision floating point values.
|
||||||
//! be reused to process the remaining elements of the queue. If no jobs are left to be executed
|
|
||||||
//! all threads will finish and die. This means that if nothing is done, no threads will run in idle in the background.
|
|
||||||
//! # Example
|
|
||||||
//! ```rust
|
|
||||||
//! # use imsearch::multithreading::ThreadPool;
|
|
||||||
//! # use imsearch::multithreading::Task;
|
|
||||||
//! let mut pool = ThreadPool::with_limit(2);
|
|
||||||
//!
|
|
||||||
//! for i in 0..10 {
|
|
||||||
//! pool.enqueue(Task::new(i, |i| i));
|
|
||||||
//! // ^^^^^^ closure or static function
|
|
||||||
//! }
|
|
||||||
//!
|
|
||||||
//! pool.join_all();
|
|
||||||
//! assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
|
||||||
//! ```
|
|
||||||
|
|
||||||
use std::{
|
use std::sync::Arc;
|
||||||
any::Any,
|
|
||||||
collections::VecDeque,
|
|
||||||
num::NonZeroUsize,
|
|
||||||
sync::{
|
|
||||||
mpsc::{channel, Receiver, Sender},
|
|
||||||
Arc, Mutex,
|
|
||||||
},
|
|
||||||
thread::{self, JoinHandle},
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Default number if threads to be used in case [`std::thread::available_parallelism`] fails.
|
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
|
||||||
pub const DEFAULT_THREAD_POOL_SIZE: usize = 1;
|
use imsearch::multithreading::{Task, ThreadPool};
|
||||||
|
|
||||||
/// Indicates the priority level of functions or closures which get supplied to the pool.
|
/// Amount of elements per vector used to calculate the dot product
|
||||||
/// Use [`Priority::High`] to ensure the closue to be executed before all closures that are already supplied
|
const VEC_ELEM_COUNT: usize = 1_000_000;
|
||||||
/// Use [`Priority::Low`] to ensure the closue to be executed after all closures that are already supplied
|
/// Number of threads to test
|
||||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
const THREAD_COUNTS: [usize; 17] = [
|
||||||
pub enum Priority {
|
1, 2, 4, 6, 8, 10, 12, 16, 18, 20, 22, 26, 28, 32, 40, 56, 64,
|
||||||
/// Indicate that the closure or function supplied to the thread
|
|
||||||
/// has higher priority than any other given to the pool until now.
|
|
||||||
/// The item will get enqueued at the start of the waiting-queue.
|
|
||||||
High,
|
|
||||||
/// Indicate that the closure or function supplied to the thread pool
|
|
||||||
/// has lower priority than the already supplied ones in this pool.
|
|
||||||
/// The item will get enqueued at the end of the waiting-queue.
|
|
||||||
Low,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Traits a return value has to implement when being given back by a function or closure.
|
|
||||||
pub trait Sendable: Any + Send + 'static {}
|
|
||||||
|
|
||||||
impl<T> Sendable for T where T: Any + Send + 'static {}
|
|
||||||
|
|
||||||
/// A task that will be executed at some point in the future by the thread pool
|
|
||||||
/// At the heart of this struct is the function to be executed. This may be a closure.
|
|
||||||
#[derive(Debug, Copy, Clone)]
|
|
||||||
pub struct Task<I, T>
|
|
||||||
where
|
|
||||||
I: Sendable,
|
|
||||||
T: Sendable,
|
|
||||||
{
|
|
||||||
job: fn(I) -> T,
|
|
||||||
param: I,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, T> Task<I, T>
|
|
||||||
where
|
|
||||||
I: Sendable,
|
|
||||||
T: Sendable,
|
|
||||||
{
|
|
||||||
pub fn new(param: I, job: fn(I) -> T) -> Self {
|
|
||||||
Self { job, param }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Thread pool which can be used to execute functions or closures in parallel.
|
|
||||||
/// The amount of threads to be used at maximum can be regulated by using `ThreadPool::with_limit`.
|
|
||||||
/// This implementation is aimed to be of low runtime cost with minimal sychronisation due to blocking.
|
|
||||||
/// Note that no threads will be spawned until jobs are supplied to be executed. For every supplied job
|
|
||||||
/// a new thread will be launched until the maximum number is reached. By then every launched thread will
|
|
||||||
/// be reused to process the remaining elements of the queue. If no jobs are left to be executed
|
|
||||||
/// all threads will finish and die. This means that if nothing is done, no threads will run in idle in the background.
|
|
||||||
/// # Example
|
|
||||||
/// ```rust
|
|
||||||
/// # use imsearch::multithreading::ThreadPool;
|
|
||||||
/// # use imsearch::multithreading::Task;
|
|
||||||
/// let mut pool = ThreadPool::with_limit(2);
|
|
||||||
///
|
|
||||||
/// for i in 0..10 {
|
|
||||||
/// pool.enqueue(Task::new(i, |i| i));
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// pool.join_all();
|
|
||||||
/// assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
|
||||||
/// ```
|
|
||||||
/// # Drop
|
|
||||||
/// This struct implements the `Drop` trait. Upon being dropped the pool will wait for all threads
|
|
||||||
/// to finsish. This may take up an arbitrary amount of time.
|
|
||||||
/// # Panics in the thread
|
|
||||||
/// When a function or closure panics, the executing thread will detect the unwind performed by `panic` causing the
|
|
||||||
/// thread to print a message on stderr. The thread itself captures panics and won't terminate execution but continue with
|
|
||||||
/// the next task in the queue.
|
|
||||||
/// Its not recommend to use this pool with custom panic hooks or special functions which abort the process.
|
|
||||||
/// Also panicking code from external program written in C++ or others in undefinied behavior according to [`std::panic::catch_unwind`]
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ThreadPool<I, T>
|
|
||||||
where
|
|
||||||
I: Sendable,
|
|
||||||
T: Sendable,
|
|
||||||
{
|
|
||||||
/// queue for storing the jobs to be executed
|
|
||||||
queue: Arc<Mutex<VecDeque<Task<I, T>>>>,
|
|
||||||
/// handles for all threads currently running and processing jobs
|
|
||||||
handles: Vec<JoinHandle<()>>,
|
|
||||||
/// reciver end for channel based communication between threads
|
|
||||||
receiver: Receiver<T>,
|
|
||||||
/// sender end for channel based communication between threads
|
|
||||||
sender: Sender<T>,
|
|
||||||
/// maximum amount of threads to be used in parallel
|
|
||||||
limit: NonZeroUsize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, T> Default for ThreadPool<I, T>
|
|
||||||
where
|
|
||||||
I: Sendable,
|
|
||||||
T: Sendable,
|
|
||||||
{
|
|
||||||
fn default() -> Self {
|
|
||||||
let (sender, receiver) = channel::<T>();
|
|
||||||
|
|
||||||
// determine default thread count to use based on the system
|
|
||||||
let default =
|
|
||||||
NonZeroUsize::new(DEFAULT_THREAD_POOL_SIZE).expect("Thread limit must be non-zero");
|
|
||||||
let limit = thread::available_parallelism().unwrap_or(default);
|
|
||||||
|
|
||||||
Self {
|
|
||||||
queue: Arc::new(Mutex::new(VecDeque::new())),
|
|
||||||
handles: Vec::new(),
|
|
||||||
receiver,
|
|
||||||
sender,
|
|
||||||
limit,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, T> ThreadPool<I, T>
|
|
||||||
where
|
|
||||||
I: Sendable,
|
|
||||||
T: Sendable,
|
|
||||||
{
|
|
||||||
/// Creates a new thread pool with default thread count determined by either
|
|
||||||
/// [`std::thread::available_parallelism`] or [`DEFAULT_THREAD_POOL_SIZE`] in case it fails.
|
|
||||||
/// No threads will be lauched until jobs are enqueued.
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Default::default()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a new thread pool with the given thread count. The pool will continue to launch new threads even if
|
|
||||||
/// the system does not allow for that count of parallelism.
|
|
||||||
/// No threads will be lauched until jobs are enqueued.
|
|
||||||
/// # Panic
|
|
||||||
/// This function will fails if `max_threads` is zero.
|
|
||||||
pub fn with_limit(max_threads: usize) -> Self {
|
|
||||||
let (sender, receiver) = channel::<T>();
|
|
||||||
Self {
|
|
||||||
limit: NonZeroUsize::new(max_threads).expect("Thread limit must be non-zero"),
|
|
||||||
queue: Arc::new(Mutex::new(VecDeque::new())),
|
|
||||||
handles: Vec::new(),
|
|
||||||
sender,
|
|
||||||
receiver,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Put a new job into the queue to be executed by a thread in the future.
|
|
||||||
/// The priority of the job will determine if the job will be put at the start or end of the queue.
|
|
||||||
/// See [`crate::multithreading::Priority`].
|
|
||||||
/// This function will create a new thread if the maximum number of threads in not reached.
|
|
||||||
/// In case the maximum number of threads is already used, the job is stalled and will get executed
|
|
||||||
/// when a thread is ready and its at the start of the queue.
|
|
||||||
pub fn enqueue_priorize(&mut self, func: Task<I, T>, priority: Priority) {
|
|
||||||
// put job into queue
|
|
||||||
let mut queue = self.queue.lock().unwrap();
|
|
||||||
|
|
||||||
// insert new job into queue depending on its priority
|
|
||||||
match priority {
|
|
||||||
Priority::High => queue.push_front(func),
|
|
||||||
Priority::Low => queue.push_back(func),
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.handles.len() < self.limit.get() {
|
|
||||||
// we can still launch threads to run in parallel
|
|
||||||
|
|
||||||
// clone the sender
|
|
||||||
let tx = self.sender.clone();
|
|
||||||
let queue = self.queue.clone();
|
|
||||||
|
|
||||||
self.handles.push(thread::spawn(move || {
|
|
||||||
while let Some(task) = queue.lock().unwrap().pop_front() {
|
|
||||||
tx.send((task.job)(task.param))
|
|
||||||
.expect("unable to send result over channel");
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
self.handles.retain(|h| !h.is_finished());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Put a new job into the queue to be executed by a thread in the future.
|
|
||||||
/// The priority of the job is automatically set to [`crate::multithreading::Priority::Low`].
|
|
||||||
/// This function will create a new thread if the maximum number of threads in not reached.
|
|
||||||
/// In case the maximum number of threads is already used, the job is stalled and will get executed
|
|
||||||
/// when a thread is ready and its at the start of the queue.
|
|
||||||
pub fn enqueue(&mut self, func: Task<I, T>) {
|
|
||||||
self.enqueue_priorize(func, Priority::Low);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Wait for all threads to finish executing. This means that by the time all threads have finished
|
|
||||||
/// every task will have been executed too. In other words the threads finsish when the queue of jobs is empty.
|
|
||||||
/// This function will block the caller thread.
|
|
||||||
pub fn join_all(&mut self) {
|
|
||||||
while let Some(handle) = self.handles.pop() {
|
|
||||||
handle.join().unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sendables all results that have been Sendableed by the threads until now
|
|
||||||
/// and haven't been consumed yet.
|
|
||||||
/// All results retrieved from this call won't be Sendableed on a second call.
|
|
||||||
/// This function is non blocking.
|
|
||||||
pub fn try_get_results(&mut self) -> Vec<T> {
|
|
||||||
self.receiver.try_iter().collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sendables all results that have been returned by the threads until now
|
|
||||||
/// and haven't been consumed yet. The function will also wait for all threads to finish executing (empty the queue).
|
|
||||||
/// All results retrieved from this call won't be returned on a second call.
|
|
||||||
/// This function will block the caller thread.
|
|
||||||
pub fn get_results(&mut self) -> Vec<T> {
|
|
||||||
self.join_all();
|
|
||||||
self.try_get_results()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, T> Drop for ThreadPool<I, T>
|
|
||||||
where
|
|
||||||
I: Sendable,
|
|
||||||
T: Sendable,
|
|
||||||
{
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.join_all();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use std::panic::UnwindSafe;
|
|
||||||
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_default() {
|
|
||||||
let mut pool = ThreadPool::default();
|
|
||||||
|
|
||||||
for i in 0..10 {
|
|
||||||
pool.enqueue_priorize(Task::new(i, |i| i), Priority::High);
|
|
||||||
}
|
|
||||||
|
|
||||||
pool.join_all();
|
|
||||||
|
|
||||||
assert_eq!(pool.try_get_results().iter().sum::<i32>(), 45);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_limit() {
|
|
||||||
let mut pool = ThreadPool::with_limit(2);
|
|
||||||
|
|
||||||
for i in 0..10 {
|
|
||||||
pool.enqueue(Task::new(i, |i| i));
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(pool.handles.len(), 2);
|
|
||||||
assert_eq!(pool.limit.get(), 2);
|
|
||||||
|
|
||||||
pool.join_all();
|
|
||||||
|
|
||||||
assert_eq!(pool.get_results().iter().sum::<i32>(), 45);
|
|
||||||
}
|
|
||||||
|
|
||||||
trait Object: Send + UnwindSafe {
|
|
||||||
fn get(&mut self) -> i32;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct Test1 {
|
|
||||||
_int: i32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Object for Test1 {
|
|
||||||
fn get(&mut self) -> i32 {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct Test2 {
|
|
||||||
_c: char,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Object for Test2 {
|
|
||||||
fn get(&mut self) -> i32 {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct Test3 {
|
|
||||||
_s: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Object for Test3 {
|
|
||||||
fn get(&mut self) -> i32 {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_1() {
|
|
||||||
let mut pool = ThreadPool::with_limit(2);
|
|
||||||
|
|
||||||
let feats: Vec<Box<dyn Object>> = vec![
|
|
||||||
Box::new(Test1::default()),
|
|
||||||
Box::new(Test2::default()),
|
|
||||||
Box::new(Test3::default()),
|
|
||||||
];
|
];
|
||||||
|
/// seeds used to scramble up the values produced by the hash function for each vector
|
||||||
|
/// these are just some pseudo random numbers
|
||||||
|
const VEC_SEEDS: [u64; 2] = [0xa3f8347abce16, 0xa273048ca9dea];
|
||||||
|
|
||||||
for feat in feats {
|
/// Compute the dot product of two vectors
|
||||||
pool.enqueue(Task::new(feat, |mut i| {
|
/// # Panics
|
||||||
let _ = i.get();
|
/// this function assumes both vectors to be of exactly the same length.
|
||||||
i
|
/// If this is not the case the function will panic.
|
||||||
}));
|
fn dot(a: &[f64], b: &[f64]) -> f64 {
|
||||||
|
let mut sum = 0.0;
|
||||||
|
|
||||||
|
for i in 0..a.len() {
|
||||||
|
sum += a[i] * b[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
sum
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Computes the dot product using a thread pool with varying number of threads. The vectors will be both splitted into equally
|
||||||
|
/// sized slices which then get passed ot their own thread to compute the partial dot product. After all threads have
|
||||||
|
/// finished the partial dot products will be summed to create the final result.
|
||||||
|
fn dot_parallel(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
||||||
|
let mut pool = ThreadPool::with_limit(threads);
|
||||||
|
|
||||||
|
// number of elements in each vector for each thread
|
||||||
|
let steps = a.len() / threads;
|
||||||
|
|
||||||
|
for i in 0..threads {
|
||||||
|
// offset of the first element for the thread local vec
|
||||||
|
let chunk = i * steps;
|
||||||
|
// create a new strong reference to the vector
|
||||||
|
let aa = a.clone();
|
||||||
|
let bb = b.clone();
|
||||||
|
// launch a new thread
|
||||||
|
pool.enqueue(Task::new(
|
||||||
|
(aa, bb, chunk, steps),
|
||||||
|
|(aa, bb, chunk, steps)| {
|
||||||
|
let a = &aa[chunk..(chunk + steps)];
|
||||||
|
let b = &bb[chunk..(chunk + steps)];
|
||||||
|
dot(a, b)
|
||||||
|
},
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
pool.join_all();
|
pool.join_all();
|
||||||
|
|
||||||
let _feats: Vec<Box<dyn Object>> = pool.get_results();
|
black_box(pool.get_results().iter().sum::<f64>());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compute a simple hash value for the given index value.
|
||||||
|
/// This function will return a value between [0, 1].
|
||||||
|
#[inline]
|
||||||
|
fn hash(x: f64) -> f64 {
|
||||||
|
((x * 234.8743 + 3.8274).sin() * 87624.58376).fract()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a vector filled with `size` elements of 64-bit floating point numbers
|
||||||
|
/// each initialized with the function `hash` and the given seed. The vector will
|
||||||
|
/// be filled with values between [0, 1].
|
||||||
|
fn create_vec(size: usize, seed: u64) -> Arc<Vec<f64>> {
|
||||||
|
let mut vec = Vec::with_capacity(size);
|
||||||
|
|
||||||
|
for i in 0..size {
|
||||||
|
vec.push(hash(i as f64 + seed as f64));
|
||||||
|
}
|
||||||
|
|
||||||
|
Arc::new(vec)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Function for executing the thread pool benchmarks using criterion.rs.
|
||||||
|
/// It will create two different vectors and benchmark the single thread performance
|
||||||
|
/// as well as the multi threadded performance for varying amounts of threads.
|
||||||
|
pub fn bench_threadpool(c: &mut Criterion) {
|
||||||
|
let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
|
||||||
|
let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
|
||||||
|
|
||||||
|
let mut group = c.benchmark_group("threadpool with various number of threads");
|
||||||
|
|
||||||
|
for threads in THREAD_COUNTS.iter() {
|
||||||
|
group.throughput(Throughput::Bytes(*threads as u64));
|
||||||
|
group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
|
||||||
|
b.iter(|| {
|
||||||
|
dot_parallel(vec_a.clone(), vec_b.clone(), *threads);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
group.finish();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Benchmark the effects of over and underusing a thread pools thread capacity.
|
||||||
|
/// The thread pool will automatically choose the number of threads to use.
|
||||||
|
/// We will then run a custom number of jobs with that pool that may be smaller or larger
|
||||||
|
/// than the amount of threads the pool can offer.
|
||||||
|
fn pool_overusage(a: Arc<Vec<f64>>, b: Arc<Vec<f64>>, threads: usize) {
|
||||||
|
// automatically choose the number of threads
|
||||||
|
let mut pool = ThreadPool::new();
|
||||||
|
|
||||||
|
// number of elements in each vector for each thread
|
||||||
|
let steps = a.len() / threads;
|
||||||
|
|
||||||
|
for i in 0..threads {
|
||||||
|
// offset of the first element for the thread local vec
|
||||||
|
let chunk = i * steps;
|
||||||
|
// create a new strong reference to the vector
|
||||||
|
let aa = a.clone();
|
||||||
|
let bb = b.clone();
|
||||||
|
// launch a new thread
|
||||||
|
pool.enqueue(Task::new(
|
||||||
|
(aa, bb, chunk, steps),
|
||||||
|
|(aa, bb, chunk, steps)| {
|
||||||
|
let a = &aa[chunk..(chunk + steps)];
|
||||||
|
let b = &bb[chunk..(chunk + steps)];
|
||||||
|
dot(a, b)
|
||||||
|
},
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
pool.join_all();
|
||||||
|
|
||||||
|
black_box(pool.get_results().iter().sum::<f64>());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Benchmark the effects of over and underusing a thread pools thread capacity.
|
||||||
|
/// The thread pool will automatically choose the number of threads to use.
|
||||||
|
/// We will then run a custom number of jobs with that pool that may be smaller or larger
|
||||||
|
/// than the amount of threads the pool can offer.
|
||||||
|
pub fn bench_overusage(c: &mut Criterion) {
|
||||||
|
let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
|
||||||
|
let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
|
||||||
|
|
||||||
|
let mut group = c.benchmark_group("threadpool overusage");
|
||||||
|
|
||||||
|
for threads in THREAD_COUNTS.iter() {
|
||||||
|
group.throughput(Throughput::Bytes(*threads as u64));
|
||||||
|
group.bench_with_input(BenchmarkId::from_parameter(threads), threads, |b, _| {
|
||||||
|
b.iter(|| {
|
||||||
|
pool_overusage(vec_a.clone(), vec_b.clone(), *threads);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
group.finish();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Benchmark the performance of a single thread used to calculate the dot product.
|
||||||
|
pub fn bench_single_threaded(c: &mut Criterion) {
|
||||||
|
let vec_a = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[0]);
|
||||||
|
let vec_b = create_vec(VEC_ELEM_COUNT, VEC_SEEDS[1]);
|
||||||
|
|
||||||
|
c.bench_function("single threaded", |s| {
|
||||||
|
s.iter(|| {
|
||||||
|
black_box(dot(&vec_a, &vec_b));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
criterion_group!(
|
||||||
|
benches,
|
||||||
|
bench_single_threaded,
|
||||||
|
bench_threadpool,
|
||||||
|
bench_overusage
|
||||||
|
);
|
||||||
|
criterion_main!(benches);
|
||||||
|
|
|
@ -88,16 +88,13 @@ where
|
||||||
*self.index(index)
|
*self.index(index)
|
||||||
}
|
}
|
||||||
/// Returns all pixel of the image
|
/// Returns all pixel of the image
|
||||||
pub fn pixels(&self, index: usize) -> &Vec<(T, T, T, T)> {
|
pub fn pixels(&self) -> &Vec<(T, T, T, T)> {
|
||||||
&self.pixels
|
&self.pixels
|
||||||
}
|
}
|
||||||
/// Returns the path of the image
|
/// Returns the path of the image
|
||||||
pub fn path(&self) -> &PathBuf {
|
pub fn path(&self) -> &PathBuf {
|
||||||
&self.path
|
&self.path
|
||||||
}
|
}
|
||||||
pub fn pixels(&self) -> &Vec<(T, T, T, T)> {
|
|
||||||
&self.pixels
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the iterator of the pixels vector
|
/// Returns the iterator of the pixels vector
|
||||||
pub fn iter(&self) -> Iter<'_, (T, T, T, T)> {
|
pub fn iter(&self) -> Iter<'_, (T, T, T, T)> {
|
||||||
|
|
|
@ -41,14 +41,9 @@ pub fn image_loader(path: &Path) -> Result<Image<f32>, &'static str> {
|
||||||
let color_type = reader.info().color_type;
|
let color_type = reader.info().color_type;
|
||||||
let width = reader.info().width;
|
let width = reader.info().width;
|
||||||
let height = reader.info().height;
|
let height = reader.info().height;
|
||||||
let palette = &reader.info().palette;
|
|
||||||
|
|
||||||
let idat = &buf[..info.buffer_size()];
|
let idat = &buf[..info.buffer_size()];
|
||||||
|
|
||||||
println!("idat {:?} idat ", idat);
|
|
||||||
println!("palette {:?} palette", palette);
|
|
||||||
println!("depth {:?} depth", bit_depth);
|
|
||||||
|
|
||||||
let pixel_vec = match color_type {
|
let pixel_vec = match color_type {
|
||||||
png::ColorType::Grayscale => grayscale_to_rgba(idat, bit_depth),
|
png::ColorType::Grayscale => grayscale_to_rgba(idat, bit_depth),
|
||||||
png::ColorType::GrayscaleAlpha => grayscale_alpha_to_rgba(idat, bit_depth),
|
png::ColorType::GrayscaleAlpha => grayscale_alpha_to_rgba(idat, bit_depth),
|
||||||
|
@ -57,12 +52,7 @@ pub fn image_loader(path: &Path) -> Result<Image<f32>, &'static str> {
|
||||||
_ => panic!("Unsupported color type or bit depth"),
|
_ => panic!("Unsupported color type or bit depth"),
|
||||||
}?;
|
}?;
|
||||||
|
|
||||||
let image: Image<f32> = Image::new(
|
let image: Image<f32> = Image::new(width, height, pixel_vec, path.to_path_buf());
|
||||||
width as usize,
|
|
||||||
height as usize,
|
|
||||||
pixel_vec,
|
|
||||||
path.to_path_buf(),
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(image)
|
Ok(image)
|
||||||
}
|
}
|
||||||
|
|
15
src/lib.rs
15
src/lib.rs
|
@ -4,18 +4,3 @@ pub mod image;
|
||||||
pub mod image_loader;
|
pub mod image_loader;
|
||||||
pub mod multithreading;
|
pub mod multithreading;
|
||||||
pub mod search_index;
|
pub mod search_index;
|
||||||
|
|
||||||
pub fn add(left: usize, right: usize) -> usize {
|
|
||||||
left + right
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn it_works() {
|
|
||||||
let result = add(2, 2);
|
|
||||||
assert_eq!(result, 4);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
//! You also need a Vector of Feature generator functions that generates the feature of every image
|
//! You also need a Vector of Feature generator functions that generates the feature of every image
|
||||||
//!
|
//!
|
||||||
//!
|
//!
|
||||||
//!```
|
//!```rust ignore
|
||||||
//! # use std::path::{Path, PathBuf};
|
//! # use std::path::{Path, PathBuf};
|
||||||
//! # use imsearch::image::Image;
|
//! # use imsearch::image::Image;
|
||||||
//! # use imsearch::search_index;
|
//! # use imsearch::search_index;
|
||||||
|
@ -17,8 +17,8 @@
|
||||||
//! let path: Vec<PathBuf> = Vec::new();
|
//! let path: Vec<PathBuf> = Vec::new();
|
||||||
//! let features: Vec<FeatureGenerator> = Vec::new();
|
//! let features: Vec<FeatureGenerator> = Vec::new();
|
||||||
//!
|
//!
|
||||||
//! let mut database = search_index::Database::new(&path, features );
|
//! let mut database = search_index::Database::new(&path, features).unwrap();
|
||||||
//! database.add_image(Path::new("testpath"));
|
//! database.add_image(Path::new("testpath")).unwrap();
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
//!
|
//!
|
||||||
|
@ -266,23 +266,34 @@ impl Database {
|
||||||
///This function search the Database after the Similarity to a given Image in a specific feature.
|
///This function search the Database after the Similarity to a given Image in a specific feature.
|
||||||
/// It returns a Vector of all images and a f32 value which represents the Similarity in percent.
|
/// It returns a Vector of all images and a f32 value which represents the Similarity in percent.
|
||||||
///
|
///
|
||||||
pub fn search(&self, imagepath: &Path, feature: FeatureGenerator) -> Vec<(PathBuf, f32)> {
|
pub fn search(
|
||||||
|
&self,
|
||||||
|
imagepath: &Path,
|
||||||
|
feature: FeatureGenerator,
|
||||||
|
) -> Result<Vec<(PathBuf, f32)>, &'static str> {
|
||||||
self.images.search(imagepath, feature)
|
self.images.search(imagepath, feature)
|
||||||
}
|
}
|
||||||
|
|
||||||
///the new function generates a new Database out of a vector of the Paths of the Images and a Vector of features
|
///the new function generates a new Database out of a vector of the Paths of the Images and a Vector of features
|
||||||
pub fn new(imagepaths: &Vec<PathBuf>, features: Vec<FeatureGenerator>) -> Self {
|
pub fn new(
|
||||||
|
imagepaths: &Vec<PathBuf>,
|
||||||
|
features: Vec<FeatureGenerator>,
|
||||||
|
) -> Result<Self, &'static str> {
|
||||||
let mut threadpool = ThreadPool::new();
|
let mut threadpool = ThreadPool::new();
|
||||||
Self {
|
let images = match IndexedImages::new(imagepaths, &features, &mut threadpool) {
|
||||||
images: IndexedImages::new(imagepaths, &features, &mut threadpool),
|
Ok(images) => images,
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
};
|
||||||
|
Ok(Self {
|
||||||
|
images,
|
||||||
generators: features,
|
generators: features,
|
||||||
threadpool,
|
threadpool,
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// with add_image you can add images in a existing database.
|
/// with add_image you can add images in a existing database.
|
||||||
/// databases from a file are read only.
|
/// databases from a file are read only.
|
||||||
pub fn add_image(&mut self, path: &Path) {
|
pub fn add_image(&mut self, path: &Path) -> Result<(), &'static str> {
|
||||||
if !self.generators.is_empty() {
|
if !self.generators.is_empty() {
|
||||||
self.images
|
self.images
|
||||||
.add_image(path, &self.generators, &mut self.threadpool)
|
.add_image(path, &self.generators, &mut self.threadpool)
|
||||||
|
@ -317,11 +328,14 @@ impl IndexedImages {
|
||||||
imagepaths: &Vec<PathBuf>,
|
imagepaths: &Vec<PathBuf>,
|
||||||
features: &[FeatureGenerator],
|
features: &[FeatureGenerator],
|
||||||
threadpool: &mut ThreadPool<Arc<Image<f32>>, (String, FeatureResult)>,
|
threadpool: &mut ThreadPool<Arc<Image<f32>>, (String, FeatureResult)>,
|
||||||
) -> Self {
|
) -> Result<Self, &'static str> {
|
||||||
let mut images_with_feats = HashMap::new();
|
let mut images_with_feats = HashMap::new();
|
||||||
|
|
||||||
for path in imagepaths {
|
for path in imagepaths {
|
||||||
let image: Arc<Image<f32>> = Arc::new(Image::default()); //todo!("Image reader function")
|
let image = match crate::image_loader::image_loader(path) {
|
||||||
|
Ok(image) => Arc::new(image),
|
||||||
|
Err(desc) => return Err(desc),
|
||||||
|
};
|
||||||
let mut feats = HashMap::new();
|
let mut feats = HashMap::new();
|
||||||
|
|
||||||
for generator in features.iter() {
|
for generator in features.iter() {
|
||||||
|
@ -336,16 +350,23 @@ impl IndexedImages {
|
||||||
images_with_feats.insert(image.path().clone(), feats);
|
images_with_feats.insert(image.path().clone(), feats);
|
||||||
}
|
}
|
||||||
|
|
||||||
Self {
|
Ok(Self {
|
||||||
images: images_with_feats,
|
images: images_with_feats,
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function search the Database after the Similarity to a given Image in a specific feature.
|
///This function search the Database after the Similarity to a given Image in a specific feature.
|
||||||
/// It returns a Vector of all images and a f32 value which represents the Similarity in percent.
|
/// It returns a Vector of all images and a f32 value which represents the Similarity in percent.
|
||||||
///
|
///
|
||||||
fn search(&self, imagepath: &Path, feature: FeatureGenerator) -> Vec<(PathBuf, f32)> {
|
fn search(
|
||||||
let image: Arc<Image<f32>> = Arc::new(Image::default()); //todo!("Image reader function")
|
&self,
|
||||||
|
imagepath: &Path,
|
||||||
|
feature: FeatureGenerator,
|
||||||
|
) -> Result<Vec<(PathBuf, f32)>, &'static str> {
|
||||||
|
let image = match crate::image_loader::image_loader(imagepath) {
|
||||||
|
Ok(image) => Arc::new(image),
|
||||||
|
Err(desc) => return Err(desc),
|
||||||
|
};
|
||||||
let search_feat = feature(image);
|
let search_feat = feature(image);
|
||||||
let mut result: Vec<(PathBuf, f32)> = Vec::new();
|
let mut result: Vec<(PathBuf, f32)> = Vec::new();
|
||||||
|
|
||||||
|
@ -356,7 +377,7 @@ impl IndexedImages {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
///this function lets you add images to the Indexed Image struct
|
///this function lets you add images to the Indexed Image struct
|
||||||
|
@ -365,8 +386,11 @@ impl IndexedImages {
|
||||||
path: &Path,
|
path: &Path,
|
||||||
generator: &Vec<FeatureGenerator>,
|
generator: &Vec<FeatureGenerator>,
|
||||||
threadpool: &mut ThreadPool<Arc<Image<f32>>, (String, FeatureResult)>,
|
threadpool: &mut ThreadPool<Arc<Image<f32>>, (String, FeatureResult)>,
|
||||||
) {
|
) -> Result<(), &'static str> {
|
||||||
let image: Arc<Image<f32>> = Arc::new(Image::default()); //todo!("Image reader function")
|
let image = match crate::image_loader::image_loader(path) {
|
||||||
|
Ok(image) => Arc::new(image),
|
||||||
|
Err(desc) => return Err(desc),
|
||||||
|
};
|
||||||
let mut feats = HashMap::new();
|
let mut feats = HashMap::new();
|
||||||
|
|
||||||
for gen in generator {
|
for gen in generator {
|
||||||
|
@ -378,13 +402,9 @@ impl IndexedImages {
|
||||||
feats.insert(name, result);
|
feats.insert(name, result);
|
||||||
}
|
}
|
||||||
self.images.insert(image.path().clone(), feats);
|
self.images.insert(image.path().clone(), feats);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// example feature implementation
|
Ok(())
|
||||||
#[allow(dead_code)]
|
}
|
||||||
fn average_luminance(image: Arc<Image<f32>>) -> (String, FeatureResult) {
|
|
||||||
(String::from("average-brightness"), FeatureResult::F32(0.0))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
Loading…
Reference in New Issue