From f9e45b835f57e9aca7b597ab620b4699788e5662 Mon Sep 17 00:00:00 2001 From: Gilvir Gill Date: Mon, 4 Nov 2024 01:40:55 -0500 Subject: [PATCH] sparse sketching - all but memory allocation? --- include/bucket_buffer.h | 17 +++++++++++++---- include/sketch.h | 6 +++--- src/sketch.cpp | 30 +++++++++++++++--------------- 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/include/bucket_buffer.h b/include/bucket_buffer.h index 73a90b0..359fbac 100644 --- a/include/bucket_buffer.h +++ b/include/bucket_buffer.h @@ -96,8 +96,9 @@ class BucketBuffer { // std::array thread_local_otherbuffer {}; public: - BucketBuffer(): _capacity(64) { - entries = std::vector(_capacity); + BucketBuffer(): _capacity(5000) { + entries = std::vector(); + entries.reserve(_capacity); } ~BucketBuffer() { } @@ -119,7 +120,7 @@ class BucketBuffer { entries.emplace_back(BufferEntry({value, col_idx, row_idx})); if (over_capacity()) { sort_and_compact(); - return over_capacity(); + return !over_capacity(); } return true; } @@ -160,17 +161,25 @@ class BucketBuffer { entries.resize(_size); _compacted = true; + if (size() > 5) { + for (size_t i = 0; i < _size; ++i) { + std::cout << "(" << entries[i].col_idx << ", " << entries[i].row_idx << ") "; + } + std::cout << std::endl; + } } bool merge(const BucketBuffer &other) { // YOU SHOULD ONLY MERGE WITH AN UNDER CAPACITY BUFFER // TODO - for now, that is the responsibility of the caller. + // std::cout << "Merging buffers of size " << size() << " and " << other.size() << std::endl; assert(size() + other.size() <= _capacity); entries.insert(entries.end(), other.entries.begin(), other.entries.end()); + // TODO - use a more efficient sorting procedure if (over_capacity()) { sort_and_compact(); } - return over_capacity(); + return !over_capacity(); } // merge another buffer into this one diff --git a/include/sketch.h b/include/sketch.h index 27bcd2a..eac66fc 100644 --- a/include/sketch.h +++ b/include/sketch.h @@ -90,9 +90,9 @@ class Sketch { * @return The length of the vector to sketch */ static vec_t calc_vector_length(node_id_t num_vertices) { - return ceil(double(num_vertices) * (num_vertices - 1) / 2); - // return num_vertices * 4; - // return num_vertices / 2; + // return ceil(double(num_vertices) * (num_vertices - 1) / 2); + return num_vertices * 4; + // return 1 + num_vertices / 16 ; } /** diff --git a/src/sketch.cpp b/src/sketch.cpp index 41c2cd8..d505e36 100644 --- a/src/sketch.cpp +++ b/src/sketch.cpp @@ -161,7 +161,7 @@ Sketch::~Sketch() { #else for (size_t i = 0; i < num_columns; ++i) { Bucket *old_column = buckets + (i * old_num_rows); - Bucket *new_column = new_buckets + (i * bkt_per_col); + Bucket *new_column = new_buckets + (i * new_num_rows); std::memcpy(new_column, old_column, old_num_rows * sizeof(Bucket)); } new_buckets[num_buckets - 1] = buckets[old_buckets_main]; @@ -169,6 +169,8 @@ Sketch::~Sketch() { #ifdef EAGER_BUCKET_CHECK nonempty_buckets = (vec_t*) (new_buckets + num_buckets); std::memcpy(nonempty_buckets, buckets + num_buckets, num_columns * sizeof(vec_t)); +#else + std::cout << "yeehaw" << std::endl; #endif delete[] (char*) buckets; buckets = new_buckets; @@ -187,13 +189,16 @@ Sketch::~Sketch() { * being stored */ bucket_buffer.sort_and_compact(); - int i = bucket_buffer.size()-1; - while (i >= 0 && bucket_buffer[i].col_idx < num_columns) { + size_t buffer_size = bucket_buffer.size(); + int i = ((int) buffer_size)-1; + while (i >= 0 && bucket_buffer[i].row_idx < bkt_per_col) { // update the bucket get_bucket(bucket_buffer[i].col_idx, bucket_buffer[i].row_idx) ^= bucket_buffer[i].value; i--; } bucket_buffer.entries.resize(i+1); + if (buffer_size > 3) + std::cout << "Injected buffer buckets:" << buffer_size << " to " << i+1 << std::endl; } @@ -227,11 +232,6 @@ void Sketch::update(const vec_t update_idx) { Bucket_Boruvka::get_all_index_depths( update_idx, depth_buffer, get_seed(), num_columns, bkt_per_col + 1 ); - uint32_t max_depth = 0; - for (size_t i = 0; i < num_columns; ++i) { - max_depth = std::max(max_depth, depth_buffer[i]); - // std::cout << "depth " << i << ": " << depth_buffer[i] << std::endl; - } // Update depth 0 bucket Bucket_Boruvka::update(get_deterministic_bucket(), update_idx, checksum); @@ -313,6 +313,7 @@ SketchSample Sketch::sample() { if (entry.col_idx >= first_column && entry.col_idx < first_column + cols_per_sample) { if (Bucket_Boruvka::is_good(entry.value, checksum_seed())) { std::cout << "Found a bucket in the buffer" << std::endl; + assert(entry.row_idx >= bkt_per_col); return {entry.value.alpha, GOOD}; } } @@ -359,14 +360,14 @@ ExhaustiveSketchSample Sketch::exhaustive_sample() { void Sketch::merge(const Sketch &other) { if (other.bkt_per_col > bkt_per_col) { reallocate(other.bkt_per_col); + inject_buffer_buckets(); } Bucket &deterministic_bucket = get_deterministic_bucket(); for (size_t i=0; i < num_columns; ++i) { size_t other_effective_size = other.effective_size(i); #pragma omp simd for (size_t bucket_id=0; bucket_id < other_effective_size; bucket_id++) { - get_bucket(i, bucket_id).alpha ^= other.get_bucket(i, bucket_id).alpha; - get_bucket(i, bucket_id).gamma ^= other.get_bucket(i, bucket_id).gamma; + get_bucket(i, bucket_id) ^= other.get_bucket(i, bucket_id); } #ifdef EAGER_BUCKET_CHECK recalculate_flags(i, 0, other_effective_size); @@ -374,8 +375,7 @@ void Sketch::merge(const Sketch &other) { } // seperately update the deterministic bucket - deterministic_bucket.alpha ^= other.get_deterministic_bucket().alpha; - deterministic_bucket.gamma ^= other.get_deterministic_bucket().gamma; + deterministic_bucket ^= other.get_deterministic_bucket(); // merge the deep buffers // TODO - when sketches have dynamic sizes, this will require more work @@ -445,6 +445,7 @@ void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samp if (other.bkt_per_col > bkt_per_col) { reallocate(other.bkt_per_col); + inject_buffer_buckets(); } // merge deterministic buffer @@ -472,7 +473,7 @@ void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samp #endif bool sufficient_space = bucket_buffer.merge(other.bucket_buffer); if (!sufficient_space) { - std::cout << "Merge: Buffer full, reallocating" << std::endl; + // std::cout << "Merge: Buffer full, reallocating" << std::endl; reallocate((bkt_per_col * 8) / 5); inject_buffer_buckets(); } @@ -480,7 +481,7 @@ void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samp void Sketch::merge_raw_bucket_buffer(const Bucket *raw_buckets) { // TODO - this function should probably be removed, depracated, etc. - assert(false); + // assert(false); for (size_t i = 0; i < num_buckets; i++) { buckets[i].alpha ^= raw_buckets[i].alpha; buckets[i].gamma ^= raw_buckets[i].gamma; @@ -494,7 +495,6 @@ void Sketch::merge_raw_bucket_buffer(const Bucket *raw_buckets) { uint8_t Sketch::effective_size(size_t col_idx) const { - return bkt_per_col; // first, check for emptyness if (Bucket_Boruvka::is_empty(get_deterministic_bucket())) {