diff --git a/CMakeLists.txt b/CMakeLists.txt index 95e90896..cae81135 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,6 +9,8 @@ set(CMAKE_CXX_EXTENSIONS ON) set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib) set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE) + + # Make the default build type Release. If user or another # project sets a different value than use that if(NOT CMAKE_BUILD_TYPE) @@ -28,6 +30,12 @@ else() message(STATUS "${CMAKE_CXX_COMPILER_ID} not recognized, no flags added") endif() +include(CheckCXXCompilerFlag) +CHECK_CXX_COMPILER_FLAG("-march=native" COMPILER_SUPPORTS_MARCH_NATIVE) +if(COMPILER_SUPPORTS_MARCH_NATIVE) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=native") +endif() + #add_compile_options(-fsanitize=address) #add_link_options(-fsanitize=address) #add_compile_options(-fsanitize=undefined) diff --git a/include/bucket.h b/include/bucket.h index 5d6a6af6..6d5f5979 100644 --- a/include/bucket.h +++ b/include/bucket.h @@ -8,11 +8,26 @@ struct Bucket { vec_t alpha; vec_hash_t gamma; + Bucket operator^(const Bucket &rhs) { + return {alpha ^= rhs.alpha, + gamma ^= rhs.gamma}; + }; + void operator^=(const Bucket &rhs) { + alpha ^= rhs.alpha; + gamma ^= rhs.gamma; + }; }; #pragma pack(pop) namespace Bucket_Boruvka { static constexpr size_t col_hash_bits = sizeof(col_hash_t) * 8; + + /** + * Returns whether or not a bucket is empty. + * @param bucket Bucket to check for empty. + * @return With high probability, return whether or not a given bucket is empty. + */ + inline static bool is_empty(const Bucket &bucket); /** * Hashes the column index and the update index together to determine the depth of an update * This is used as a parameter to Bucket::contains. @@ -33,6 +48,7 @@ namespace Bucket_Boruvka { */ inline static vec_hash_t get_index_hash(const vec_t index, const long sketch_seed); + /** * Checks whether a Bucket is good, assuming the Bucket contains all elements. * @param bucket The bucket to check @@ -51,19 +67,24 @@ namespace Bucket_Boruvka { const vec_hash_t update_hash); } // namespace Bucket_Boruvka +inline bool Bucket_Boruvka::is_empty(const Bucket &bucket) { + return (bucket.alpha | bucket.gamma) == 0; +} + inline col_hash_t Bucket_Boruvka::get_index_depth(const vec_t update_idx, const long seed_and_col, const vec_hash_t max_depth) { - col_hash_t depth_hash = col_hash(&update_idx, sizeof(vec_t), seed_and_col); + col_hash_t depth_hash = XXH3_128bits_withSeed(&update_idx, sizeof(vec_t), seed_and_col).high64; depth_hash |= (1ull << max_depth); // assert not > max_depth by ORing return __builtin_ctzll(depth_hash); } inline vec_hash_t Bucket_Boruvka::get_index_hash(const vec_t update_idx, const long sketch_seed) { - return vec_hash(&update_idx, sizeof(vec_t), sketch_seed); + return (XXH3_128bits_withSeed (&update_idx, sizeof(vec_t), sketch_seed)).low64; } + inline bool Bucket_Boruvka::is_good(const Bucket &bucket, const long sketch_seed) { - return bucket.gamma == get_index_hash(bucket.alpha, sketch_seed); + return !Bucket_Boruvka::is_empty(bucket) && bucket.gamma == get_index_hash(bucket.alpha, sketch_seed); } inline void Bucket_Boruvka::update(Bucket& bucket, const vec_t update_idx, diff --git a/include/cc_sketch_alg.h b/include/cc_sketch_alg.h index 9e9d3f8c..669bf911 100644 --- a/include/cc_sketch_alg.h +++ b/include/cc_sketch_alg.h @@ -67,13 +67,16 @@ enum QueryCode { * (no self-edges or multi-edges) */ class CCSketchAlg { + public: + Sketch **sketches; + private: node_id_t num_vertices; size_t seed; bool update_locked = false; // a set containing one "representative" from each supernode std::set *representatives; - Sketch **sketches; + // Sketch **sketches; // DSU representation of supernode relationship DisjointSetUnion_MT dsu; diff --git a/include/sketch.h b/include/sketch.h index 80473ecc..4325f0cd 100644 --- a/include/sketch.h +++ b/include/sketch.h @@ -38,12 +38,15 @@ struct ExhaustiveSketchSample { * Sub-linear representation of a vector. */ class Sketch { + public: + size_t num_columns; // Total number of columns. (product of above 2) + size_t bkt_per_col; // number of buckets per column private: const uint64_t seed; // seed for hash functions size_t num_samples; // number of samples we can perform size_t cols_per_sample; // number of columns to use on each sample - size_t num_columns; // Total number of columns. (product of above 2) - size_t bkt_per_col; // number of buckets per column + // size_t num_columns; // Total number of columns. (product of above 2) + // size_t bkt_per_col; // number of buckets per column size_t num_buckets; // number of total buckets (product of above 2) size_t sample_idx = 0; // number of samples performed so far @@ -51,6 +54,23 @@ class Sketch { // bucket data Bucket* buckets; + // flags + +#ifdef EAGER_BUCKET_CHECK + vec_t *nonempty_buckets; + /** + * Updates the nonempty flags in a given range by recalculating the is_empty() call. + * @param col_idx The column to update + * @param start_row The depth of the first bucket in the column to check the emptyness of. + * @param end_row The depth of the first bucket not to check the emptyness (i.e., an exclusive bound) + */ + void recalculate_flags(size_t col_idx, size_t start_row, size_t end_row); +#endif + private: + inline Bucket& get_deterministic_bucket() const { + return buckets[num_buckets - 1]; + } + public: /** * The below constructors use vector length as their input. However, in graph sketching our input @@ -85,6 +105,19 @@ class Sketch { Sketch(vec_t vector_len, uint64_t seed, size_t num_samples = 1, size_t cols_per_sample = default_cols_per_sample); + + /** + * Construct a sketch from a (potentially compressed) serialized stream + * @param vector_len Length of the vector we are sketching + * @param seed Random seed of the sketch + * @param binary_in Stream holding serialized sketch object + * @param num_samples [Optional] Number of samples this sketch supports (default = 1) + * @param cols_per_sample [Optional] Number of sketch columns for each sample (default = 1) + * @param compressed Whether or not to use the compression (default = true) + */ + Sketch(vec_t vector_len, uint64_t seed, bool compressed, std::istream& binary_in, size_t num_samples = 1, + size_t cols_per_sample = default_cols_per_sample); + /** * Construct a sketch from a serialized stream * @param vector_len Length of the vector we are sketching @@ -104,12 +137,42 @@ class Sketch { ~Sketch(); + /** + * Get the bucket at a specific column and depth + */ + inline Bucket& get_bucket(size_t col_idx, size_t depth) const { +#ifdef ROW_MAJOR_SKETCHES + // contiguous by bucket depth + return buckets[depth * num_columns + col_idx]; +#else + // contiguous by column + return buckets[col_idx * bkt_per_col + depth]; +#endif + } + + /** + * Occupies the contents of an empty sketch with input from a stream that contains + * the compressed version. + * @param binary_in Stream holding serialized/compressed sketch object. + */ + void compressed_deserialize(std::istream& binary_in); + + /** * Update a sketch based on information about one of its indices. * @param update the point update. */ void update(const vec_t update); + +#ifdef EAGER_BUCKET_CHECK + /** + * TODO - make this less silly + */ + + void unsafe_update(); +#endif + /** * Function to sample from the sketch. * cols_per_sample determines the number of columns we allocate to this query @@ -125,6 +188,21 @@ class Sketch { std::mutex mutex; // lock the sketch for applying updates in multithreaded processing + + /** + * Gives the cutoff index such that all non-empty buckets are strictly above. + * @param col_idx The column to find the cutoff index of. + * @return The depth of the non-zero'th bucket + 1. If the bucket is entirely empty, returns 0 + */ + uint8_t effective_size(size_t col_idx) const; + + + /** + * Gives the cutoff index such that all non-empty buckets are strictly above for ALL columns + * @return Depth of the deepest non-zero'th bucket + 1. 0 if all buckets are empty. + */ + uint8_t effective_depth() const; + /** * In-place merge function. * @param other Sketch to merge into caller @@ -163,12 +241,25 @@ class Sketch { */ void serialize(std::ostream& binary_out) const; + /** + * Serialize the sketch to a binary output stream, with a compressed representation. + * takes significantly less space for mostly-empty sketches. + * @param binary_out the stream to write to. + */ + void compressed_serialize(std::ostream& binary_out) const; + inline void reset_sample_state() { sample_idx = 0; } // return the size of the sketching datastructure in bytes (just the buckets, not the metadata) - inline size_t bucket_array_bytes() const { return num_buckets * sizeof(Bucket); } + inline size_t bucket_array_bytes() const { +#ifdef EAGER_BUCKET_CHECK + return (num_buckets * sizeof(Bucket)) + (num_columns * sizeof(vec_t)); +#else + return num_buckets * sizeof(Bucket); +#endif + } inline const Bucket* get_readonly_bucket_ptr() const { return (const Bucket*) buckets; } inline uint64_t get_seed() const { return seed; } diff --git a/src/cc_sketch_alg.cpp b/src/cc_sketch_alg.cpp index a1e688db..6f0582ad 100644 --- a/src/cc_sketch_alg.cpp +++ b/src/cc_sketch_alg.cpp @@ -111,7 +111,15 @@ void CCSketchAlg::apply_update_batch(int thr_id, node_id_t src_vertex, for (const auto &dst : dst_vertices) { delta_sketch.update(static_cast(concat_pairing_fn(src_vertex, dst))); +#ifdef EAGER_BUCKET_CHECK + delta_sketch.unsafe_update(static_cast(concat_pairing_fn(src_vertex, dst))); } + for (size_t i = 0; i < delta_sketch.num_columns; i++) { + delta_sketch.recalculate_flags(i, 0, delta_sketch.bkt_per_col); + } +#else // EAGER_BUCKET_CHECK + } +#endif std::lock_guard lk(sketches[src_vertex]->mutex); sketches[src_vertex]->merge(delta_sketch); diff --git a/src/sketch.cpp b/src/sketch.cpp index ac674c5e..6f92af58 100644 --- a/src/sketch.cpp +++ b/src/sketch.cpp @@ -5,47 +5,141 @@ #include #include + +inline static void set_bit(vec_t &t, int position) { + t |= 1 << position; +} + +inline static void clear_bit(vec_t &t, int position) { + t &= ~(1 << position); +} + Sketch::Sketch(vec_t vector_len, uint64_t seed, size_t _samples, size_t _cols) : seed(seed) { num_samples = _samples; cols_per_sample = _cols; num_columns = num_samples * cols_per_sample; bkt_per_col = calc_bkt_per_col(vector_len); num_buckets = num_columns * bkt_per_col + 1; // plus 1 for deterministic bucket +#ifdef EAGER_BUCKET_CHECK + buckets = (Bucket*) (new char[bucket_array_bytes()]); + nonempty_buckets = (vec_t*) (buckets + num_buckets); +#else buckets = new Bucket[num_buckets]; +#endif // initialize bucket values for (size_t i = 0; i < num_buckets; ++i) { buckets[i].alpha = 0; buckets[i].gamma = 0; } + +#ifdef EAGER_BUCKET_CHECK + for (size_t i = 0; i < num_columns; ++i) { + nonempty_buckets[i] = 0; + } +#endif + } -Sketch::Sketch(vec_t vector_len, uint64_t seed, std::istream &binary_in, size_t _samples, - size_t _cols) + +Sketch::Sketch(vec_t vector_len, uint64_t seed, bool compressed, std::istream &binary_in, + size_t _samples, size_t _cols) : seed(seed) { num_samples = _samples; cols_per_sample = _cols; num_columns = num_samples * cols_per_sample; bkt_per_col = calc_bkt_per_col(vector_len); num_buckets = num_columns * bkt_per_col + 1; // plus 1 for deterministic bucket - buckets = new Bucket[num_buckets]; + buckets = (Bucket*) new char[bucket_array_bytes()]; +#ifdef EAGER_BUCKET_CHECK + nonempty_buckets = (vec_t*) (buckets + num_buckets); +#endif + if (compressed) { + compressed_deserialize(binary_in); + } + else { + binary_in.read((char *)buckets, bucket_array_bytes()); + } +} + + + /** + * Occupies the contents of an empty sketch with input from a stream that contains + * the compressed version. + * @param binary_in Stream holding serialized/compressed sketch object. + */ +void Sketch::compressed_deserialize(std::istream& binary_in) { + //zero out the sketch: + for (size_t i = 0; i < num_buckets; i++) { + buckets[i].alpha = 0; + buckets[i].gamma = 0; + } +#ifdef ROW_MAJOR_SKETCHES + // first, read in the effective depth + uint8_t max_depth; + binary_in.read((char *) &max_depth, sizeof(uint8_t)); + #ifdef EAGER_BUCKET_CHECK + binary_in.read((char *) nonempty_buckets, sizeof(vec_t) * num_columns); + #endif + // grab the deterministic bucket: + binary_in.read((char *) (buckets + num_buckets -1), sizeof(Bucket)); + size_t effective_size = max_depth * num_columns; + binary_in.read((char *) buckets, sizeof(Bucket) * effective_size); +#else + uint8_t sizes[num_columns]; // Read the serialized Sketch contents - binary_in.read((char *)buckets, bucket_array_bytes()); + // first, read in the sizes + binary_in.read((char *) sizes, sizeof(uint8_t) * num_columns); +#ifdef EAGER_BUCKET_CHECK + binary_in.read((char *) nonempty_buckets, sizeof(vec_t) * num_columns); +#endif + // grab the deterministic bucket: + binary_in.read((char *) (buckets + num_buckets -1), sizeof(Bucket)); + for (size_t col_idx=0; col_idx < num_columns; col_idx++) { + Bucket *current_column = buckets + (col_idx * bkt_per_col); + binary_in.read((char *) current_column, sizeof(Bucket) * sizes[col_idx]); + } +#endif } +Sketch::Sketch(vec_t vector_len, uint64_t seed, std::istream &binary_in, size_t _samples, + size_t _cols): + seed(seed) { + num_samples = _samples; + cols_per_sample = _cols; + num_columns = num_samples * cols_per_sample; + bkt_per_col = calc_bkt_per_col(vector_len); + num_buckets = num_columns * bkt_per_col + 1; // plus 1 for deterministic bucket + buckets = (Bucket*) new char[bucket_array_bytes()]; +#ifdef EAGER_BUCKET_CHECK + nonempty_buckets = (vec_t*) (buckets + num_buckets); +#endif + binary_in.read((char *)buckets, bucket_array_bytes()); + + } + Sketch::Sketch(const Sketch &s) : seed(s.seed) { num_samples = s.num_samples; cols_per_sample = s.cols_per_sample; num_columns = s.num_columns; bkt_per_col = s.bkt_per_col; num_buckets = s.num_buckets; - buckets = new Bucket[num_buckets]; + buckets = (Bucket*) new char[bucket_array_bytes()]; + // buckets = new Bucket[num_buckets]; std::memcpy(buckets, s.buckets, bucket_array_bytes()); + + #ifdef EAGER_BUCKET_CHECK + nonempty_buckets = (vec_t*) (buckets + num_buckets); + #endif } -Sketch::~Sketch() { delete[] buckets; } +Sketch::~Sketch() { + delete[] buckets; + } + + #ifdef L0_SAMPLING void Sketch::update(const vec_t update_idx) { @@ -62,6 +156,9 @@ void Sketch::update(const vec_t update_idx) { size_t bucket_id = i * bkt_per_col + j; Bucket_Boruvka::update(buckets[bucket_id], update_idx, checksum); } +#ifdef EAGER_BUCKET_CHECK + recalculate_flags(i, 0, depth); +#endif } } } @@ -70,14 +167,26 @@ void Sketch::update(const vec_t update_idx) { vec_hash_t checksum = Bucket_Boruvka::get_index_hash(update_idx, checksum_seed()); // Update depth 0 bucket - Bucket_Boruvka::update(buckets[num_buckets - 1], update_idx, checksum); + Bucket_Boruvka::update(get_deterministic_bucket(), update_idx, checksum); // Update higher depth buckets for (unsigned i = 0; i < num_columns; ++i) { col_hash_t depth = Bucket_Boruvka::get_index_depth(update_idx, column_seed(i), bkt_per_col); - size_t bucket_id = i * bkt_per_col + depth; + + /** + * TODO - write comment. calculates the hash and depth together. + */ + // size_t bucket_id = i * bkt_per_col + depth; + Bucket &bucket = get_bucket(i, depth); likely_if(depth < bkt_per_col) { - Bucket_Boruvka::update(buckets[bucket_id], update_idx, checksum); + Bucket_Boruvka::update(bucket, update_idx, checksum); + #ifdef EAGER_BUCKET_CHECK + likely_if(!Bucket_Boruvka::is_empty(bucket)) { + set_bit(nonempty_buckets[i], depth); + } else { + clear_bit(nonempty_buckets[i], depth); + } + #endif } } } @@ -99,17 +208,22 @@ SketchSample Sketch::sample() { size_t idx = sample_idx++; size_t first_column = idx * cols_per_sample; - if (buckets[num_buckets - 1].alpha == 0 && buckets[num_buckets - 1].gamma == 0) + if (Bucket_Boruvka::is_empty(get_deterministic_bucket())) return {0, ZERO}; // the "first" bucket is deterministic so if all zero then no edges to return - if (Bucket_Boruvka::is_good(buckets[num_buckets - 1], checksum_seed())) - return {buckets[num_buckets - 1].alpha, GOOD}; + if (Bucket_Boruvka::is_good(get_deterministic_bucket(), checksum_seed())) + return {get_deterministic_bucket().alpha, GOOD}; + - for (size_t i = 0; i < cols_per_sample; ++i) { - for (size_t j = 0; j < bkt_per_col; ++j) { - size_t bucket_id = (i + first_column) * bkt_per_col + j; - if (Bucket_Boruvka::is_good(buckets[bucket_id], checksum_seed())) - return {buckets[bucket_id].alpha, GOOD}; + for (size_t col = first_column; col < first_column + cols_per_sample; ++col) { + int row = int(effective_size(col))-1; + int window_size = 6; // <- log_2(64), the maximum sketch depth + // now that we've found a non-zero bucket check next if next 6 buckets good + int stop = std::max(row - window_size, 0); + for (; row >= stop; row--) { + Bucket &bucket = get_bucket(col, row); + if (Bucket_Boruvka::is_good(bucket, checksum_seed())) + return {bucket.alpha, GOOD}; } } return {0, FAIL}; @@ -124,20 +238,24 @@ ExhaustiveSketchSample Sketch::exhaustive_sample() { size_t idx = sample_idx++; size_t first_column = idx * cols_per_sample; - unlikely_if (buckets[num_buckets - 1].alpha == 0 && buckets[num_buckets - 1].gamma == 0) + + Bucket &deterministic_bucket = get_deterministic_bucket(); + unlikely_if (Bucket_Boruvka::is_empty(deterministic_bucket)) return {ret, ZERO}; // the "first" bucket is deterministic so if zero then no edges to return - unlikely_if (Bucket_Boruvka::is_good(buckets[num_buckets - 1], checksum_seed())) { - ret.insert(buckets[num_buckets - 1].alpha); + unlikely_if (Bucket_Boruvka::is_good(deterministic_bucket, checksum_seed())) { + ret.insert(deterministic_bucket.alpha); return {ret, GOOD}; } - for (size_t i = 0; i < cols_per_sample; ++i) { - for (size_t j = 0; j < bkt_per_col; ++j) { - size_t bucket_id = (i + first_column) * bkt_per_col + j; - unlikely_if (Bucket_Boruvka::is_good(buckets[bucket_id], checksum_seed())) { - ret.insert(buckets[bucket_id].alpha); - } + for (size_t col = first_column; col < first_column + cols_per_sample; ++col) { + int row = effective_size(col)-1; + int window_size = 6; + int stop = std::max(row - window_size, 0); + for (; row >= stop; row--) { + Bucket &bucket = get_bucket(col, row); + if (Bucket_Boruvka::is_good(bucket, checksum_seed())) + ret.insert(bucket.alpha); } } @@ -146,13 +264,70 @@ ExhaustiveSketchSample Sketch::exhaustive_sample() { return {ret, GOOD}; } + void Sketch::merge(const Sketch &other) { - for (size_t i = 0; i < num_buckets; ++i) { - buckets[i].alpha ^= other.buckets[i].alpha; - buckets[i].gamma ^= other.buckets[i].gamma; + Bucket &deterministic_bucket = get_deterministic_bucket(); + for (size_t i=0; i < num_columns; ++i) { + // Bucket *current_col = buckets + (i* bkt_per_col); + // Bucket *other_col = other.buckets + (i * bkt_per_col); + size_t other_effective_size = other.effective_size(i); + // size_t other_effective_size = bkt_per_col; + #pragma omp simd + for (size_t bucket_id=0; bucket_id < other_effective_size; bucket_id++) { + // current_col[bucket_id].alpha ^= other_col[bucket_id].alpha; + // current_col[bucket_id].gamma ^= other_col[bucket_id].gamma; + get_bucket(i, bucket_id).alpha ^= other.get_bucket(i, bucket_id).alpha; + get_bucket(i, bucket_id).gamma ^= other.get_bucket(i, bucket_id).gamma; + } +#ifdef EAGER_BUCKET_CHECK + recalculate_flags(i, 0, other_effective_size); +#endif } + + // seperately update the deterministic bucket + deterministic_bucket.alpha ^= other.get_deterministic_bucket().alpha; + deterministic_bucket.gamma ^= other.get_deterministic_bucket().gamma; } +#ifdef EAGER_BUCKET_CHECK +void Sketch::unsafe_update() { + vec_hash_t checksum = Bucket_Boruvka::get_index_hash(update_idx, checksum_seed()); + + // Update depth 0 bucket + Bucket_Boruvka::update(buckets[num_buckets - 1], update_idx, checksum); + + // Update higher depth buckets + for (unsigned i = 0; i < num_columns; ++i) { + col_hash_t depth = Bucket_Boruvka::get_index_depth(update_idx, column_seed(i), bkt_per_col); + Bucket &bucket = get_bucket(i, depth); + likely_if(depth < bkt_per_col) { + Bucket_Boruvka::update(bucket, update_idx, checksum); + } + } +} + +#endif + +#ifdef EAGER_BUCKET_CHECK +void Sketch::recalculate_flags(size_t col_idx, size_t start_idx, size_t end_idx) { + // Bucket *current_col = buckets + (col_idx * bkt_per_col); + assert(end_idx >= start_idx); + vec_t clear_mask = (~0) >> (8*sizeof(vec_t) - (end_idx - start_idx)); + clear_mask = ~(clear_mask << start_idx); + vec_t col_nonempty_buckets = 0; + // vec_t col_nonempty_buckets = ~0; + #pragma omp simd + for (size_t bucket_id=start_idx; bucket_id < end_idx; bucket_id++) { + // likely_if(!Bucket_Boruvka::is_empty(current_col[bucket_id])) set_bit(col_nonempty_buckets, bucket_id); + likely_if(!Bucket_Boruvka::is_empty(get_bucket(col_idx, bucket_id))) set_bit(col_nonempty_buckets, bucket_id); + // unlikely_if(Bucket_Boruvka::is_empty(current_col[bucket_id])) clear_bit(col_nonempty_buckets,bucket_id); + } + nonempty_buckets[col_idx] = (nonempty_buckets[col_idx] & clear_mask) | (col_nonempty_buckets & ~clear_mask); +} +#endif + + + void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samples) { if (start_sample + n_samples > num_samples) { assert(false); @@ -164,18 +339,38 @@ void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samp sample_idx = std::max(sample_idx, start_sample); // merge deterministic buffer - buckets[num_buckets - 1].alpha ^= other.buckets[num_buckets - 1].alpha; - buckets[num_buckets - 1].gamma ^= other.buckets[num_buckets - 1].gamma; - + get_deterministic_bucket() ^= other.get_deterministic_bucket(); + + size_t start_col_id = start_sample * cols_per_sample; + size_t end_col_id = (start_sample + n_samples) * cols_per_sample; + for (size_t col=start_col_id; col < end_col_id; col++ ) { +#ifdef EAGER_BUCKET_CHECK + size_t effective_size = effective_size(col); +#else + size_t effective_size = bkt_per_col; +#endif + for (size_t row=0; row < effective_size; row++) { + get_bucket(col, row) ^= other.get_bucket(col, row); + } + } // merge other buckets - size_t start_bucket_id = start_sample * cols_per_sample * bkt_per_col; - size_t n_buckets = n_samples * cols_per_sample * bkt_per_col; - for (size_t i = 0; i < n_buckets; i++) { - size_t bucket_id = start_bucket_id + i; - buckets[bucket_id].alpha ^= other.buckets[bucket_id].alpha; - buckets[bucket_id].gamma ^= other.buckets[bucket_id].gamma; + // size_t start_bucket_id = start_sample * cols_per_sample * bkt_per_col; + // size_t n_buckets = n_samples * cols_per_sample * bkt_per_col; + + // for (size_t i = 0; i < n_buckets; i++) { + // size_t bucket_id = start_bucket_id + i; + // buckets[bucket_id] ^= other.buckets[bucket_id]; + // } + + +#ifdef EAGER_BUCKET_CHECK + size_t start_col_id = start_sample * cols_per_sample; + size_t end_col_id = (start_sample + n_samples) * cols_per_sample; + for (size_t i=start_col_id; i < end_col_id; i++ ) { + recalculate_flags(i, 0, other.effective_size(i)); } +#endif } void Sketch::merge_raw_bucket_buffer(const Bucket *raw_buckets) { @@ -183,9 +378,86 @@ void Sketch::merge_raw_bucket_buffer(const Bucket *raw_buckets) { buckets[i].alpha ^= raw_buckets[i].alpha; buckets[i].gamma ^= raw_buckets[i].gamma; } +#ifdef EAGER_BUCKET_CHECK + for (size_t col_idx=0; col_idx < num_columns; col_idx++) { + recalculate_flags(col_idx, 0, bkt_per_col); + } +#endif +} + +uint8_t Sketch::effective_size(size_t col_idx) const +{ + // first, check for emptyness + if (Bucket_Boruvka::is_empty(get_deterministic_bucket())) + { + return 0; + } +#ifdef EAGER_BUCKET_CHECK + unlikely_if(nonempty_buckets[col_idx] == 0) return 0; + return (uint8_t)((sizeof(unsigned long long) * 8) - __builtin_clzll(nonempty_buckets[col_idx])); +#else + uint8_t idx = bkt_per_col - 1; + // while (idx != 0 && Bucket_Boruvka::is_empty(current_row[idx])) + while (idx != 0 && Bucket_Boruvka::is_empty(get_bucket(col_idx, idx))) + { + idx--; + } + unlikely_if(idx == 0 && Bucket_Boruvka::is_empty(get_bucket(col_idx, idx))) return 0; + // unlikely_if(idx == 0 && Bucket_Boruvka::is_empty(current_row[idx])) return 0; + else return idx + 1; +#endif +} + +uint8_t Sketch::effective_depth() const +{ + unlikely_if(Bucket_Boruvka::is_empty(get_deterministic_bucket())) return 0; + #ifdef EAGER_BUCKET_CHECK + vec_t nonempty = 0; + for (size_t i = 0; i < num_columns; i++) { + nonempty |= nonempty_buckets[i]; + } + unlikely_if(nonempty == 0) return 0; + return (uint8_t)((sizeof(unsigned long long) * 8) - __builtin_clzll(nonempty)); + #else + uint8_t max_size = 0; + for (size_t i = 0; i < num_columns; i++) { + max_size = std::max(max_size, effective_size(i)); + } + return max_size; + #endif +} + +void Sketch::compressed_serialize(std::ostream &binary_out) const { +#ifdef ROW_MAJOR_SKETCHES + // write out max depth, nonempty flags, determinstic bucket, everything else + // then all other buckets + uint8_t max_depth = effective_size(); + binary_out.write((char*) &max_depth, sizeof(uint8_t)); + size_t number_of_buckets = num_columns * max_depth; + binary_out.write((char *) &get_deterministic_bucket(), sizeof(Bucket)); + #ifdef EAGER_BUCKET_CHECK + binary_out.write((char *) nonempty_buckets, sizeof(vec_t) * num_columns); + #endif + binary_out.write((char *) buckets, sizeof(Bucket) * number_of_buckets); +#else + uint8_t sizes[num_columns]; + for (size_t i=0; i < num_columns; i++) { + sizes[i] = effective_size(i); + } + binary_out.write((char*) sizes, sizeof(uint8_t) * num_columns); + #ifdef EAGER_BUCKET_CHECK + binary_out.write((char *) nonempty_buckets, sizeof(vec_t) * num_columns); + #endif + binary_out.write((char *) &get_deterministic_bucket(), sizeof(Bucket)); + for (size_t i=0; i < num_columns; i++) { + Bucket *current_column = buckets + (i * bkt_per_col); + binary_out.write((char *) current_column, sizeof(Bucket) * sizes[i]); + } +#endif } void Sketch::serialize(std::ostream &binary_out) const { + // note that these will include the flag bits, if used. binary_out.write((char*) buckets, bucket_array_bytes()); } @@ -213,8 +485,7 @@ std::ostream &operator<<(std::ostream &os, const Sketch &sketch) { for (unsigned i = 0; i < sketch.num_columns; ++i) { for (unsigned j = 0; j < sketch.bkt_per_col; ++j) { - unsigned bucket_id = i * sketch.bkt_per_col + j; - Bucket bkt = sketch.buckets[bucket_id]; + Bucket bkt = sketch.get_bucket(i, j); vec_t a = bkt.alpha; vec_hash_t c = bkt.gamma; bool good = Bucket_Boruvka::is_good(bkt, sketch.checksum_seed()); diff --git a/test/sketch_test.cpp b/test/sketch_test.cpp index e35f4aa2..a082525d 100644 --- a/test/sketch_test.cpp +++ b/test/sketch_test.cpp @@ -313,6 +313,25 @@ TEST(SketchTestSuite, TestSerialization) { ASSERT_EQ(sketch, reheated); } +TEST(SketchTestSuite, TestCompressedSerialize) { + unsigned long vec_size = 1 << 10; + unsigned long num_updates = 10000; + Testing_Vector test_vec = Testing_Vector(vec_size, num_updates); + auto seed = get_seed(); + Sketch sketch(vec_size, seed, 3, num_columns); + for (unsigned long j = 0; j < num_updates; j++){ + sketch.update(test_vec.get_update(j)); + } + auto file = std::fstream("./out_sketch_comp.txt", std::ios::out | std::ios::binary | std::ios::trunc); + sketch.compressed_serialize(file); + file.close(); + + auto in_file = std::fstream("./out_sketch_comp.txt", std::ios::in | std::ios::binary); + Sketch reheated(vec_size, seed, true, in_file, 3, num_columns); + + ASSERT_EQ(sketch, reheated); +} + TEST(SketchTestSuite, TestSamplesHaveUniqueSeed) { size_t num_samples = 50; size_t cols_per_sample = 3; diff --git a/tools/benchmark/graphcc_bench.cpp b/tools/benchmark/graphcc_bench.cpp index fc5b8995..ba07cb49 100644 --- a/tools/benchmark/graphcc_bench.cpp +++ b/tools/benchmark/graphcc_bench.cpp @@ -17,7 +17,11 @@ constexpr uint64_t KB = 1024; constexpr uint64_t MB = KB * KB; -constexpr uint64_t seed = 374639; + +static size_t get_seed() { + auto now = std::chrono::high_resolution_clock::now(); + return std::chrono::duration_cast(now.time_since_epoch()).count(); +} // If this flag is uncommented then run the FileIngestion benchmarks // #define FILE_INGEST_F @@ -113,76 +117,102 @@ static void BM_MTFileIngest(benchmark::State& state) { BENCHMARK(BM_MTFileIngest)->RangeMultiplier(4)->Range(1, 20)->UseRealTime(); #endif // FILE_INGEST_F +static void BM_Multiply(benchmark::State& state) { + size_t x = 5; + size_t y = 9; + for (auto _ : state) { + benchmark::DoNotOptimize(x = x * y); + y += 1; + } +} +BENCHMARK(BM_Multiply); + static void BM_builtin_ffsll(benchmark::State& state) { size_t i = 0; - size_t j = -1; + size_t diff = 1; + if (state.range(0) == 1) { + i = size_t(-1); + diff = -1; + } for (auto _ : state) { - benchmark::DoNotOptimize(__builtin_ffsll(i++)); - benchmark::DoNotOptimize(__builtin_ffsll(j--)); + benchmark::DoNotOptimize(__builtin_ffsll(i)); + i += diff; } } -BENCHMARK(BM_builtin_ffsll); +BENCHMARK(BM_builtin_ffsll)->DenseRange(0, 1); static void BM_builtin_ctzll(benchmark::State& state) { size_t i = 0; - size_t j = -1; + size_t diff = 1; + if (state.range(0) == 1) { + i = size_t(-1); + diff = -1; + } for (auto _ : state) { - benchmark::DoNotOptimize(__builtin_ctzll(i++)); - benchmark::DoNotOptimize(__builtin_ctzll(j--)); + benchmark::DoNotOptimize(__builtin_ctzll(i)); + i += diff; } } -BENCHMARK(BM_builtin_ctzll); +BENCHMARK(BM_builtin_ctzll)->DenseRange(0, 1); static void BM_builtin_clzll(benchmark::State& state) { size_t i = 0; - size_t j = -1; + size_t diff = 1; + if (state.range(0) == 1) { + i = size_t(-1); + diff = -1; + } for (auto _ : state) { - benchmark::DoNotOptimize(__builtin_clzll(i++)); - benchmark::DoNotOptimize(__builtin_clzll(j--)); + benchmark::DoNotOptimize(__builtin_clzll(i)); + i += diff; } } -BENCHMARK(BM_builtin_clzll); +BENCHMARK(BM_builtin_clzll)->DenseRange(0, 1); // Test the speed of hashing using a method that loops over seeds and a method that // batches by seed // The argument to this benchmark is the number of hashes to batch static void BM_Hash_XXH64(benchmark::State& state) { uint64_t input = 100'000; + size_t seed = get_seed(); for (auto _ : state) { ++input; benchmark::DoNotOptimize(XXH64(&input, sizeof(uint64_t), seed)); } - state.counters["Hash Rate"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); + state.counters["Hashes"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); } BENCHMARK(BM_Hash_XXH64); static void BM_Hash_XXH3_64(benchmark::State& state) { uint64_t input = 100'000; + size_t seed = get_seed(); for (auto _ : state) { ++input; benchmark::DoNotOptimize(XXH3_64bits_withSeed(&input, sizeof(uint64_t), seed)); } - state.counters["Hash Rate"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); + state.counters["Hashes"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); } BENCHMARK(BM_Hash_XXH3_64); static void BM_index_depth_hash(benchmark::State& state) { uint64_t input = 100'000; + size_t seed = get_seed(); for (auto _ : state) { ++input; benchmark::DoNotOptimize(Bucket_Boruvka::get_index_depth(input, seed, 20)); } - state.counters["Hash Rate"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); + state.counters["Hashes"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); } BENCHMARK(BM_index_depth_hash); static void BM_index_hash(benchmark::State& state) { uint64_t input = 100'000; + size_t seed = get_seed(); for (auto _ : state) { ++input; benchmark::DoNotOptimize(Bucket_Boruvka::get_index_hash(input, seed)); } - state.counters["Hash Rate"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); + state.counters["Hashes"] = benchmark::Counter(state.iterations(), benchmark::Counter::kIsRate); } BENCHMARK(BM_index_hash); @@ -204,8 +234,9 @@ BENCHMARK(BM_update_bucket); static void BM_Sketch_Update(benchmark::State& state) { size_t vec_size = state.range(0); vec_t input = vec_size / 3; + size_t seed = get_seed(); // initialize sketches - Sketch skt(vec_size, seed, 1, Sketch::default_cols_per_sample); + Sketch skt(vec_size, seed); // Test the speed of updating the sketches for (auto _ : state) { @@ -216,43 +247,46 @@ static void BM_Sketch_Update(benchmark::State& state) { state.counters["Hashes"] = benchmark::Counter(state.iterations() * (skt.get_columns() + 1), benchmark::Counter::kIsRate); } -BENCHMARK(BM_Sketch_Update)->RangeMultiplier(4)->Ranges({{KB << 4, MB << 4}}); +BENCHMARK(BM_Sketch_Update)->RangeMultiplier(4)->Ranges({{KB << 4, MB << 15}}); // Benchmark the speed of querying sketches -static void BM_Sketch_Query(benchmark::State& state) { - constexpr size_t vec_size = KB << 5; - constexpr size_t num_sketches = 100; - double density = ((double)state.range(0)) / 100; +static constexpr size_t sample_vec_size = MB << 10; +static void BM_Sketch_Sample(benchmark::State& state) { + constexpr size_t num_sketches = 400; - // initialize sketches + // initialize sketches with different seeds Sketch* sketches[num_sketches]; for (size_t i = 0; i < num_sketches; i++) { - sketches[i] = new Sketch(vec_size, seed, 1, Sketch::default_cols_per_sample); + sketches[i] = new Sketch(sample_vec_size, get_seed() * 7); } - // perform updates (do at least 1) + // perform updates to the sketches (do at least 1) for (size_t i = 0; i < num_sketches; i++) { - for (size_t j = 0; j < vec_size * density + 1; j++) { + for (size_t j = 0; j < size_t(state.range(0)); j++) { sketches[i]->update(j + 1); } } SketchSample sample_ret; + size_t successes = 0; for (auto _ : state) { // perform queries for (size_t j = 0; j < num_sketches; j++) { - benchmark::DoNotOptimize(sample_ret = sketches[j]->sample()); + sample_ret = sketches[j]->sample(); + successes += sample_ret.result == GOOD; sketches[j]->reset_sample_state(); } } - state.counters["Query Rate"] = + state.counters["Samples"] = benchmark::Counter(state.iterations() * num_sketches, benchmark::Counter::kIsRate); + state.counters["Successes"] = double(successes) / (state.iterations() * num_sketches); } -BENCHMARK(BM_Sketch_Query)->DenseRange(0, 90, 10); +BENCHMARK(BM_Sketch_Sample)->RangeMultiplier(4)->Range(1, sample_vec_size / 2); static void BM_Sketch_Merge(benchmark::State& state) { size_t n = state.range(0); size_t upds = n / 100; + size_t seed = get_seed(); Sketch s1(n, seed); Sketch s2(n, seed); @@ -264,12 +298,41 @@ static void BM_Sketch_Merge(benchmark::State& state) { for (auto _ : state) { s1.merge(s2); } + s1.get_buckets(); } -BENCHMARK(BM_Sketch_Merge)->RangeMultiplier(10)->Range(1e3, 1e6); +BENCHMARK(BM_Sketch_Merge)->RangeMultiplier(8)->Range(1 << 4, 1 << 20); + +static void BM_Sketch_Merge_Many(benchmark::State& state) { + size_t n = state.range(0); + size_t upds = (sqrtl(n)) / 10; + size_t seed = get_seed(); + size_t num_sketches = 1 << 7; + Sketch source(n, seed); + // Sketch* dests = (Sketch*) malloc(num_sketches * sizeof(Sketch)); + std::vector dests; + // TODO - THERES A BUNCH OF UNFREED MEMORY HERE + for (size_t i=0; i < num_sketches; i++) + dests.push_back(new Sketch(n, seed)); + // Sketch s2(n, seed); + + for (size_t i = 0; i < upds; i++) { + source.update(static_cast(concat_pairing_fn(rand() % n, rand() % n))); + for (auto dest: dests) + dest->update(static_cast(concat_pairing_fn(rand() % n, rand() % n))); + } + + for (auto _ : state) { + for (size_t i=0; i < num_sketches; i++) + source.merge(*dests[i]); + // s1.merge(s2); + } +} +BENCHMARK(BM_Sketch_Merge_Many)->RangeMultiplier(8)->Range(1, ((size_t) 1) << 48); static void BM_Sketch_Serialize(benchmark::State& state) { size_t n = state.range(0); size_t upds = n / 100; + size_t seed = get_seed(); Sketch s1(n, seed); for (size_t i = 0; i < upds; i++) { @@ -281,7 +344,7 @@ static void BM_Sketch_Serialize(benchmark::State& state) { s1.serialize(stream); } } -BENCHMARK(BM_Sketch_Serialize)->RangeMultiplier(10)->Range(1e3, 1e6); +BENCHMARK(BM_Sketch_Serialize)->RangeMultiplier(10)->Range(1e3, 1e12); // static void BM_Sketch_Sparse_Serialize(benchmark::State& state) { // size_t n = state.range(0); @@ -534,4 +597,42 @@ static void BM_Parallel_DSU_Root(benchmark::State& state) { } BENCHMARK(BM_Parallel_DSU_Root)->RangeMultiplier(2)->Range(1, 8)->UseRealTime(); -BENCHMARK_MAIN(); + +static void BM_Std_Set_Hash_Insert(benchmark::State& state) { + constexpr size_t size = 1e6; + std::unordered_set set; + + size_t x = 0; + size_t range = size / state.range(0); + for (auto _ : state) { + set.insert(x); + x = ((x + 1) % range); + } + + state.counters["Insert_Latency"] = benchmark::Counter( + state.iterations(), benchmark::Counter::kIsRate | benchmark::Counter::kInvert); +} +BENCHMARK(BM_Std_Set_Hash_Insert)->RangeMultiplier(2)->Range(1, 1 << 14); + +static void BM_Std_Set_Hash_Iterator(benchmark::State& state) { + constexpr size_t size = 1e6; + std::unordered_set set; + + size_t range = size / state.range(0); + for (size_t i = 0; i < range; i++) { + set.insert(i); + } + + for (auto _ : state) { + // iterate over the set + for (auto &elm : set) { + benchmark::DoNotOptimize(elm); + } + } + + state.counters["Scan_Latency"] = benchmark::Counter( + state.iterations(), benchmark::Counter::kIsRate | benchmark::Counter::kInvert); +} +BENCHMARK(BM_Std_Set_Hash_Iterator)->RangeMultiplier(2)->Range(1, 1 << 14); + +BENCHMARK_MAIN(); \ No newline at end of file diff --git a/tools/process_stream.cpp b/tools/process_stream.cpp index 53c49586..e36b4c1a 100644 --- a/tools/process_stream.cpp +++ b/tools/process_stream.cpp @@ -96,6 +96,18 @@ int main(int argc, char **argv) { auto cc_start = std::chrono::steady_clock::now(); driver.prep_query(CONNECTIVITY); + size_t total_size = 0; + size_t full_size= 0; + + for (size_t i=0; i < num_nodes; i++) { + auto sketch = *cc_alg.sketches[i]; + for (size_t j=0; j < sketch.num_columns; j++) { + total_size += sketch.effective_size(j); + full_size += sketch.bkt_per_col; + } + } + std::cout << "Effective Storage Used (in number of buckets): " << total_size << "/" << full_size << std::endl; + auto CC_num = cc_alg.connected_components().size(); std::chrono::duration cc_time = std::chrono::steady_clock::now() - cc_start; std::chrono::duration insert_time = driver.flush_end - ins_start;