added single threaded sparse vector impl
This commit is contained in:
parent
205c973ff6
commit
7174c6e423
|
@ -8,3 +8,6 @@ edition = "2021"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
futures = "0.3.28"
|
futures = "0.3.28"
|
||||||
|
jemalloc-ctl = "0.5.0"
|
||||||
|
jemallocator = "0.5.0"
|
||||||
|
bytesize = "1.2.0"
|
|
@ -0,0 +1,2 @@
|
||||||
|
[toolchain]
|
||||||
|
channel = "nightly"
|
|
@ -1,116 +1,98 @@
|
||||||
use std::ops::{Add, Mul};
|
use std::ops::{Add, Mul, Sub};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
use std::time::Instant;
|
||||||
|
use bytesize::ByteSize;
|
||||||
use futures::executor::block_on;
|
use futures::executor::block_on;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use futures::future::{join_all};
|
use futures::future::{join_all};
|
||||||
|
use jemalloc_ctl::{stats, epoch};
|
||||||
|
|
||||||
|
#[global_allocator]
|
||||||
|
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
||||||
|
|
||||||
/// Only stores more efficiently when at least 50% of all elements are zeros
|
/// Only stores more efficiently when at least 50% of all elements are zeros
|
||||||
pub struct SparseVec {
|
pub struct SparseVec {
|
||||||
column: Vec<(usize, f32)>
|
values: Vec<f64>,
|
||||||
|
indices: Vec<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SparseVec {
|
impl SparseVec {
|
||||||
|
|
||||||
pub fn dot(&self, other: &SparseVec) -> f32 {
|
pub fn dot(&self, other: &SparseVec) -> f64 {
|
||||||
|
let mut sum = 0.0;
|
||||||
|
|
||||||
let future = async move {
|
for index in 0..other.indices.len() {
|
||||||
let divisions = 128;
|
// exponential search for an element in the second vector to have the same index
|
||||||
|
sum += binary_search(self.indices[index], &other.indices, &other.values) * self.values[index];
|
||||||
|
}
|
||||||
|
|
||||||
let k = self.column.len() / divisions;
|
sum
|
||||||
|
|
||||||
let mut futures = Vec::new();
|
|
||||||
|
|
||||||
for i in 0..divisions {
|
|
||||||
let off = i * k;
|
|
||||||
futures.push(dot_threaded(&self.column[off..(off + k)], &other.column[..]));
|
|
||||||
}
|
|
||||||
|
|
||||||
join_all(futures).await
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = block_on(future);
|
|
||||||
|
|
||||||
block_on(async move {
|
|
||||||
let divisions = 16;
|
|
||||||
|
|
||||||
let k = result.len() / divisions;
|
|
||||||
|
|
||||||
let mut futures = Vec::new();
|
|
||||||
|
|
||||||
for i in 0..divisions {
|
|
||||||
let off = i * k;
|
|
||||||
futures.push(sum_async(&result[off..(off + k)]));
|
|
||||||
}
|
|
||||||
|
|
||||||
join_all(futures).await
|
|
||||||
}).iter().fold(0.0, |acc, x| acc + x)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new(elements: usize, null_prop: f32) -> Self {
|
pub fn new(elements: usize, non_null: f64) -> Self {
|
||||||
let non_zero_elements = (elements as f32 * (1.0 - null_prop)) as usize;
|
let non_zero_elements = (elements as f64 * non_null) as usize;
|
||||||
|
|
||||||
let mut column = Vec::with_capacity(non_zero_elements);
|
let heap_element_size = std::mem::size_of::<f64>() + std::mem::size_of::<usize>();
|
||||||
|
|
||||||
|
println!("Estimated size on heap: {}", ByteSize::b((non_zero_elements * heap_element_size) as u64));
|
||||||
|
|
||||||
|
println!("allocating...");
|
||||||
|
let mut values = Vec::with_capacity(non_zero_elements);
|
||||||
|
let mut indices = Vec::with_capacity(non_zero_elements);
|
||||||
|
|
||||||
|
println!("generating some data...");
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
let mut last_idx = 0;
|
|
||||||
|
|
||||||
for _ in 0..non_zero_elements {
|
for i in 0..non_zero_elements {
|
||||||
last_idx = rng.gen_range(last_idx..elements);
|
values.push(0.5);
|
||||||
column.push((last_idx, rng.gen_range(0.001..1.0)))
|
|
||||||
|
let idx = i as f32 / non_zero_elements as f32 * (elements as f32 - 4.0) + rng.gen_range(0.0..3.0);
|
||||||
|
indices.push(idx as usize);
|
||||||
}
|
}
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
column
|
values,
|
||||||
|
indices
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn sum_async(arr: &[f32]) -> f32 {
|
fn binary_search(target: usize, indices: &[usize], values: &[f64]) -> f64 {
|
||||||
arr.iter().fold(0.0, |acc, x| acc + x)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn dot_threaded(a: &[(usize, f32)], b: &[(usize, f32)]) -> f32 {
|
let mut range = 0..indices.len();
|
||||||
let mut sum = 0.0;
|
loop {
|
||||||
|
let mut median = (range.end - range.start) >> 1;
|
||||||
for pair in a.iter() {
|
if median == 0 {
|
||||||
|
break;
|
||||||
// exponential search for an element in the second vector to have the same index
|
|
||||||
let mut bound = 1;
|
|
||||||
loop {
|
|
||||||
if bound >= b.len() || b[bound].1 >= pair.1 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
bound *= 2;
|
|
||||||
}
|
}
|
||||||
|
median += range.start;
|
||||||
|
|
||||||
let mut range = 0..bound;
|
if indices[median] == target {
|
||||||
loop {
|
return values[median];
|
||||||
let mut median = (range.end - range.start) / 2;
|
} else if indices[median] > target {
|
||||||
if median == 0 {
|
range.end = median;
|
||||||
break;
|
} else {
|
||||||
}
|
range.start = median;
|
||||||
median += range.start;
|
|
||||||
|
|
||||||
if b[median].0 == pair.0 {
|
|
||||||
sum += b[median].1 * pair.1;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if b[median].0 > pair.0 {
|
|
||||||
range.end = median;
|
|
||||||
} else {
|
|
||||||
range.start = median;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sum
|
0.0
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
let now = Instant::now();
|
||||||
// generate a sparse vector with 10^10 random elements
|
// generate a sparse vector with 10^10 random elements
|
||||||
let vec = SparseVec::new(10_000_000_000, 0.99);
|
// but only with 2% of them being non-null
|
||||||
|
let vec = SparseVec::new(10_usize.pow(10), 0.02);
|
||||||
|
println!("Created sparse vector took: {}s", Instant::now().sub(now).as_secs_f32());
|
||||||
|
|
||||||
println!("{}", vec.dot(&vec));
|
println!("Sparse vector stack bytes: {} B", std::mem::size_of_val(&vec));
|
||||||
|
|
||||||
|
// many statistics are cached and only updated when the epoch is advanced.
|
||||||
|
epoch::advance().unwrap();
|
||||||
|
println!("Heap allocated bytes (total): {}", ByteSize::b(stats::allocated::read().unwrap() as u64));
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
vec.dot(&vec);
|
||||||
|
println!("Dot product took: {}s", Instant::now().sub(now).as_secs_f32());
|
||||||
}
|
}
|
Loading…
Reference in New Issue