Skip to content

Commit

Permalink
compressed serialization with row major flag
Browse files Browse the repository at this point in the history
  • Loading branch information
Gillgamesh committed Sep 5, 2024
1 parent 4b878ed commit 35211d2
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 41 deletions.
8 changes: 8 additions & 0 deletions include/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@
// A single sketch bucket: an XOR-accumulated vector value (alpha) and the
// XOR of its hashes (gamma), used to detect when the bucket holds exactly
// one element.
struct Bucket {
  vec_t alpha;
  vec_hash_t gamma;

  // Non-mutating XOR of two buckets.
  // BUG FIX: the previous version used `^=` on its own members inside the
  // returned initializer list, so a supposedly pure `a ^ b` silently
  // modified `a` as a side effect. Now uses `^` and is marked const.
  Bucket operator^(const Bucket &rhs) const {
    return {alpha ^ rhs.alpha, gamma ^ rhs.gamma};
  }

  // In-place XOR-merge of rhs into this bucket.
  void operator^=(const Bucket &rhs) {
    alpha ^= rhs.alpha;
    gamma ^= rhs.gamma;
  }
};
#pragma pack(pop)

Expand Down
10 changes: 6 additions & 4 deletions include/sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,13 @@ class Sketch {
* Get the bucket at a specific column and depth
*/
/**
 * Return a reference to the bucket at the given column and depth.
 * The physical layout is compile-time selected: with ROW_MAJOR_SKETCHES all
 * columns of one depth are stored contiguously; otherwise each column's
 * buckets are contiguous.
 */
inline Bucket& get_bucket(size_t col_idx, size_t depth) const {
#ifdef ROW_MAJOR_SKETCHES
  // Row-major: buckets of equal depth are adjacent in memory.
  const size_t flat_idx = depth * num_columns + col_idx;
#else
  // Column-major: each column occupies a contiguous run of bkt_per_col slots.
  const size_t flat_idx = col_idx * bkt_per_col + depth;
#endif
  return buckets[flat_idx];
}

/**
Expand Down Expand Up @@ -199,7 +201,7 @@ class Sketch {
* Gives the cutoff index such that all non-empty buckets are strictly above for ALL columns
* @return Depth of the deepest non-zero'th bucket + 1. 0 if all buckets are empty.
*/
uint8_t effective_size() const;
uint8_t effective_depth() const;

/**
* In-place merge function.
Expand Down
112 changes: 75 additions & 37 deletions src/sketch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ void Sketch::compressed_deserialize(std::istream& binary_in) {
buckets[i].gamma = 0;
}

#ifdef ROW_MAJOR_SKETCHES
// first, read in the effective depth
uint8_t max_depth;
binary_in.read((char *) &max_depth, sizeof(uint8_t));
#ifdef EAGER_BUCKET_CHECK
binary_in.read((char *) nonempty_buckets, sizeof(vec_t) * num_columns);
#endif
// grab the deterministic bucket:
binary_in.read((char *) (buckets + num_buckets -1), sizeof(Bucket));
size_t effective_size = max_depth * num_columns;
binary_in.read((char *) buckets, sizeof(Bucket) * effective_size);
#else
uint8_t sizes[num_columns];
// Read the serialized Sketch contents
// first, read in the sizes
Expand All @@ -88,6 +100,7 @@ void Sketch::compressed_deserialize(std::istream& binary_in) {
Bucket *current_column = buckets + (col_idx * bkt_per_col);
binary_in.read((char *) current_column, sizeof(Bucket) * sizes[col_idx]);
}
#endif
}

Sketch::Sketch(vec_t vector_len, uint64_t seed, std::istream &binary_in, size_t _samples,
Expand Down Expand Up @@ -255,16 +268,16 @@ ExhaustiveSketchSample Sketch::exhaustive_sample() {
void Sketch::merge(const Sketch &other) {
Bucket &deterministic_bucket = get_deterministic_bucket();
for (size_t i=0; i < num_columns; ++i) {
Bucket *current_col = buckets + (i* bkt_per_col);
Bucket *other_col = other.buckets + (i * bkt_per_col);
// Bucket *current_col = buckets + (i* bkt_per_col);
// Bucket *other_col = other.buckets + (i * bkt_per_col);
size_t other_effective_size = other.effective_size(i);
// size_t other_effective_size = bkt_per_col;
#pragma omp simd
for (size_t bucket_id=0; bucket_id < other_effective_size; bucket_id++) {
current_col[bucket_id].alpha ^= other_col[bucket_id].alpha;
current_col[bucket_id].gamma ^= other_col[bucket_id].gamma;
// get_bucket(i, bucket_id).alpha ^= other.get_bucket(i, bucket_id).alpha;
// get_bucket(i, bucket_id).gamma ^= other.get_bucket(i, bucket_id).gamma;
// current_col[bucket_id].alpha ^= other_col[bucket_id].alpha;
// current_col[bucket_id].gamma ^= other_col[bucket_id].gamma;
get_bucket(i, bucket_id).alpha ^= other.get_bucket(i, bucket_id).alpha;
get_bucket(i, bucket_id).gamma ^= other.get_bucket(i, bucket_id).gamma;
}
#ifdef EAGER_BUCKET_CHECK
recalculate_flags(i, 0, other_effective_size);
Expand Down Expand Up @@ -326,18 +339,31 @@ void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samp
sample_idx = std::max(sample_idx, start_sample);

// merge deterministic buffer
get_deterministic_bucket().alpha ^= get_deterministic_bucket().alpha;
get_deterministic_bucket().gamma ^= get_deterministic_bucket().gamma;
get_deterministic_bucket() ^= other.get_deterministic_bucket();

size_t start_col_id = start_sample * cols_per_sample;
size_t end_col_id = (start_sample + n_samples) * cols_per_sample;
for (size_t col=start_col_id; col < end_col_id; col++ ) {
#ifdef EAGER_BUCKET_CHECK
size_t effective_size = effective_size(col);
#else
size_t effective_size = bkt_per_col;
#endif
for (size_t row=0; row < effective_size; row++) {
get_bucket(col, row) ^= other.get_bucket(col, row);
}
}
// merge other buckets
size_t start_bucket_id = start_sample * cols_per_sample * bkt_per_col;
size_t n_buckets = n_samples * cols_per_sample * bkt_per_col;

for (size_t i = 0; i < n_buckets; i++) {
size_t bucket_id = start_bucket_id + i;
buckets[bucket_id].alpha ^= other.buckets[bucket_id].alpha;
buckets[bucket_id].gamma ^= other.buckets[bucket_id].gamma;
}
// size_t start_bucket_id = start_sample * cols_per_sample * bkt_per_col;
// size_t n_buckets = n_samples * cols_per_sample * bkt_per_col;

// for (size_t i = 0; i < n_buckets; i++) {
// size_t bucket_id = start_bucket_id + i;
// buckets[bucket_id] ^= other.buckets[bucket_id];
// }


#ifdef EAGER_BUCKET_CHECK
size_t start_col_id = start_sample * cols_per_sample;
size_t end_col_id = (start_sample + n_samples) * cols_per_sample;
Expand All @@ -361,29 +387,28 @@ void Sketch::merge_raw_bucket_buffer(const Bucket *raw_buckets) {

/**
 * Gives the cutoff index such that all non-empty buckets in the given column
 * are strictly above it.
 * BUG FIX: an early `return bkt_per_col;` left over from a previous revision
 * made everything below it unreachable, so the function always reported a
 * full column; the dead early return and the stale commented-out duplicate
 * of the logic have been removed.
 * @param col_idx  index of the column to examine.
 * @return depth of the deepest non-empty bucket + 1; 0 if the column
 *         (or the whole sketch) is empty.
 */
uint8_t Sketch::effective_size(size_t col_idx) const
{
  // If the deterministic bucket is empty, the entire sketch is empty.
  if (Bucket_Boruvka::is_empty(get_deterministic_bucket()))
  {
    return 0;
  }
#ifdef EAGER_BUCKET_CHECK
  // nonempty_buckets[col_idx] is a bitmask of non-empty depths, so the
  // position of the highest set bit + 1 is the effective size.
  unlikely_if(nonempty_buckets[col_idx] == 0) return 0;
  return (uint8_t)((sizeof(unsigned long long) * 8) -
                   __builtin_clzll(nonempty_buckets[col_idx]));
#else
  // No eager flags: scan upward from the deepest bucket until a non-empty
  // one is found.
  uint8_t idx = bkt_per_col - 1;
  while (idx != 0 && Bucket_Boruvka::is_empty(get_bucket(col_idx, idx)))
  {
    idx--;
  }
  unlikely_if(idx == 0 && Bucket_Boruvka::is_empty(get_bucket(col_idx, idx))) return 0;
  else return idx + 1;
#endif
}

uint8_t Sketch::effective_size() const
uint8_t Sketch::effective_depth() const
{
unlikely_if(Bucket_Boruvka::is_empty(get_deterministic_bucket())) return 0;
#ifdef EAGER_BUCKET_CHECK
Expand All @@ -403,6 +428,18 @@ uint8_t Sketch::effective_size() const
}

void Sketch::compressed_serialize(std::ostream &binary_out) const {
#ifdef ROW_MAJOR_SKETCHES
// write out max depth, nonempty flags, determinstic bucket, everything else
// then all other buckets
uint8_t max_depth = effective_size();
binary_out.write((char*) &max_depth, sizeof(uint8_t));
size_t number_of_buckets = num_columns * max_depth;
binary_out.write((char *) &get_deterministic_bucket(), sizeof(Bucket));
#ifdef EAGER_BUCKET_CHECK
binary_out.write((char *) nonempty_buckets, sizeof(vec_t) * num_columns);
#endif
binary_out.write((char *) buckets, sizeof(Bucket) * number_of_buckets);
#else
uint8_t sizes[num_columns];
for (size_t i=0; i < num_columns; i++) {
sizes[i] = effective_size(i);
Expand All @@ -411,11 +448,12 @@ void Sketch::compressed_serialize(std::ostream &binary_out) const {
#ifdef EAGER_BUCKET_CHECK
binary_out.write((char *) nonempty_buckets, sizeof(vec_t) * num_columns);
#endif
binary_out.write((char *) (buckets + num_buckets-1), sizeof(Bucket));
binary_out.write((char *) &get_deterministic_bucket(), sizeof(Bucket));
for (size_t i=0; i < num_columns; i++) {
Bucket *current_column = buckets + (i * bkt_per_col);
binary_out.write((char *) current_column, sizeof(Bucket) * sizes[i]);
}
#endif
}

void Sketch::serialize(std::ostream &binary_out) const {
Expand Down

0 comments on commit 35211d2

Please sign in to comment.