Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
etwest committed Jul 24, 2024
2 parents cdac032 + db06e66 commit 4ea0614
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 27 deletions.
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ if (BUILD_EXE)
add_dependencies(tests GraphZeppelinVerifyCC)
target_link_libraries(tests PRIVATE GraphZeppelinVerifyCC)

add_executable(statistical_sketch_test
tools/sketch_testing.cpp)
add_dependencies(statistical_sketch_test GraphZeppelinVerifyCC)
target_link_libraries(statistical_sketch_test PRIVATE GraphZeppelinVerifyCC)

# executable for processing a binary graph stream
add_executable(process_stream
tools/process_stream.cpp)
Expand Down
5 changes: 0 additions & 5 deletions src/cc_alg_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ CCAlgConfiguration& CCAlgConfiguration::sketches_factor(double factor) {
<< "Defaulting to 1." << std::endl;
_sketches_factor = 1;
}
if (_sketches_factor != 1) {
std::cerr << "WARNING: Your graph configuration specifies using a factor " << _sketches_factor
<< " of the normal quantity of sketches." << std::endl;
std::cerr << " Is this intentional? If not, set sketches_factor to one!" << std::endl;
}
return *this;
}

Expand Down
6 changes: 3 additions & 3 deletions src/cc_sketch_alg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ void CCSketchAlg::pre_insert(GraphUpdate upd, int /* thr_id */) {
auto dst = std::max(edge.src, edge.dst);
std::lock_guard<std::mutex> sflock(spanning_forest_mtx[src]);
if (dsu.merge(src, dst).merged) {
// edge is new connectivity info. Add to spanning forest.
// this edge adds new connectivity information so add to spanning forest
spanning_forest[src].insert(dst);
}
}
else if (spanning_forest[src].find(dst) != spanning_forest[src].end()) {
// edge is not new connectivity info and is deletion of spanning forest edge. dsu invalid.
// this update deletes one of our spanning forest edges so mark dsu invalid
dsu_valid = false;
shared_dsu_valid = false;
}
Expand Down
66 changes: 47 additions & 19 deletions tools/benchmark/graphcc_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,23 @@ static void BM_DSU_Find(benchmark::State& state) {
constexpr size_t size_of_dsu = 16 * MB;
DisjointSetUnion<node_id_t> dsu(size_of_dsu);

auto rng = std::default_random_engine{};
std::vector<node_id_t> queries;
for (size_t i = 0; i < 4096; i++) {
queries.push_back((size_of_dsu / 4096) * i);
}
std::shuffle(queries.begin(), queries.end(), rng);

// perform find test
for (auto _ : state) {
for (size_t i = 0; i < size_of_dsu; i++)
dsu.find_root(i);
for (auto q : queries)
dsu.find_root(q);
}
state.counters["Find_Latency"] =
benchmark::Counter(state.iterations() * size_of_dsu,
benchmark::Counter(state.iterations() * queries.size(),
benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
}
BENCHMARK(BM_DSU_Find)->Iterations(1);
BENCHMARK(BM_DSU_Find);

static void BM_DSU_Find_After_Combine(benchmark::State& state) {
constexpr size_t size_of_dsu = 16 * MB;
Expand All @@ -323,52 +330,73 @@ static void BM_DSU_Find_After_Combine(benchmark::State& state) {
dsu.merge(i, i+1);
}

auto rng = std::default_random_engine{};
std::vector<node_id_t> queries;
for (size_t i = 0; i < 4096; i++) {
queries.push_back((size_of_dsu / 4096) * i);
}
std::shuffle(queries.begin(), queries.end(), rng);

// perform find test
for (auto _ : state) {
for (size_t i = 0; i < size_of_dsu; i++)
dsu.find_root(i);
for (auto q : queries)
dsu.find_root(q);
}
state.counters["Find_Latency"] =
benchmark::Counter(state.iterations() * size_of_dsu,
benchmark::Counter(state.iterations() * queries.size(),
benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
}
BENCHMARK(BM_DSU_Find_After_Combine)->Iterations(1);
BENCHMARK(BM_DSU_Find_After_Combine);

// MT DSU Find Root
static void BM_Parallel_DSU_Find(benchmark::State& state) {
static void BM_MT_DSU_Find(benchmark::State& state) {
constexpr size_t size_of_dsu = 16 * MB;
DisjointSetUnion_MT<node_id_t> dsu(size_of_dsu);

auto rng = std::default_random_engine{};
std::vector<node_id_t> queries;
for (size_t i = 0; i < 4096; i++) {
queries.push_back((size_of_dsu / 4096) * i);
}
std::shuffle(queries.begin(), queries.end(), rng);

// perform find test
for (auto _ : state) {
for (size_t i = 0; i < size_of_dsu; i++)
dsu.find_root(i);
for (auto q : queries)
dsu.find_root(q);
}
state.counters["Find_Latency"] =
benchmark::Counter(state.iterations() * size_of_dsu,
benchmark::Counter(state.iterations() * queries.size(),
benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
}
BENCHMARK(BM_Parallel_DSU_Find)->Iterations(1);
BENCHMARK(BM_MT_DSU_Find);

// MT DSU Find Root
static void BM_Parallel_DSU_Find_After_Combine(benchmark::State& state) {
constexpr size_t size_of_dsu = 16 * MB;
static void BM_MT_DSU_Find_After_Combine(benchmark::State& state) {
constexpr size_t size_of_dsu = MB;
DisjointSetUnion_MT<node_id_t> dsu(size_of_dsu);
// merge everything into same root
for (size_t i = 0; i < size_of_dsu - 1; i++) {
dsu.merge(i, i+1);
}

auto rng = std::default_random_engine{};
std::vector<node_id_t> queries;
for (size_t i = 0; i < 512; i++) {
queries.push_back((size_of_dsu / 512) * i);
}
std::shuffle(queries.begin(), queries.end(), rng);

// perform find test
for (auto _ : state) {
for (size_t i = 0; i < size_of_dsu; i++)
dsu.find_root(i);
for (auto q : queries)
dsu.find_root(q);
}
state.counters["Find_Latency"] =
benchmark::Counter(state.iterations() * size_of_dsu,
benchmark::Counter(state.iterations() * queries.size(),
benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
}
BENCHMARK(BM_Parallel_DSU_Find_After_Combine)->Iterations(1);
BENCHMARK(BM_MT_DSU_Find_After_Combine);

// Benchmark speed of DSU merges when the sequence of merges is adversarial
// This means we avoid joining roots wherever possible
Expand Down
171 changes: 171 additions & 0 deletions tools/sketch_testing.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
#include <iostream>
#include <random>
#include <set>
#include <cassert>

#include "sketch.h"
#include "cc_alg_configuration.h"

/*
The purpose of this file is to test the probability that a sketch column returns a nonzero
That is, for a number of nonzeroes z, how what's the probability of success?
We model this as a binomial process for the sake of confidence intervals / stddev.
Originally, this code inserted z random elements into a sketch then queried it.
As a first speed optimization (that didn't appear to change outcome) (xxHash works well)
We replaced random insertion with sequential inserted.
As a second speed optimization, we queried the sketch after every update.
That is, instead of O(z^2) insertions per z data points, we perform O(z) insertions per z data points.
This sacrifices independence. Whether or not the z-1th sketch is good is a fantastic predictor for the zth sketch being good.
But, for a given z, the results are still independent.
For parity with the main code, column seeds are sequential.
The output of this is intended to be parsed into summary stats by sum_sketch_testing.py
*/


std::random_device dev;

std::mt19937_64 rng(dev());
using rand_type = std::mt19937_64::result_type;


rand_type gen(rand_type n)
{
std::uniform_int_distribution<rand_type> dist(0,n-1);
return dist(rng);
}

rand_type seed = gen(1ll << 62);

rand_type gen_seed()
{
//std::uniform_int_distribution<rand_type> dist(0,1ll << 63);
//return dist(rng);
return seed++;
}


enum ResultType {
R_GOOD=0,
R_BAD=1,
R_HASHFAIL=2
};

ResultType test_z(rand_type n, rand_type z)
{
assert(z >= 1);
assert(z <= n*n);
Sketch sketch(n, gen_seed(), 1, 1);

// Generate z edges and track them
/*std::unordered_set<rand_type> edges;
while (edges.size() < z)
{
edges.insert(gen(n*n));
}
for (const auto& r : edges)
{
sketch.update(r);
}
*/
for (rand_type i = 0; i < z; i++)
sketch.update(i);
// Sample the sketches
SketchSample query_ret = sketch.sample();
SampleResult ret_code = query_ret.result;

assert(ret_code != ZERO);

if (ret_code == GOOD)
{
//if (edges.find(res) == edges.end())
// return R_HASHFAIL;
return R_GOOD;
}
return R_BAD;
}

std::pair<double, double> fit_to_binomial(rand_type ngood, rand_type ntrials)
{
double p = ngood / (1.0 * ntrials);
double variance = ntrials * p * (1-p);
double stddev = sqrt(variance);
return std::pair<double, double>(p, stddev/ntrials);
}

std::pair<double, double> test_nz_pair(rand_type n, rand_type z)
{
int ntrials = 500;
int results[3] = {0,0,0};
for (int i = 0; i < ntrials; i++)
results[test_z(n, z)]++;
//std::cout << "GOOD: " << results[0] << std::endl;
//std::cout << "BAD: " << results[1] << std::endl;
//std::cout << "HASHFAIL: " << results[2] << std::endl;
int ngood = results[0];
// Fit to binomial
return fit_to_binomial(ngood, ntrials);
}

void test_n_one(rand_type n, rand_type* good, rand_type max_z)
{
Sketch sketch(n*n, gen_seed(), 1, 1);
for (rand_type i = 0; i < max_z; i++)
{
sketch.update(i);
// Sample the sketches
SketchSample query_ret = sketch.sample();
SampleResult ret_code = query_ret.result;
//assert(ret_code != ZERO);
if (ret_code == GOOD)
good[i]++;
sketch.reset_sample_state();
}
}

void test_n(rand_type n)
{
int ntrials = 500;
rand_type max_z = 1+(n*n)/4;
// Default init to 0?
rand_type* good = new rand_type[max_z];
for (int i = 0; i < ntrials; i++)
test_n_one(n, good, max_z);

double worst_3sigma = 1;
rand_type worst_i = 0;
for (rand_type i = 0; i < max_z; i++)
{
auto pair = fit_to_binomial(good[i], ntrials);
double ans = pair.first;
double stddev = pair.second;
std::cout << i << ": " << ans << " +- " << stddev << std::endl;
if (ans - 3 * stddev < worst_3sigma)
{
worst_i = i;
worst_3sigma = ans-3*stddev;
}
}
auto pair = fit_to_binomial(good[worst_i], ntrials);
double ans = pair.first;
double stddev = pair.second;
std::cout << "WORST" << std::endl;
std::cout << worst_i << ": " << ans << " +- " << stddev << std::endl;

delete[] good;
}

int main()
{
std::cout << CCAlgConfiguration() << std::endl;
rand_type n = 1 << 13;
std::cout << "TESTING: " << n << " TO " << (n*n)/4 << std::endl;
test_n(n);
}
63 changes: 63 additions & 0 deletions tools/sum_sketch_testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import sys
import re

"""
The purpose of this file is to parse the output of sketch_testing.cpp into summary statistics
That is, we can answer questions like "how many data points are 2 stddev above .8"
or "What is the mean of the data"
"""

prob = r"([0-9]*[.])?[0-9]+"
which = r"[0-9]+"

pattern = re.compile("(" + which + "): (" + prob + ") \+- (" + prob + ")")

def parse(filename):
with open(filename) as file:
lines = file.readlines()[:4000000]
stats = []
for l in lines:
match = pattern.match(l)
if match:
t = (int(match.group(1)), float(match.group(2)), float(match.group(4)))
stats.append(t)
return stats

def above(stats, target, sigmas):
above = 0
below = 0


for s in stats:
if (s[1] - sigmas * s[2] > target):
above += 1
else:
below += 1
print("BELOW")

print (above / (above + below))


def mean(stats, sigmas):
summ = 0
count = 0
for s in stats:
count += 1
summ += s[1] - sigmas * s[2]
print(summ/count)


stats = parse(sys.argv[1])

above(stats, 0.76, 0)
#above(stats, 0.78, 1)
#above(stats, 0.78, 2)

#mean(stats, 3)







0 comments on commit 4ea0614

Please sign in to comment.