Eager Empty Bucket Checking #148

Open
wants to merge 7 commits into
base: main
Changes from 3 commits
6 changes: 5 additions & 1 deletion include/bucket.h
@@ -49,6 +49,10 @@ namespace Bucket_Boruvka {
*/
inline static void update(Bucket& bucket, const vec_t update_idx,
const vec_hash_t update_hash);
inline static bool is_empty(const Bucket &bucket) {
Gillgamesh marked this conversation as resolved.
// return bucket.alpha == 0 && bucket.gamma == 0;
Gillgamesh marked this conversation as resolved.
return (bucket.alpha | bucket.gamma) == 0;
}
} // namespace Bucket_Boruvka
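
Note on the two `is_empty` forms above: the commented-out `&&` version and the bitwise-OR version should agree for every bucket, since an OR of two unsigned values is zero only when both are zero. A minimal sketch of that equivalence follows. The `Bucket`, `vec_t` and `vec_hash_t` definitions here are hypothetical stand-ins with assumed widths, not the project's real declarations.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for the project's types (widths are assumptions).
using vec_t = uint64_t;
using vec_hash_t = uint32_t;

struct Bucket {
  vec_t alpha;
  vec_hash_t gamma;
};

// Two comparisons joined by a short-circuit &&.
inline bool is_empty_branchy(const Bucket &b) { return b.alpha == 0 && b.gamma == 0; }

// One OR and one comparison; gamma is promoted to the wider type before the OR.
inline bool is_empty_or(const Bucket &b) { return (b.alpha | b.gamma) == 0; }
```

Whether the OR form is actually faster depends on the compiler and target, so it is worth measuring rather than assuming.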

inline col_hash_t Bucket_Boruvka::get_index_depth(const vec_t update_idx, const long seed_and_col,
@@ -63,7 +67,7 @@ inline vec_hash_t Bucket_Boruvka::get_index_hash(const vec_t update_idx, const l
}

inline bool Bucket_Boruvka::is_good(const Bucket &bucket, const long sketch_seed) {
return bucket.gamma == get_index_hash(bucket.alpha, sketch_seed);
return !Bucket_Boruvka::is_empty(bucket) && bucket.gamma == get_index_hash(bucket.alpha, sketch_seed);
}

inline void Bucket_Boruvka::update(Bucket& bucket, const vec_t update_idx,
12 changes: 12 additions & 0 deletions include/sketch.h
@@ -125,6 +125,18 @@ class Sketch {

std::mutex mutex; // lock the sketch for applying updates in multithreaded processing


/**
Collaborator

Line up * and provide a little more documentation. Specify that this operates per column and that all non-empty buckets are above this cutoff.

Collaborator

Better name?

Contributor Author

couldn't think of one

* Gets the 1-based index of the deepest non-empty bucket in column col_idx.
* @return The depth of the deepest non-empty bucket + 1. If the column is entirely empty, returns 0
*/
uint8_t effective_size(size_t col_idx) const;

#ifdef EAGER_BUCKET_CHECK
vec_t *nonempty_buckets;
void update_flags(size_t col_idx, size_t start_row, size_t end_row);
Collaborator

Document me pls

Collaborator

Also, make all of this private

Collaborator

recalculate_flags()

#endif

/**
* In-place merge function.
* @param other Sketch to merge into caller
179 changes: 158 additions & 21 deletions src/sketch.cpp
@@ -5,6 +5,15 @@
#include <vector>
#include <cassert>


inline static void set_bit(vec_t &t, int position) {
Collaborator

Suggest switching the position of first bucket to most significant bit.
t |= 1 << (sizeof(vec_t) * 8 - position)

t |= 1 << position;
}

inline static void clear_bit(vec_t &t, int position) {
t &= ~(1 << position);
}
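
Note on `set_bit` / `clear_bit` above: the literal `1` is an `int`, so `1 << position` is evaluated at `int` width and becomes undefined once `position` reaches that width. If `vec_t` is a 64-bit type (an assumption here, suggested by the later use of `__builtin_clzll`), building the mask in `vec_t` width avoids the problem. A minimal sketch under that assumption:

```cpp
#include <cassert>
#include <cstdint>

// Assumption: vec_t is a 64-bit unsigned integer in this build.
using vec_t = uint64_t;

// Build the mask as a vec_t so every position below 64 is well defined.
inline void set_bit(vec_t &t, unsigned position) {
  assert(position < 8 * sizeof(vec_t));
  t |= vec_t{1} << position;
}

inline void clear_bit(vec_t &t, unsigned position) {
  assert(position < 8 * sizeof(vec_t));
  t &= ~(vec_t{1} << position);
}
```

The same care applies to the most-significant-bit-first layout suggested in the comment above: with 0-based positions the shift count would need to be `8 * sizeof(vec_t) - 1 - position` so that position 0 does not shift by the full width.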

Sketch::Sketch(vec_t vector_len, uint64_t seed, size_t _samples, size_t _cols) : seed(seed) {
num_samples = _samples;
cols_per_sample = _cols;
@@ -18,6 +27,14 @@ Sketch::Sketch(vec_t vector_len, uint64_t seed, size_t _samples, size_t _cols) :
buckets[i].alpha = 0;
buckets[i].gamma = 0;
}

#ifdef EAGER_BUCKET_CHECK
Collaborator

We want to have two forms of serialization:

  1. Direct "no-copy" serialization where we pull the bucket array data out directly.
  2. Compressed serialization which leverages the known column sizes to be more data size efficient.

To pull this off we need the buckets and flags to be stored contiguously. Suggest allocating a few extra buckets that are then used for the flags.

NOTE: We will all have to think about the right way to "receive" these serialized forms on the other side.

nonempty_buckets = new vec_t[num_columns];
for (size_t i = 0; i < num_columns; ++i) {
nonempty_buckets[i] = 0;
}
#endif

}

Sketch::Sketch(vec_t vector_len, uint64_t seed, std::istream &binary_in, size_t _samples,
@@ -28,10 +45,21 @@ Sketch::Sketch(vec_t vector_len, uint64_t seed, std::istream &binary_in, size_t
num_columns = num_samples * cols_per_sample;
bkt_per_col = calc_bkt_per_col(vector_len);
num_buckets = num_columns * bkt_per_col + 1; // plus 1 for deterministic bucket
buckets = new Bucket[num_buckets];

buckets = new Bucket[num_buckets] {};
uint8_t sizes[num_columns];
// Read the serialized Sketch contents
binary_in.read((char *)buckets, bucket_array_bytes());
// first, read in the sizes
binary_in.read((char *) sizes, sizeof(uint8_t) * num_columns);
#ifdef EAGER_BUCKET_CHECK
nonempty_buckets = new vec_t[num_columns];
binary_in.read((char *) nonempty_buckets, sizeof(vec_t) * num_columns);
#endif
// grab the deterministic bucket:
binary_in.read((char *) (buckets + num_buckets -1), sizeof(Bucket));
for (size_t col_idx=0; col_idx < num_columns; col_idx++) {
Bucket *current_column = buckets + (col_idx * bkt_per_col);
binary_in.read((char *) current_column, sizeof(Bucket) * sizes[col_idx]);
}
}

Sketch::Sketch(const Sketch &s) : seed(s.seed) {
@@ -43,9 +71,21 @@ Sketch::Sketch(const Sketch &s) : seed(s.seed) {
buckets = new Bucket[num_buckets];

std::memcpy(buckets, s.buckets, bucket_array_bytes());

#ifdef EAGER_BUCKET_CHECK
Gillgamesh marked this conversation as resolved.
nonempty_buckets = new vec_t[num_columns];
std::memcpy(nonempty_buckets, s.nonempty_buckets, sizeof(vec_t) * num_columns);
#endif
}

Sketch::~Sketch() { delete[] buckets; }
Sketch::~Sketch() {
delete[] buckets;
#ifdef EAGER_BUCKET_CHECK
delete[] nonempty_buckets;
#endif
}



#ifdef L0_SAMPLING
void Sketch::update(const vec_t update_idx) {
@@ -78,6 +118,13 @@ void Sketch::update(const vec_t update_idx) {
size_t bucket_id = i * bkt_per_col + depth;
likely_if(depth < bkt_per_col) {
Bucket_Boruvka::update(buckets[bucket_id], update_idx, checksum);
#ifdef EAGER_BUCKET_CHECK
likely_if(!Bucket_Boruvka::is_empty(buckets[bucket_id])) {
Collaborator

unlikely_if(is_empty)

Collaborator

Investigate the performance of the following:

unlikely_if(is_empty)
  set_bit()
update()
unlikely_if(is_empty)
  clear_bit()

set_bit(nonempty_buckets[i], depth);
} else {
clear_bit(nonempty_buckets[i], depth);
}
#endif
}
}
}
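
The ordering proposed in the review comment above (set the flag if the bucket is empty before the update, then clear it if the bucket is empty afterwards) could look roughly like the sketch below. `Bucket`, the type widths and the helper name `update_with_flag` are hypothetical stand-ins, and plain `if` is used where the project would presumably use its `unlikely_if` macro.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for the project's types (widths are assumptions).
using vec_t = uint64_t;
using vec_hash_t = uint64_t;

struct Bucket {
  vec_t alpha = 0;
  vec_hash_t gamma = 0;
};

inline bool is_empty(const Bucket &b) { return (b.alpha | b.gamma) == 0; }

// Apply one update to a bucket and keep its bit in the column's flag word in sync.
inline void update_with_flag(Bucket &bucket, vec_t &column_flags, unsigned depth,
                             vec_t update_idx, vec_hash_t update_hash) {
  assert(depth < 8 * sizeof(vec_t));
  const vec_t bit = vec_t{1} << depth;
  if (is_empty(bucket)) column_flags |= bit;   // empty before: about to gain content
  bucket.alpha ^= update_idx;
  bucket.gamma ^= update_hash;
  if (is_empty(bucket)) column_flags &= ~bit;  // empty after: the update cancelled out
}
```

Both branches are expected to be rare once a bucket holds content, which is the reason the comment proposes marking them unlikely. Whether this beats the single likely/else form in the diff is something to benchmark.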
@@ -99,17 +146,21 @@ SketchSample Sketch::sample() {
size_t idx = sample_idx++;
size_t first_column = idx * cols_per_sample;

if (buckets[num_buckets - 1].alpha == 0 && buckets[num_buckets - 1].gamma == 0)
if (Bucket_Boruvka::is_empty(buckets[num_buckets - 1]))
return {0, ZERO}; // the "first" bucket is deterministic so if all zero then no edges to return

if (Bucket_Boruvka::is_good(buckets[num_buckets - 1], checksum_seed()))
Collaborator

Move the is_good() check below the for loop because most of the time we don't need to check it. And if we do need to check it the for loop is inexpensive.

return {buckets[num_buckets - 1].alpha, GOOD};

for (size_t i = 0; i < cols_per_sample; ++i) {
for (size_t j = 0; j < bkt_per_col; ++j) {
size_t bucket_id = (i + first_column) * bkt_per_col + j;
if (Bucket_Boruvka::is_good(buckets[bucket_id], checksum_seed()))
return {buckets[bucket_id].alpha, GOOD};

for (size_t col = first_column; col < first_column + cols_per_sample; ++col) {
int row = effective_size(col)-1;
Collaborator

int(effective_size(col))

int window_size = 6;
// starting from the deepest non-empty bucket, check it and up to window_size shallower buckets
int stop = std::max(row - window_size, 0);
for (; row >= stop; row--) {
if (Bucket_Boruvka::is_good(buckets[col * bkt_per_col + row], checksum_seed()))
return {buckets[col * bkt_per_col + row].alpha, GOOD};
}
}
return {0, FAIL};
@@ -124,20 +175,22 @@ ExhaustiveSketchSample Sketch::exhaustive_sample() {
size_t idx = sample_idx++;
size_t first_column = idx * cols_per_sample;

unlikely_if (buckets[num_buckets - 1].alpha == 0 && buckets[num_buckets - 1].gamma == 0)
unlikely_if (Bucket_Boruvka::is_empty(buckets[num_buckets -1]))
return {ret, ZERO}; // the "first" bucket is deterministic so if zero then no edges to return

unlikely_if (Bucket_Boruvka::is_good(buckets[num_buckets - 1], checksum_seed())) {
Collaborator

Again move this below the for loop.

ret.insert(buckets[num_buckets - 1].alpha);
return {ret, GOOD};
}

for (size_t i = 0; i < cols_per_sample; ++i) {
for (size_t j = 0; j < bkt_per_col; ++j) {
size_t bucket_id = (i + first_column) * bkt_per_col + j;
unlikely_if (Bucket_Boruvka::is_good(buckets[bucket_id], checksum_seed())) {
ret.insert(buckets[bucket_id].alpha);
}
for (size_t col = first_column; col < first_column + cols_per_sample; ++col) {
int row = effective_size(col)-1;
int window_size = 6;
int stop = std::max(row - window_size, 0);
for (; row >= stop; row--) {
if (Bucket_Boruvka::is_good(buckets[col * bkt_per_col + row], checksum_seed()))
// return {buckets[col * bkt_per_col + row].alpha, GOOD};
ret.insert(buckets[col * bkt_per_col + row].alpha);
}
}

@@ -146,12 +199,46 @@ ExhaustiveSketchSample Sketch::exhaustive_sample() {
return {ret, GOOD};
}


void Sketch::merge(const Sketch &other) {
Collaborator

Should we make this function call range_merge()

for (size_t i = 0; i < num_buckets; ++i) {
buckets[i].alpha ^= other.buckets[i].alpha;
buckets[i].gamma ^= other.buckets[i].gamma;
for (size_t i=0; i < num_columns; ++i) {
Bucket *current_col = buckets + (i* bkt_per_col);
Bucket *other_col = other.buckets + (i * bkt_per_col);
size_t other_effective_size = other.effective_size(i);
// size_t other_effective_size = bkt_per_col;
#pragma omp simd
Collaborator

#pragma/#ifdef should not have indent

for (size_t bucket_id=0; bucket_id < other_effective_size; bucket_id++) {
current_col[bucket_id].alpha ^= other_col[bucket_id].alpha;
current_col[bucket_id].gamma ^= other_col[bucket_id].gamma;
Collaborator

Performance thing to check: Would it be better to perform a bitwise OR of the flags and then after each merge perform an unlikely_if to check if the bucket is now empty. If so, clear the flag.

}
#ifdef EAGER_BUCKET_CHECK
update_flags(i, 0, other_effective_size);
#endif
}

// separately update the deterministic bucket
buckets[num_buckets-1].alpha ^= other.buckets[num_buckets-1].alpha;
buckets[num_buckets-1].gamma ^= other.buckets[num_buckets-1].gamma;
}
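
The alternative raised in the review comment inside `merge` (OR the two flag words, then clear a bit whenever a merged bucket turns out empty) might be sketched as follows. This works because a bucket that is non-empty after the XOR must have been non-empty in at least one input, so the OR is a superset of the correct flags. `Bucket`, the type widths and the helper name `merge_column` are hypothetical stand-ins for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-ins for the project's types (widths are assumptions).
using vec_t = uint64_t;

struct Bucket {
  vec_t alpha = 0;
  vec_t gamma = 0;
};

inline bool is_empty(const Bucket &b) { return (b.alpha | b.gamma) == 0; }

// Merge the first n buckets of one column; n must cover every set bit of their_flags.
inline void merge_column(Bucket *mine, vec_t &my_flags, const Bucket *theirs,
                         vec_t their_flags, size_t n) {
  assert(n <= 8 * sizeof(vec_t));
  my_flags |= their_flags;  // superset of the buckets that can be non-empty afterwards
  for (size_t d = 0; d < n; ++d) {
    mine[d].alpha ^= theirs[d].alpha;
    mine[d].gamma ^= theirs[d].gamma;
    if (is_empty(mine[d])) my_flags &= ~(vec_t{1} << d);  // contents cancelled out
  }
}
```

Compared with recomputing the flags in a second pass through `update_flags`, this touches each bucket once, at the cost of a branch inside the loop that may interfere with the `omp simd` vectorization.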


#ifdef EAGER_BUCKET_CHECK
void Sketch::update_flags(size_t col_idx, size_t start_idx, size_t end_idx) {
Bucket *current_col = buckets + (col_idx * bkt_per_col);
assert(end_idx >= start_idx);
vec_t clear_mask = (~0) >> (8*sizeof(vec_t) - (end_idx - start_idx));
clear_mask = ~(clear_mask << start_idx);
vec_t good_buck_status = 0;
#pragma omp simd
for (size_t bucket_id=start_idx; bucket_id < end_idx; bucket_id++) {
Collaborator

start off with good_buck_status being all 1s and clear it in an unlikely_if - Gil

// good_buck_status |= (!Bucket_Boruvka::is_empty(current_col[bucket_id])) << bucket_id;
likely_if(!Bucket_Boruvka::is_empty(current_col[bucket_id])) set_bit(good_buck_status, bucket_id);
}
nonempty_buckets[col_idx] = (nonempty_buckets[col_idx] & clear_mask) | good_buck_status;
}
#endif
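
Note on the mask computation in `update_flags`: `(~0)` is an `int`, so the right shift appears to run at `int` width on a negative value, and when `end_idx == start_idx` the shift count equals the full width of `vec_t`. A hedged sketch of a range mask that handles both edge cases is given below. `range_mask` is a hypothetical helper name and `vec_t` is assumed to be 64 bits wide.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumption: vec_t is a 64-bit unsigned integer in this build.
using vec_t = uint64_t;

// Mask with ones in bit positions [start, end) and zeros elsewhere.
inline vec_t range_mask(size_t start, size_t end) {
  assert(start <= end && end <= 8 * sizeof(vec_t));
  const size_t width = end - start;
  if (width == 0) return 0;  // empty range: avoid any full-width shift
  // width == 64 implies start == 0, so the final shift below is by zero.
  const vec_t low = (width == 8 * sizeof(vec_t)) ? ~vec_t{0} : ((vec_t{1} << width) - 1);
  return low << start;
}
```

With such a helper, the last line of `update_flags` would keep `nonempty_buckets[col_idx] & ~range_mask(start_idx, end_idx)` and OR in the freshly computed bits.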



void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samples) {
Collaborator

Use effective_size()?

if (start_sample + n_samples > num_samples) {
@@ -176,17 +263,67 @@ void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samp
buckets[bucket_id].alpha ^= other.buckets[bucket_id].alpha;
buckets[bucket_id].gamma ^= other.buckets[bucket_id].gamma;
}
#ifdef EAGER_BUCKET_CHECK
size_t start_col_id = start_sample * cols_per_sample;
size_t end_col_id = (start_sample + n_samples) * cols_per_sample;
for (size_t i=start_col_id; i < end_col_id; i++ ) {
update_flags(i, 0, other.effective_size(i));
}
#endif
}

void Sketch::merge_raw_bucket_buffer(const Bucket *raw_buckets) {
for (size_t i = 0; i < num_buckets; i++) {
buckets[i].alpha ^= raw_buckets[i].alpha;
buckets[i].gamma ^= raw_buckets[i].gamma;
}
#ifdef EAGER_BUCKET_CHECK
for (size_t col_idx=0; col_idx < num_columns; col_idx++) {
update_flags(col_idx, 0, bkt_per_col);
}
#endif
}

uint8_t Sketch::effective_size(size_t col_idx) const
{
Collaborator

Our style guide is same line {

// first, check for emptiness
Bucket *current_row = buckets + (col_idx * bkt_per_col);
if (Bucket_Boruvka::is_empty(buckets[num_buckets - 1]))
Collaborator

I would drop this or move it into the #else. It's a more expensive call than the clzll

{
return 0;
}
#ifdef EAGER_BUCKET_CHECK
unlikely_if(nonempty_buckets[col_idx] == 0) return 0;
return (uint8_t)((sizeof(unsigned long long) * 8) - __builtin_clzll(nonempty_buckets[col_idx]));
Collaborator

Is there a convenient way to use ctzll? It's about 3 times faster than clzll

#else
uint8_t idx = bkt_per_col - 1;
while (idx != 0 && Bucket_Boruvka::is_empty(current_row[idx]))
{
idx--;
}
unlikely_if(idx == 0 && Bucket_Boruvka::is_empty(current_row[idx])) return 0;
else return idx + 1;
#endif
}
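
On the `ctzll` question raised in the review of `effective_size`: one possible route is the most-significant-bit-first layout suggested earlier for `set_bit`. If the bucket at depth `d` is recorded in bit `63 - d`, the deepest non-empty bucket corresponds to the lowest set bit, which count-trailing-zeros finds directly. The sketch below assumes a 64-bit `vec_t` and a GCC/Clang toolchain for `__builtin_ctzll`; the function names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Assumption: vec_t is a 64-bit unsigned integer in this build.
using vec_t = uint64_t;

// MSB-first layout: depth 0 lives in bit 63, depth 63 in bit 0.
inline void set_bit_msb_first(vec_t &flags, unsigned depth) {
  assert(depth < 64);
  flags |= vec_t{1} << (63 - depth);
}

// 1-based index of the deepest non-empty bucket, or 0 if the column is empty.
inline uint8_t effective_size_msb_first(vec_t flags) {
  if (flags == 0) return 0;  // __builtin_ctzll is undefined for a zero argument
  // The lowest set bit is at position 63 - deepest, so deepest + 1 == 64 - ctz.
  return static_cast<uint8_t>(64 - __builtin_ctzll(flags));
}
```

Any speed difference between the two builtins depends on the target CPU, so the layout change is only worth making if measurements on the deployment hardware support it.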

// void Sketch::serialize(std::ostream &binary_out) const {
// binary_out.write((char*) buckets, bucket_array_bytes());
// }

void Sketch::serialize(std::ostream &binary_out) const {
binary_out.write((char*) buckets, bucket_array_bytes());
uint8_t sizes[num_columns];
for (size_t i=0; i < num_columns; i++) {
sizes[i] = effective_size(i);
}
binary_out.write((char*) sizes, sizeof(uint8_t) * num_columns);
#ifdef EAGER_BUCKET_CHECK
binary_out.write((char *) nonempty_buckets, sizeof(vec_t) * num_columns);
#endif
binary_out.write((char *) (buckets + num_buckets-1), sizeof(Bucket));
for (size_t i=0; i < num_columns; i++) {
Bucket *current_column = buckets + (i * bkt_per_col);
binary_out.write((char *) current_column, sizeof(Bucket) * sizes[i]);
}
}
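
The compressed format written by `serialize` (per-column sizes first, then only the leading `sizes[i]` buckets of each column) has to be mirrored exactly by the stream constructor. A reduced round-trip sketch of that idea follows. It leaves out the deterministic bucket and the `EAGER_BUCKET_CHECK` flags, uses `std::vector` in place of the variable-length array `uint8_t sizes[num_columns]` (which is a compiler extension in C++), and all names and type widths are hypothetical stand-ins.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <istream>
#include <ostream>
#include <sstream>
#include <vector>

// Hypothetical stand-in for the project's bucket (widths are assumptions).
struct Bucket {
  uint64_t alpha = 0;
  uint64_t gamma = 0;
};
using Column = std::vector<Bucket>;

inline bool is_empty(const Bucket &b) { return (b.alpha | b.gamma) == 0; }

// 1-based index of the deepest non-empty bucket, or 0 for an empty column.
inline uint8_t effective_size(const Column &col) {
  size_t n = col.size();
  while (n > 0 && is_empty(col[n - 1])) --n;
  return static_cast<uint8_t>(n);
}

// Write one size byte per column, then only the non-empty prefix of each column.
inline void serialize(const std::vector<Column> &cols, std::ostream &out) {
  std::vector<uint8_t> sizes(cols.size());
  for (size_t i = 0; i < cols.size(); ++i) sizes[i] = effective_size(cols[i]);
  out.write(reinterpret_cast<const char *>(sizes.data()), sizes.size());
  for (size_t i = 0; i < cols.size(); ++i)
    out.write(reinterpret_cast<const char *>(cols[i].data()), sizeof(Bucket) * sizes[i]);
}

// Read the sizes, then fill each zero-initialized column's prefix from the stream.
inline std::vector<Column> deserialize(std::istream &in, size_t num_columns,
                                       size_t bkt_per_col) {
  std::vector<uint8_t> sizes(num_columns);
  in.read(reinterpret_cast<char *>(sizes.data()), sizes.size());
  std::vector<Column> cols(num_columns, Column(bkt_per_col));
  for (size_t i = 0; i < num_columns; ++i)
    in.read(reinterpret_cast<char *>(cols[i].data()), sizeof(Bucket) * sizes[i]);
  return cols;
}
```

The reader relies on the destination buckets being zero-initialized, which matches the `new Bucket[num_buckets] {}` change in the stream constructor: buckets past the stored prefix must read as empty.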

bool operator==(const Sketch &sketch1, const Sketch &sketch2) {