Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Csv format #138

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ All tests support the same set of arguments :
* `-c,--check <0/1>` check correctness of results. This can be quite slow on large numbers of GPUs. Default : 1.
* `-z,--blocking <0/1>` Make NCCL collective blocking, i.e. have CPUs wait and sync after each collective. Default : 0.
* `-G,--cudagraph <num graph launches>` Capture iterations as a CUDA graph and then replay specified number of times. Default : 0.
* Saving results
* `-F,--output_file` Export results to CSV file on rank 0. Directory has to exist! Default: "" (Do not save to CSV file - print to stdout). It will overwrite file if it already exists.

## CSV integration with Pandas
CSV files generated with the `-F` option can be used directly in [pandas](https://pandas.pydata.org/) to analyse the results. Here is an example printing the time column:
```
import pandas as pd
df = pd.read_csv("data.csv", header=[0, 1, 2])
print(df[('out-of-place', ' time', '(us)')]) # NOTE: we are using hierarchical columns
```
lipovsek-aws marked this conversation as resolved.
Show resolved Hide resolved

## Copyright

Expand Down
140 changes: 126 additions & 14 deletions src/common.cu
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
#include <getopt.h>
#include <libgen.h>
#include "cuda.h"
#include <string>
#include <fstream>
#include <iostream>

#include "../verifiable/verifiable.h"

Expand Down Expand Up @@ -80,6 +83,117 @@ static int cudaGraphLaunches = 0;
static int report_cputime = 0;
// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX)
static int average = 1;
static std::string resultsFile;

//TODO: implement metaInfo method to peint metadata (device info etc.)
Reporter::Reporter(std::string csvName_, const char* timeStr_, bool isMain_): csvName { csvName_ }, timeStr {timeStr_}, isMainThreadFlag { isMain_ } {
if (!isMainThread()){
return;
}
saveToCSV = csvName != "";

char columnsMeta[1000];
char columnsName[1000];

if (saveToCSV){
printf("#\n# Writing results to file: %s\n#\n", csvName.c_str());
outf = std::ofstream { csvName };

const char csvFormatMeta[1000] { ",,,,,out-of-place,out-of-place,out-of-place,out-of-place,in-place,in-place,in-place,in-place,,,,,,,,,,,,,,,,,,,\n" };
snprintf( columnsMeta, 1000, csvFormatMeta);
outf << columnsMeta;

const char csvFormatName[1000] {"size, count, type, redop, root, %s, algbw, busbw, #wrong, %s, algbw, busbw, #wrong, nthreads, ngpus, minbytes, maxbytes, stepbytes, stepfactor, check, warmup_iters, iters, agg_iters, op, datatype, root, parallel_init, blocking, stream_null, timeout, cudagraph, report_cputime\n"};
snprintf( columnsName, 1000, csvFormatName, timeStr, timeStr );
outf << columnsName;

outf << "(B), (elements),,,,(us),(GB/s),(GB/s),,(us),(GB/s),(GB/s),,,,,,,,,,,,,,,,,,,,\n";
outf.flush();
} else {
const char stdoutFormatMeta[1000] { "# %10s %12s %8s %6s %6s out-of-place in-place " };
snprintf( columnsMeta, 1000, stdoutFormatMeta, "", "", "", "", "" );
std::cout << columnsMeta << std::endl;

const char stdoutFormatName[1000] { "# %10s %12s %8s %6s %6s %7s %6s %6s %6s %7s %6s %6s %6s" };
snprintf( columnsName, 1000, stdoutFormatName, "size", "count", "type", "redop", "root", timeStr, "algbw", "busbw", "#wrong", timeStr, "algbw", "busbw", "#wrong" );
std::cout << columnsName << std::endl;

char columnsUnits[1000];
const char stdoutFormat[1000] { "# %10s %12s %8s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s" };
snprintf( columnsUnits, 1000, stdoutFormat, "(B)", "(elements)", "", "", "", "(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "" );
std::cout << columnsUnits << std::endl;
}
}

void Reporter::parameters(long bytes, long elements, const char* typeName, const char* opName, int rootName){
if (!isMainThread()){
return;
}

char parameters[1000];

if (saveToCSV){
const char csvFormat[1000] { "%li, %li, %s, %s, %i" };
snprintf( parameters, 1000, csvFormat, bytes, elements, typeName, opName, rootName );
outf << parameters;
outf.flush();
} else {
const char stdoutFormat[1000] { "%12li %12li %8s %6s %6i" };
snprintf( parameters, 1000, stdoutFormat, bytes, elements, typeName, opName, rootName );
std::cout << parameters;
}
}

void Reporter::result(const char* time, float algBw, float busBw, long wrongElts){
if (!isMainThread()){
return;
}

char result[1000];

if (saveToCSV){
const char csvFormat[1000] { ", %s, %f, %f, %li" };
snprintf( result, 1000, csvFormat, time, algBw, busBw, wrongElts);
outf << result;
outf.flush();
} else {
const char stdoutFormat[1000] { " %7s %6.2f %6.2f %5li" };
snprintf( result, 1000, stdoutFormat, time, algBw, busBw, wrongElts);
std::cout << result;
}
}

void Reporter::newStep(){
if (!isMainThread()){
return;
}

if (saveToCSV){
const char csvFormat [1000] {",%i,%i,%lu,%lu,%lu,%lu,%i,%i,%i,%i,%s,%s,%i,%i,%i,%i,%i,%i,%i\n"};
char result[1000];
snprintf( result, 1000, csvFormat, nThreads, nGpus, minBytes, maxBytes, stepBytes, stepFactor, datacheck, warmup_iters, iters, agg_iters, test_opnames[ncclop], test_typenames[nccltype], ncclroot, parallel_init, blocking_coll, streamnull, timeout, cudaGraphLaunches, report_cputime);
outf << result;
outf.flush();
} else {
std::cout << "\n";
}
}

bool Reporter::isMainThread(){
return isMainThreadFlag;
}

Reporter::~Reporter(){
sjeaugey marked this conversation as resolved.
Show resolved Hide resolved
if (!isMainThread()){
return;
}
if (saveToCSV){
outf << "\n";
outf.close();
}
}



#define NUM_BLOCKS 32

Expand Down Expand Up @@ -550,9 +664,9 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
sprintf(timeStr, "%7.2f", timeUsec);
}
if (args->reportErrors) {
PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts);
args->reporter->result(timeStr, algBw, busBw, (double)wrongElts);
} else {
PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A");
args->reporter->result(timeStr, algBw, busBw, double {-1});
}

args->bw[0] += busBw;
Expand Down Expand Up @@ -595,12 +709,10 @@ testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char*
// Benchmark
for (size_t size = args->minbytes; size<=args->maxbytes; size = ((args->stepfactor > 1) ? size*args->stepfactor : size+args->stepbytes)) {
setupArgs(size, type, args);
char rootName[100];
sprintf(rootName, "%6i", root);
PRINT("%12li %12li %8s %6s %6s", max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, rootName);
args->reporter->parameters(max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, root);
TESTCHECK(BenchTime(args, type, op, root, 0));
TESTCHECK(BenchTime(args, type, op, root, 1));
PRINT("\n");
args->reporter->newStep();
}
return testSuccess;
}
Expand Down Expand Up @@ -705,13 +817,14 @@ int main(int argc, char* argv[]) {
{"cudagraph", required_argument, 0, 'G'},
{"report_cputime", required_argument, 0, 'C'},
{"average", required_argument, 0, 'a'},
{"output_file", required_argument, 0, 'F'},
{"help", no_argument, 0, 'h'},
{}
};

while(1) {
int c;
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:y:T:hG:C:a:", longopts, &longindex);
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:y:T:hG:C:a:F:", longopts, &longindex);

if (c == -1)
break;
Expand Down Expand Up @@ -795,6 +908,9 @@ int main(int argc, char* argv[]) {
case 'a':
average = (int)strtol(optarg, NULL, 0);
break;
case 'F':
resultsFile = optarg;
break;
case 'h':
default:
if (c != 'h') printf("invalid option '%c'\n", c);
Expand Down Expand Up @@ -825,6 +941,7 @@ int main(int argc, char* argv[]) {
"[-G,--cudagraph <num graph launches>] \n\t"
"[-C,--report_cputime <0/1>] \n\t"
"[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
"[-F,--output_file <CSV file path> Export results to CSV file on rank 0. Directory has to exist! Default: "" (Do not save to CSV file - print to stdout). It will overwrite file if it already exists. ] \n\t"
"[-h,--help]\n",
basename(argv[0]));
return 0;
Expand Down Expand Up @@ -894,7 +1011,6 @@ testResult_t run() {
rank, color, getpid(), hostname, cudaDev, prop.pciBusID, prop.name);
maxMem = std::min(maxMem, prop.totalGlobalMem);
}

#if MPI_SUPPORT
char *lines = (proc == 0) ? (char *)malloc(totalProcs*MAX_LINE) : NULL;
// Gather all output in rank order to root (0)
Expand Down Expand Up @@ -972,12 +1088,7 @@ testResult_t run() {
fflush(stdout);

const char* timeStr = report_cputime ? "cputime" : "time";
PRINT("#\n");
PRINT("# %10s %12s %8s %6s %6s out-of-place in-place \n", "", "", "", "", "");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %6s %7s %6s %6s %6s\n", "size", "count", "type", "redop", "root",
timeStr, "algbw", "busbw", "#wrong", timeStr, "algbw", "busbw", "#wrong");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
Reporter reporter {resultsFile, timeStr, is_main_thread==1};

struct testThread threads[nThreads];
memset(threads, 0, sizeof(struct testThread)*nThreads);
Expand Down Expand Up @@ -1009,6 +1120,7 @@ testResult_t run() {

threads[t].args.reportErrors = datacheck;

threads[t].args.reporter = &reporter;
threads[t].func = parallel_init ? threadInit : threadRunTests;
if (t)
TESTCHECK(threadLaunch(threads+t));
Expand Down
21 changes: 21 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#include <pthread.h>
#include "nccl1_compat.h"
#include "timer.h"
#include <string>
#include <fstream>
#include <iostream>

// For nccl.h < 2.13 since we define a weak fallback
extern "C" char const* ncclGetLastError(ncclComm_t comm);
Expand Down Expand Up @@ -101,6 +104,22 @@ extern struct testColl broadcastTest;
extern struct testColl reduceTest;
extern struct testColl alltoAllTest;

class Reporter {
private:
std::ofstream outf;
std::string csvName;
bool saveToCSV;
const char* timeStr;
bool isMainThreadFlag;
public:
Reporter(std::string csvName_, const char* timeStr_, bool isMain);
void parameters(long bytes, long elements, const char* typeName, const char* opName, int rootName);
void result(const char* time, float algBw, float busBw, long wrongElts);
void newStep();
bool isMainThread();
~Reporter();
};

struct testEngine {
void (*getBuffSize)(size_t *sendcount, size_t *recvcount, size_t count, int nranks);
testResult_t (*runTest)(struct threadArgs* args, int root, ncclDataType_t type,
Expand Down Expand Up @@ -142,6 +161,8 @@ struct threadArgs {
int reportErrors;

struct testColl* collTest;

Reporter* reporter;
};

typedef testResult_t (*threadFunc_t)(struct threadArgs* args);
Expand Down