Skip to content

Commit

Permalink
made deletions parallel and faster yay
Browse files Browse the repository at this point in the history
  • Loading branch information
etwest committed Jan 2, 2025
1 parent 63cce9e commit d6d71d1
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 51 deletions.
10 changes: 10 additions & 0 deletions include/cc_sketch_alg.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ class CCSketchAlg {
*/
void boruvka_emulation();

/**
* Delete edges found in spanning forest from sketches
* Helper function for calc_disjoint_spanning_forests(k)
*/
void filter_sf_edges(SpanningForest &sf);

// constructor for use when reading from a serialized file
CCSketchAlg(node_id_t num_vertices, size_t seed, std::ifstream &binary_stream,
CCAlgConfiguration config);
Expand Down Expand Up @@ -257,6 +263,10 @@ class CCSketchAlg {
// time hooks for experiments
std::chrono::steady_clock::time_point cc_alg_start;
std::chrono::steady_clock::time_point cc_alg_end;
std::chrono::steady_clock::time_point sf_query_start;
std::chrono::steady_clock::time_point sf_query_end;
std::chrono::duration<double> query_time;
std::chrono::duration<double> delete_time;
size_t last_query_rounds = 0;

// getters
Expand Down
86 changes: 79 additions & 7 deletions src/cc_sketch_alg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,25 +567,97 @@ SpanningForest CCSketchAlg::calc_spanning_forest() {
return ret;
}

void CCSketchAlg::filter_sf_edges(SpanningForest &sf) {
auto start = std::chrono::steady_clock::now();

dsu_valid = false;
shared_dsu_valid = false;

auto edges = sf.get_edges();
size_t num = edges.size();
edges.resize(2 * edges.size());

#pragma omp parallel for
for (size_t i = 0; i < num; i++) {
edges[i + num] = edges[i];
std::swap(edges[i + num].src, edges[i + num].dst);
}

auto setup = std::chrono::steady_clock::now();
std::cout << "Setup time = " << std::chrono::duration<double>(setup - start).count() << std::endl;

// sort the edges
std::sort(edges.begin(), edges.end());

auto sort = std::chrono::steady_clock::now();
std::cout << "Sort time = " << std::chrono::duration<double>(sort - setup).count() << std::endl;

#pragma omp parallel
{
size_t thr_id = omp_get_thread_num();
size_t num_threads = omp_get_num_threads();

std::pair<size_t, size_t> partition = get_ith_partition(edges.size(), thr_id, num_threads);
size_t start = partition.first;
size_t end = partition.second;

// check if we collide with previous thread. If so lock and apply those updates.
if (start > 0 && edges[start].src == edges[start - 1].src) {
sketches[edges[start].src]->mutex.lock();
size_t orig_start = start;
while (edges[start].src == edges[orig_start].src) {
Edge edge = edges[start];
sketches[edge.src]->update(static_cast<vec_t>(concat_pairing_fn(edge.src, edge.dst)));
++start;
}

sketches[edges[orig_start].src]->mutex.unlock();
}

// check if we collide with next thread. If so lock and apply those updates.
if (end < edges.size() && edges[end - 1].src == edges[end].src) {
sketches[edges[end - 1].src]->mutex.lock();
size_t orig_end = end;
while (edges[end - 1].src == edges[orig_end - 1].src) {
Edge edge = edges[end - 1];
sketches[edge.src]->update(static_cast<vec_t>(concat_pairing_fn(edge.src, edge.dst)));
--end;
}

sketches[edges[orig_end].src]->mutex.unlock();
}

for (size_t i = start; i < end; i++) {
Edge edge = edges[i];
sketches[edge.src]->update(static_cast<vec_t>(concat_pairing_fn(edge.src, edge.dst)));
}
}
auto del = std::chrono::steady_clock::now();
std::cout << "Delete time = " << std::chrono::duration<double>(del - sort).count() << std::endl;

delete_time += std::chrono::steady_clock::now() - start;
}

std::vector<SpanningForest> CCSketchAlg::calc_disjoint_spanning_forests(size_t k) {
std::vector<SpanningForest> SFs;
std::chrono::steady_clock::time_point start;

for (size_t i = 0; i < k; i++) {
start = std::chrono::steady_clock::now();
SpanningForest sf = calc_spanning_forest();

SFs.push_back(sf);
query_time += std::chrono::steady_clock::now() - start;

for (auto edge : sf.get_edges()) {
update({edge, DELETE}); // deletes the found edge
}
filter_sf_edges(sf);
}

// revert the state of the sketches to remove all deletions
for (auto &sf : SFs) {
for (auto edge : sf.get_edges()) {
update({edge, INSERT}); // reinserts the deleted edge
}
filter_sf_edges(sf);
}

std::cout << "Number of SFs: " << SFs.size() << std::endl;

#ifdef VERIFY_SAMPLES_F
verifier->verify_spanning_forests(SFs);
#endif
Expand Down
62 changes: 18 additions & 44 deletions tools/spanning_forest_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ int main(int argc, char **argv) {
size_t empty = 0;
std::chrono::duration<double> ingest_time(0);
std::chrono::duration<double> query_time(0);
std::chrono::duration<double> filter_time(0);
std::chrono::duration<double> sample_time(0);
std::chrono::duration<double> delete_time(0);

for (size_t trial = 0; trial < trials; trial++) {
BinaryFileStream stream(stream_file);
Expand All @@ -79,65 +80,37 @@ int main(int argc, char **argv) {
std::cout << "Beginning stream ingestion ... "; fflush(stdout);
auto start = std::chrono::steady_clock::now();
driver.process_stream_until(END_OF_STREAM);
driver.prep_query(KSPANNINGFORESTS);
std::cout << "Stream processed!" << std::endl;
ingest_time += std::chrono::steady_clock::now() - start;
size_t max_rounds_used = 0;
std::cout << "Ingestion throughput: " << num_updates / std::chrono::duration<double>(std::chrono::steady_clock::now() - start).count() << std::endl;

// figure out how many rounds are required to extract log V spanning forests
for (size_t s = 0; s < vertex_power; s++) {
auto start = std::chrono::steady_clock::now();
try {
driver.prep_query(KSPANNINGFORESTS);
SpanningForest forest = cc_alg.calc_spanning_forest();
std::cout << "sf: " << s + 1 << ", rounds: " << cc_alg.last_query_rounds
<< "/" << cc_alg.max_rounds() << " \r";
fflush(stdout);
max_rounds_used = std::max(max_rounds_used, cc_alg.last_query_rounds);

query_time += std::chrono::steady_clock::now() - start;

if (forest.get_edges().size() == 0) {
std::cout << std::endl << "Exiting because of empty Spanning Forest " << s << std::endl;
empty += 1;
break;
}
start = std::chrono::steady_clock::now();
const auto &sf_edges = forest.get_edges();

// filter out all the found edges from the sketches
// This is technically illegal behavior. Which is like the point of this test :)
for (auto edge : sf_edges) {
cc_alg.update({edge, DELETE});
}
filter_time += std::chrono::steady_clock::now() - start;
} catch (OutOfSamplesException &err) {
std::cout << std::endl << "Got OutOfSamplesException on spanning forest " << s + 1 << std::endl;
errors += 1;
break;
} catch (...) {
std::cout << std::endl << "Got unknown exception on spanning forest " << s + 1 << std::endl;
errors += 1;
break;
}
}
start = std::chrono::steady_clock::now();
cc_alg.calc_disjoint_spanning_forests(vertex_power);
query_time += std::chrono::steady_clock::now() - start;

// add number of rounds to get log V spanning forests to vector
rounds_required[max_rounds_used] += 1;
rounds_required[cc_alg.last_query_rounds] += 1;

sample_time += cc_alg.query_time;
delete_time += cc_alg.delete_time;
}

std::cout << std::endl;
std::cout << "ERRORS = " << errors << std::endl;
std::cout << "EMPTY = " << empty << std::endl;

for (size_t i = 0; i < rounds_required.size(); i++) {
std::cout << i << ", " << rounds_required[i] << std::endl;
}
// for (size_t i = 0; i < rounds_required.size(); i++) {
// std::cout << i << ", " << rounds_required[i] << std::endl;
// }

auto stats = calc_stats(rounds_required);
std::cout << "avg = " << stats.first << " std dev = " << stats.second << std::endl;
std::cout << "ingest: " << ingest_time.count() << std::endl;
std::cout << "query: " << query_time.count() << std::endl;
std::cout << "filter: " << filter_time.count() << std::endl;
std::cout << " sample: " << sample_time.count() << std::endl;
std::cout << " delete: " << delete_time.count() << std::endl;

std::ofstream output_file("rounds_required.txt");
output_file << "ERRORS = " << errors << std::endl;
Expand All @@ -149,5 +122,6 @@ int main(int argc, char **argv) {
output_file << "std dev, " << stats.second << std::endl;
output_file << "ingest: " << ingest_time.count() << std::endl;
output_file << "query: " << query_time.count() << std::endl;
output_file << "filter: " << filter_time.count() << std::endl;
output_file << " sample: " << sample_time.count() << std::endl;
output_file << " delete: " << delete_time.count() << std::endl;
}

0 comments on commit d6d71d1

Please sign in to comment.