From 35211d2908829ef90545972f3ff4d556e006ec61 Mon Sep 17 00:00:00 2001
From: Gilvir Gill
Date: Thu, 5 Sep 2024 16:51:14 -0400
Subject: [PATCH] compressed serialization with row major flag

---
 include/bucket.h |   8 ++++
 include/sketch.h |  10 +++--
 src/sketch.cpp   | 112 +++++++++++++++++++++++++++++++----------------
 3 files changed, 89 insertions(+), 41 deletions(-)

diff --git a/include/bucket.h b/include/bucket.h
index 513271a..6d5f597 100644
--- a/include/bucket.h
+++ b/include/bucket.h
@@ -8,6 +8,14 @@ struct Bucket {
   vec_t alpha;
   vec_hash_t gamma;
+  Bucket operator^(const Bucket &rhs) const {
+    return {static_cast<vec_t>(alpha ^ rhs.alpha),
+            static_cast<vec_hash_t>(gamma ^ rhs.gamma)};
+  };
+  void operator^=(const Bucket &rhs) {
+    alpha ^= rhs.alpha;
+    gamma ^= rhs.gamma;
+  };
 };
 #pragma pack(pop)
 
diff --git a/include/sketch.h b/include/sketch.h
index 8563d50..4325f0c 100644
--- a/include/sketch.h
+++ b/include/sketch.h
@@ -141,11 +141,13 @@ class Sketch {
    * Get the bucket at a specific column and depth
    */
   inline Bucket& get_bucket(size_t col_idx, size_t depth) const {
+#ifdef ROW_MAJOR_SKETCHES
+    // contiguous by bucket depth
+    return buckets[depth * num_columns + col_idx];
+#else
     // contiguous by column
     return buckets[col_idx * bkt_per_col + depth];
-
-    // contiguous by bucket depth
-    // return buckets[depth * num_columns + col_idx];
+#endif
   }
 
   /**
@@ -199,7 +201,7 @@
    * Gives the cutoff index such that all non-empty buckets are strictly above for ALL columns
    * @return Depth of the deepest non-zero'th bucket + 1. 0 if all buckets are empty.
    */
-  uint8_t effective_size() const;
+  uint8_t effective_depth() const;
 
   /**
    * In-place merge function.
diff --git a/src/sketch.cpp b/src/sketch.cpp
index 1b86e56..6f92af5 100644
--- a/src/sketch.cpp
+++ b/src/sketch.cpp
@@ -75,6 +75,18 @@ void Sketch::compressed_deserialize(std::istream& binary_in) {
     buckets[i].gamma = 0;
   }
 
+#ifdef ROW_MAJOR_SKETCHES
+  // first, read in the effective depth
+  uint8_t max_depth;
+  binary_in.read((char *) &max_depth, sizeof(uint8_t));
+  #ifdef EAGER_BUCKET_CHECK
+  binary_in.read((char *) nonempty_buckets, sizeof(vec_t) * num_columns);
+  #endif
+  // grab the deterministic bucket:
+  binary_in.read((char *) (buckets + num_buckets - 1), sizeof(Bucket));
+  size_t number_of_buckets = max_depth * num_columns;
+  binary_in.read((char *) buckets, sizeof(Bucket) * number_of_buckets);
+#else
   uint8_t sizes[num_columns];
   // Read the serialized Sketch contents
   // first, read in the sizes
@@ -88,6 +100,7 @@
     Bucket *current_column = buckets + (col_idx * bkt_per_col);
     binary_in.read((char *) current_column, sizeof(Bucket) * sizes[col_idx]);
   }
+#endif
 }
 
 Sketch::Sketch(vec_t vector_len, uint64_t seed, std::istream &binary_in, size_t _samples,
@@ -255,16 +268,16 @@ ExhaustiveSketchSample Sketch::exhaustive_sample() {
 void Sketch::merge(const Sketch &other) {
   Bucket &deterministic_bucket = get_deterministic_bucket();
   for (size_t i=0; i < num_columns; ++i) {
-    Bucket *current_col = buckets + (i* bkt_per_col);
-    Bucket *other_col = other.buckets + (i * bkt_per_col);
+    // Bucket *current_col = buckets + (i* bkt_per_col);
+    // Bucket *other_col = other.buckets + (i * bkt_per_col);
     size_t other_effective_size = other.effective_size(i);
     // size_t other_effective_size = bkt_per_col;
     #pragma omp simd
     for (size_t bucket_id=0; bucket_id < other_effective_size; bucket_id++) {
-      current_col[bucket_id].alpha ^= other_col[bucket_id].alpha;
-      current_col[bucket_id].gamma ^= other_col[bucket_id].gamma;
-      // get_bucket(i, bucket_id).alpha ^= other.get_bucket(i, bucket_id).alpha;
-      // get_bucket(i, bucket_id).gamma ^= other.get_bucket(i, bucket_id).gamma;
+      // current_col[bucket_id].alpha ^= other_col[bucket_id].alpha;
+      // current_col[bucket_id].gamma ^= other_col[bucket_id].gamma;
+      get_bucket(i, bucket_id).alpha ^= other.get_bucket(i, bucket_id).alpha;
+      get_bucket(i, bucket_id).gamma ^= other.get_bucket(i, bucket_id).gamma;
     }
 #ifdef EAGER_BUCKET_CHECK
     recalculate_flags(i, 0, other_effective_size);
@@ -326,18 +339,31 @@ void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samp
   sample_idx = std::max(sample_idx, start_sample);
 
   // merge deterministic buffer
-  get_deterministic_bucket().alpha ^= get_deterministic_bucket().alpha;
-  get_deterministic_bucket().gamma ^= get_deterministic_bucket().gamma;
+  get_deterministic_bucket() ^= other.get_deterministic_bucket();
+  size_t first_col = start_sample * cols_per_sample;
+  size_t last_col = (start_sample + n_samples) * cols_per_sample;
+  for (size_t col = first_col; col < last_col; col++) {
+#ifdef EAGER_BUCKET_CHECK
+    size_t n_rows = effective_size(col);
+#else
+    size_t n_rows = bkt_per_col;
+#endif
+    for (size_t row = 0; row < n_rows; row++) {
+      get_bucket(col, row) ^= other.get_bucket(col, row);
+    }
+  }
 
   // merge other buckets
-  size_t start_bucket_id = start_sample * cols_per_sample * bkt_per_col;
-  size_t n_buckets = n_samples * cols_per_sample * bkt_per_col;
-  for (size_t i = 0; i < n_buckets; i++) {
-    size_t bucket_id = start_bucket_id + i;
-    buckets[bucket_id].alpha ^= other.buckets[bucket_id].alpha;
-    buckets[bucket_id].gamma ^= other.buckets[bucket_id].gamma;
-  }
+  // size_t start_bucket_id = start_sample * cols_per_sample * bkt_per_col;
+  // size_t n_buckets = n_samples * cols_per_sample * bkt_per_col;
+
+  // for (size_t i = 0; i < n_buckets; i++) {
+  //   size_t bucket_id = start_bucket_id + i;
+  //   buckets[bucket_id] ^= other.buckets[bucket_id];
+  // }
+
+
 #ifdef EAGER_BUCKET_CHECK
   size_t start_col_id = start_sample * cols_per_sample;
   size_t end_col_id = (start_sample + n_samples) * cols_per_sample;
@@ -361,29 +387,28 @@ void Sketch::merge_raw_bucket_buffer(const Bucket *raw_buckets) {
 
 uint8_t Sketch::effective_size(size_t col_idx) const
 {
-  return bkt_per_col;
-// // first, check for emptyness
-// if (Bucket_Boruvka::is_empty(get_deterministic_bucket()))
-// {
-// return 0;
-// }
-// #ifdef EAGER_BUCKET_CHECK
-// unlikely_if(nonempty_buckets[col_idx] == 0) return 0;
-// return (uint8_t)((sizeof(unsigned long long) * 8) - __builtin_clzll(nonempty_buckets[col_idx]));
-// #else
-// uint8_t idx = bkt_per_col - 1;
-// // while (idx != 0 && Bucket_Boruvka::is_empty(current_row[idx]))
-// while (idx != 0 && Bucket_Boruvka::is_empty(get_bucket(col_idx, idx)))
-// {
-// idx--;
-// }
-// unlikely_if(idx == 0 && Bucket_Boruvka::is_empty(get_bucket(col_idx, idx))) return 0;
-// // unlikely_if(idx == 0 && Bucket_Boruvka::is_empty(current_row[idx])) return 0;
-// else return idx + 1;
-// #endif
+  // first, check for emptiness
+  if (Bucket_Boruvka::is_empty(get_deterministic_bucket()))
+  {
+    return 0;
+  }
+#ifdef EAGER_BUCKET_CHECK
+  unlikely_if(nonempty_buckets[col_idx] == 0) return 0;
+  return (uint8_t)((sizeof(unsigned long long) * 8) - __builtin_clzll(nonempty_buckets[col_idx]));
+#else
+  uint8_t idx = bkt_per_col - 1;
+  // while (idx != 0 && Bucket_Boruvka::is_empty(current_row[idx]))
+  while (idx != 0 && Bucket_Boruvka::is_empty(get_bucket(col_idx, idx)))
+  {
+    idx--;
+  }
+  unlikely_if(idx == 0 && Bucket_Boruvka::is_empty(get_bucket(col_idx, idx))) return 0;
+  // unlikely_if(idx == 0 && Bucket_Boruvka::is_empty(current_row[idx])) return 0;
+  else return idx + 1;
+#endif
 }
 
-uint8_t Sketch::effective_size() const
+uint8_t Sketch::effective_depth() const
 {
   unlikely_if(Bucket_Boruvka::is_empty(get_deterministic_bucket())) return 0;
 #ifdef EAGER_BUCKET_CHECK
@@ -403,6 +428,18 @@ uint8_t Sketch::effective_size() const
 }
 
 void Sketch::compressed_serialize(std::ostream &binary_out) const {
+#ifdef ROW_MAJOR_SKETCHES
+  // write out max depth, nonempty flags, deterministic bucket,
+  // then all other buckets
+  uint8_t max_depth = effective_depth();
+  binary_out.write((char*) &max_depth, sizeof(uint8_t));
+  size_t number_of_buckets = num_columns * max_depth;
+  #ifdef EAGER_BUCKET_CHECK
+  binary_out.write((char *) nonempty_buckets, sizeof(vec_t) * num_columns);
+  #endif
+  binary_out.write((char *) &get_deterministic_bucket(), sizeof(Bucket));
+  binary_out.write((char *) buckets, sizeof(Bucket) * number_of_buckets);
+#else
   uint8_t sizes[num_columns];
   for (size_t i=0; i < num_columns; i++) {
     sizes[i] = effective_size(i);
@@ -411,11 +448,12 @@ void Sketch::compressed_serialize(std::ostream &binary_out) const {
 #ifdef EAGER_BUCKET_CHECK
   binary_out.write((char *) nonempty_buckets, sizeof(vec_t) * num_columns);
 #endif
-  binary_out.write((char *) (buckets + num_buckets-1), sizeof(Bucket));
+  binary_out.write((char *) &get_deterministic_bucket(), sizeof(Bucket));
   for (size_t i=0; i < num_columns; i++) {
     Bucket *current_column = buckets + (i * bkt_per_col);
     binary_out.write((char *) current_column, sizeof(Bucket) * sizes[i]);
   }
+#endif
 }
 
 void Sketch::serialize(std::ostream &binary_out) const {
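Reviewer note (not part of the patch): a minimal, standalone illustration of the two bucket layouts that get_bucket() selects between, and why the row-major order lets compressed_serialize() emit the first effective_depth() rows with a single write. The dimensions and coordinates below are made-up example values, not the library's defaults.

```cpp
#include <cassert>
#include <cstddef>

int main() {
  const size_t num_columns = 4, bkt_per_col = 8;  // hypothetical sketch dimensions
  const size_t col = 2, depth = 5;                // coordinates of one bucket

  // Default (column-major): one column's buckets are contiguous, so each
  // column has to be written separately, up to its own effective_size(col).
  size_t col_major_idx = col * bkt_per_col + depth;  // 2 * 8 + 5 = 21

  // ROW_MAJOR_SKETCHES: all buckets at a given depth are contiguous, so the
  // first effective_depth() rows of every column form one contiguous prefix
  // of the buckets array and can be serialized with a single write().
  size_t row_major_idx = depth * num_columns + col;  // 5 * 4 + 2 = 22

  assert(col_major_idx == 21);
  assert(row_major_idx == 22);
  return 0;
}
```

Under this assumption, the compressed row-major stream is: one byte of effective depth, the optional nonempty-bucket flags, the deterministic bucket, then depth * num_columns buckets in depth-major order, which is the order compressed_deserialize() reads back.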