From 7174c6e4238ba0a05119b959798ddfbf7d574bfb Mon Sep 17 00:00:00 2001
From: teridax
Date: Sun, 30 Apr 2023 17:00:31 +0200
Subject: [PATCH] added single threaded sparse vector impl

---
 sparse_vector/Cargo.toml          |   5 +-
 sparse_vector/rust-toolchain.toml |   2 +
 sparse_vector/src/main.rs         | 138 +++++++++++++-----------
 3 files changed, 66 insertions(+), 79 deletions(-)
 create mode 100644 sparse_vector/rust-toolchain.toml

diff --git a/sparse_vector/Cargo.toml b/sparse_vector/Cargo.toml
index 4803dee..ad670e4 100644
--- a/sparse_vector/Cargo.toml
+++ b/sparse_vector/Cargo.toml
@@ -7,4 +7,7 @@ edition = "2021"
 
 [dependencies]
 rand = "0.8.5"
-futures = "0.3.28"
\ No newline at end of file
+futures = "0.3.28"
+jemalloc-ctl = "0.5.0"
+jemallocator = "0.5.0"
+bytesize = "1.2.0"
\ No newline at end of file
diff --git a/sparse_vector/rust-toolchain.toml b/sparse_vector/rust-toolchain.toml
new file mode 100644
index 0000000..271800c
--- /dev/null
+++ b/sparse_vector/rust-toolchain.toml
@@ -0,0 +1,2 @@
+[toolchain]
+channel = "nightly"
\ No newline at end of file
diff --git a/sparse_vector/src/main.rs b/sparse_vector/src/main.rs
index cc51f0c..0e214ff 100644
--- a/sparse_vector/src/main.rs
+++ b/sparse_vector/src/main.rs
@@ -1,116 +1,98 @@
-use std::ops::{Add, Mul};
+use std::ops::{Add, Mul, Sub};
 use std::thread;
+use std::time::Instant;
 
+use bytesize::ByteSize;
 use futures::executor::block_on;
 use rand::Rng;
 use futures::future::{join_all};
+use jemalloc_ctl::{stats, epoch};
+
+#[global_allocator]
+static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
 
 /// Only stores more efficiently when at least 50% of all elements are zeros
 pub struct SparseVec {
-    column: Vec<(usize, f32)>
+    values: Vec<f64>,
+    indices: Vec<usize>,
 }
 
 impl SparseVec {
-    pub fn dot(&self, other: &SparseVec) -> f32 {
+    pub fn dot(&self, other: &SparseVec) -> f64 {
+        let mut sum = 0.0;
 
-        let future = async move {
-            let divisions = 128;
+        for index in 0..self.indices.len() {
+            // binary search for an element in the second vector with the same index
+            sum += binary_search(self.indices[index], &other.indices, &other.values) * self.values[index];
+        }
 
-            let k = self.column.len() / divisions;
-
-            let mut futures = Vec::new();
-
-            for i in 0..divisions {
-                let off = i * k;
-                futures.push(dot_threaded(&self.column[off..(off + k)], &other.column[..]));
-            }
-
-            join_all(futures).await
-        };
-
-        let result = block_on(future);
-
-        block_on(async move {
-            let divisions = 16;
-
-            let k = result.len() / divisions;
-
-            let mut futures = Vec::new();
-
-            for i in 0..divisions {
-                let off = i * k;
-                futures.push(sum_async(&result[off..(off + k)]));
-            }
-
-            join_all(futures).await
-        }).iter().fold(0.0, |acc, x| acc + x)
+        sum
     }
 
-    pub fn new(elements: usize, null_prop: f32) -> Self {
-        let non_zero_elements = (elements as f32 * (1.0 - null_prop)) as usize;
+    pub fn new(elements: usize, non_null: f64) -> Self {
+        let non_zero_elements = (elements as f64 * non_null) as usize;
 
-        let mut column = Vec::with_capacity(non_zero_elements);
+        let heap_element_size = std::mem::size_of::<f64>() + std::mem::size_of::<usize>();
+        println!("Estimated size on heap: {}", ByteSize::b((non_zero_elements * heap_element_size) as u64));
+
+        println!("allocating...");
+        let mut values = Vec::with_capacity(non_zero_elements);
+        let mut indices = Vec::with_capacity(non_zero_elements);
+
+        println!("generating some data...");
         let mut rng = rand::thread_rng();
-        let mut last_idx = 0;
 
-        for _ in 0..non_zero_elements {
-            last_idx = rng.gen_range(last_idx..elements);
-            column.push((last_idx, rng.gen_range(0.001..1.0)))
+        for i in 0..non_zero_elements {
+            values.push(0.5);
+
+            let idx = i as f32 / non_zero_elements as f32 * (elements as f32 - 4.0) + rng.gen_range(0.0..3.0);
+            indices.push(idx as usize);
         }
 
         Self {
-            column
+            values,
+            indices
         }
     }
 }
 
-async fn sum_async(arr: &[f32]) -> f32 {
-    arr.iter().fold(0.0, |acc, x| acc + x)
-}
+fn binary_search(target: usize, indices: &[usize], values: &[f64]) -> f64 {
 
-async fn dot_threaded(a: &[(usize, f32)], b: &[(usize, f32)]) -> f32 {
-    let mut sum = 0.0;
-
-    for pair in a.iter() {
-
-        // exponential search for an element in the second vector to have the same index
-        let mut bound = 1;
-        loop {
-            if bound >= b.len() || b[bound].1 >= pair.1 {
-                break;
-            }
-
-            bound *= 2;
+    let mut range = 0..indices.len();
+    loop {
+        let mut median = (range.end - range.start) >> 1;
+        if median == 0 {
+            break;
         }
+        median += range.start;
 
-        let mut range = 0..bound;
-        loop {
-            let mut median = (range.end - range.start) / 2;
-            if median == 0 {
-                break;
-            }
-            median += range.start;
-
-            if b[median].0 == pair.0 {
-                sum += b[median].1 * pair.1;
-                break;
-            }
-
-            if b[median].0 > pair.0 {
-                range.end = median;
-            } else {
-                range.start = median;
-            }
+        if indices[median] == target {
+            return values[median];
+        } else if indices[median] > target {
+            range.end = median;
+        } else {
+            range.start = median;
         }
     }
 
-    sum
+    0.0
 }
 
 fn main() {
+    let now = Instant::now();
     // generate a sparse vector with 10^10 random elements
-    let vec = SparseVec::new(10_000_000_000, 0.99);
+    // but only with 2% of them being non-null
+    let vec = SparseVec::new(10_usize.pow(10), 0.02);
+    println!("Creating the sparse vector took: {}s", Instant::now().sub(now).as_secs_f32());
 
-    println!("{}", vec.dot(&vec));
+    println!("Sparse vector stack bytes: {} B", std::mem::size_of_val(&vec));
+
+    // many statistics are cached and only updated when the epoch is advanced.
+    epoch::advance().unwrap();
+    println!("Heap allocated bytes (total): {}", ByteSize::b(stats::allocated::read().unwrap() as u64));
+
+    let now = Instant::now();
+    vec.dot(&vec);
+    println!("Dot product took: {}s", Instant::now().sub(now).as_secs_f32());
 }
\ No newline at end of file