Skip to content

Commit

Permalink
sparse sketching - all but memory allocation?
Browse files Browse the repository at this point in the history
  • Loading branch information
Gillgamesh committed Nov 4, 2024
1 parent 319f30c commit f9e45b8
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 22 deletions.
17 changes: 13 additions & 4 deletions include/bucket_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ class BucketBuffer {
// std::array<BufferEntry, BUFFER_CAPACITY> thread_local_otherbuffer {};

public:
BucketBuffer(): _capacity(64) {
entries = std::vector<BufferEntry>(_capacity);
BucketBuffer(): _capacity(5000) {
entries = std::vector<BufferEntry>();
entries.reserve(_capacity);
}
~BucketBuffer() {
}
Expand All @@ -119,7 +120,7 @@ class BucketBuffer {
entries.emplace_back(BufferEntry({value, col_idx, row_idx}));
if (over_capacity()) {
sort_and_compact();
return over_capacity();
return !over_capacity();
}
return true;
}
Expand Down Expand Up @@ -160,17 +161,25 @@ class BucketBuffer {
entries.resize(_size);

_compacted = true;
if (size() > 5) {
for (size_t i = 0; i < _size; ++i) {
std::cout << "(" << entries[i].col_idx << ", " << entries[i].row_idx << ") ";
}
std::cout << std::endl;
}
}

bool merge(const BucketBuffer &other) {
// YOU SHOULD ONLY MERGE WITH AN UNDER CAPACITY BUFFER
// TODO - for now, that is the responsibility of the caller.
// std::cout << "Merging buffers of size " << size() << " and " << other.size() << std::endl;
assert(size() + other.size() <= _capacity);
entries.insert(entries.end(), other.entries.begin(), other.entries.end());
// TODO - use a more efficient sorting procedure
if (over_capacity()) {
sort_and_compact();
}
return over_capacity();
return !over_capacity();
}

// merge another buffer into this one
Expand Down
6 changes: 3 additions & 3 deletions include/sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ class Sketch {
* @return The length of the vector to sketch
*/
static vec_t calc_vector_length(node_id_t num_vertices) {
return ceil(double(num_vertices) * (num_vertices - 1) / 2);
// return num_vertices * 4;
// return num_vertices / 2;
// return ceil(double(num_vertices) * (num_vertices - 1) / 2);
return num_vertices * 4;
// return 1 + num_vertices / 16 ;
}

/**
Expand Down
30 changes: 15 additions & 15 deletions src/sketch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,16 @@ Sketch::~Sketch() {
#else
for (size_t i = 0; i < num_columns; ++i) {
Bucket *old_column = buckets + (i * old_num_rows);
Bucket *new_column = new_buckets + (i * bkt_per_col);
Bucket *new_column = new_buckets + (i * new_num_rows);
std::memcpy(new_column, old_column, old_num_rows * sizeof(Bucket));
}
new_buckets[num_buckets - 1] = buckets[old_buckets_main];
#endif
#ifdef EAGER_BUCKET_CHECK
nonempty_buckets = (vec_t*) (new_buckets + num_buckets);
std::memcpy(nonempty_buckets, buckets + num_buckets, num_columns * sizeof(vec_t));
#else
std::cout << "yeehaw" << std::endl;
#endif
delete[] (char*) buckets;
buckets = new_buckets;
Expand All @@ -187,13 +189,16 @@ Sketch::~Sketch() {
* being stored
*/
bucket_buffer.sort_and_compact();
int i = bucket_buffer.size()-1;
while (i >= 0 && bucket_buffer[i].col_idx < num_columns) {
size_t buffer_size = bucket_buffer.size();
int i = ((int) buffer_size)-1;
while (i >= 0 && bucket_buffer[i].row_idx < bkt_per_col) {
// update the bucket
get_bucket(bucket_buffer[i].col_idx, bucket_buffer[i].row_idx) ^= bucket_buffer[i].value;
i--;
}
bucket_buffer.entries.resize(i+1);
if (buffer_size > 3)
std::cout << "Injected buffer buckets:" << buffer_size << " to " << i+1 << std::endl;
}


Expand Down Expand Up @@ -227,11 +232,6 @@ void Sketch::update(const vec_t update_idx) {
Bucket_Boruvka::get_all_index_depths(
update_idx, depth_buffer, get_seed(), num_columns, bkt_per_col + 1
);
uint32_t max_depth = 0;
for (size_t i = 0; i < num_columns; ++i) {
max_depth = std::max(max_depth, depth_buffer[i]);
// std::cout << "depth " << i << ": " << depth_buffer[i] << std::endl;
}
// Update depth 0 bucket
Bucket_Boruvka::update(get_deterministic_bucket(), update_idx, checksum);

Expand Down Expand Up @@ -313,6 +313,7 @@ SketchSample Sketch::sample() {
if (entry.col_idx >= first_column && entry.col_idx < first_column + cols_per_sample) {
if (Bucket_Boruvka::is_good(entry.value, checksum_seed())) {
std::cout << "Found a bucket in the buffer" << std::endl;
assert(entry.row_idx >= bkt_per_col);
return {entry.value.alpha, GOOD};
}
}
Expand Down Expand Up @@ -359,23 +360,22 @@ ExhaustiveSketchSample Sketch::exhaustive_sample() {
void Sketch::merge(const Sketch &other) {
if (other.bkt_per_col > bkt_per_col) {
reallocate(other.bkt_per_col);
inject_buffer_buckets();
}
Bucket &deterministic_bucket = get_deterministic_bucket();
for (size_t i=0; i < num_columns; ++i) {
size_t other_effective_size = other.effective_size(i);
#pragma omp simd
for (size_t bucket_id=0; bucket_id < other_effective_size; bucket_id++) {
get_bucket(i, bucket_id).alpha ^= other.get_bucket(i, bucket_id).alpha;
get_bucket(i, bucket_id).gamma ^= other.get_bucket(i, bucket_id).gamma;
get_bucket(i, bucket_id) ^= other.get_bucket(i, bucket_id);
}
#ifdef EAGER_BUCKET_CHECK
recalculate_flags(i, 0, other_effective_size);
#endif
}

// seperately update the deterministic bucket
deterministic_bucket.alpha ^= other.get_deterministic_bucket().alpha;
deterministic_bucket.gamma ^= other.get_deterministic_bucket().gamma;
deterministic_bucket ^= other.get_deterministic_bucket();

// merge the deep buffers
// TODO - when sketches have dynamic sizes, this will require more work
Expand Down Expand Up @@ -445,6 +445,7 @@ void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samp

if (other.bkt_per_col > bkt_per_col) {
reallocate(other.bkt_per_col);
inject_buffer_buckets();
}

// merge deterministic buffer
Expand Down Expand Up @@ -472,15 +473,15 @@ void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samp
#endif
bool sufficient_space = bucket_buffer.merge(other.bucket_buffer);
if (!sufficient_space) {
std::cout << "Merge: Buffer full, reallocating" << std::endl;
// std::cout << "Merge: Buffer full, reallocating" << std::endl;
reallocate((bkt_per_col * 8) / 5);
inject_buffer_buckets();
}
}

void Sketch::merge_raw_bucket_buffer(const Bucket *raw_buckets) {
// TODO - this function should probably be removed, depracated, etc.
assert(false);
// assert(false);
for (size_t i = 0; i < num_buckets; i++) {
buckets[i].alpha ^= raw_buckets[i].alpha;
buckets[i].gamma ^= raw_buckets[i].gamma;
Expand All @@ -494,7 +495,6 @@ void Sketch::merge_raw_bucket_buffer(const Bucket *raw_buckets) {

uint8_t Sketch::effective_size(size_t col_idx) const
{
return bkt_per_col;
// first, check for emptyness
if (Bucket_Boruvka::is_empty(get_deterministic_bucket()))
{
Expand Down

0 comments on commit f9e45b8

Please sign in to comment.