fix rank issue, add metadata support and debug mode, add wrf, openfoam, lammps jarvis packages, clean the code and add cmake flags (#45)

* update the change from master

* remove globalVariable.cpp, remove class default

* remove globalVariable.cpp, fix the class construction

* change the output format, temporarily remove the global variable

* change the spdlog and create two spdlog for put and get

* add macro in cmakelist and ifdef method to allow metadata collection

* add support for put and get count

* simplify the logger code for globalvariable and metadata

* change the format of spdlog output

* add header for metadata result file

* remove test/src folder to /test, add POSIX I/O support

* add flag for each function

* fix bug when use latest hermes

* fix the error caused by hermes upgrade

* add get put sync support

* add trace_func to each function for coeus adapter

* remove the destructor of Tracer and add std cout method

* add spdlog flush and rank support log

* add more function to trace

* debug process: comment out the endstep, database operation

* change the rank to getpid()

* add cout for doputdeferred method

* change format for cout

* add log file

* comment out metadata part

* comment out delete db

* SHM rank consensus added

* Changes to the CMake for the rank consensus

* Added the new task to the search path for includes of the engine

* Return the value through the task class

* minor change to the initialization of the atomic

* changes to return the rank properly and delete the task

* changes to return the rank properly and delete the task

* add rank with consensus

* fix error with doubly declared variable

* move hermes setup location

* Registering the new task on hermes

* print debug on the task

* more prints

* more prints

* small fix to rank print

* changed task state name

* changed task state name

* printing task id

* printing task id

* changing the init of the task id

adding changes to mdm and mdm constructor prints a message

* decommenting the metadata

* add code for metadata database

* change the db_creation and detach database create and set

* add endstep database support

* remove throw db_file not found

* Debug prints on the mdm

* Debug prints on the mdm

* Getter for db name

* Getter for db name

* more prints

* more prints

* More prints and use of Local

* Put is now synchronous

* put back to async

* More debug printing

* more print debugs

* Getter for db name

* moving to namespace

* adding deserialization

* uncomment db operation in putsync

* uncommented all database operation in put

* add wrf lammps jarvis package

* add more content to lammps and wrf

* Create README.md

* Update README.md

* Update README.md

* Update README.md

* Create README.md

* Update README.md

* Update README.md

* Update README.md

* Update README.md

* Update README.md

* fix wrf and lammps jarvis package

* Update README.md

* add post_wrf support for jarvis

* Update README.md

* add post_wrf for support

* add openfoam jarvis

* Create README.md

* Update README.md

* remove test real_app, add new tracer.h, remove trace function in bucket

* Update README.md

* clean the code and remove debug cout

* clean the code and remove debug cout

* add variable name and cout in TRACE_FUNC()

* add information in TRACE_FUNC

* Update README.md

* remove log file

* block the io_comp because of adios2 undefined function error

* add debug mode for coeus adapter; users can turn on debug mode in CMakeLists

* add debug mode with cmakelists ifdef for compiling in hermes_engine.c and Bucket.h

* Update README.md

* add ifdef in Tracer.h file

* clean the TRACE_FUNC and remove cout

* fix typo error

* change the readme.MD and comment metadata and tracer

* remove cout

* fix typo error

* remove cout in sqlite.h

* set global compile flag in CMakelists.txt for metadata and tracer

* remove cout and comment, add readme how to use metadata collector and tracer

* remove cout and comment

* remove comment and cout

---------

Co-authored-by: hxu65 <hxu40@hawk.iitedu>
Co-authored-by: unknown <wardhsu7@gmail.com>
Co-authored-by: jaime <jcernudagarcia@hawk.iit.edu>
4 people authored Feb 5, 2024
1 parent 7c0207f commit b56771e
Showing 149 changed files with 2,269 additions and 70 deletions.
8 changes: 8 additions & 0 deletions CMakeLists.txt
@@ -188,6 +188,14 @@ endif()
# Build COEUS
#------------------------------------------------------------------------------
add_subdirectory(tasks)
option(meta_enabled "Enable metadata features" OFF)
option(debug_mode "Enable debug mode" OFF)
if(meta_enabled)
add_compile_definitions(Meta_enabled)
endif ()
if(debug_mode)
add_compile_definitions(debug_mode)
endif ()
add_subdirectory(src)
#add_custom_target(lint ALL COMMAND bash ${COEUS_ROOT}/CI/lint.sh ${COEUS_ROOT})

12 changes: 10 additions & 2 deletions README.md
@@ -6,18 +6,26 @@ of the ADIOS plugins interface.
## Install

To compile


```
git clone https://github.com/lukemartinlogan/coeus-adapter.git
spack load hermes@master
spack load adios2
cd coeus-adapter
mkdir build
cd build
cmake ../
make -j8
```

Note:
To enable the metadata collection and function tracing features, pass the following flags to cmake:
```
cmake .. -Dmeta_enabled=ON -Ddebug_mode=ON
```
## Test

To test the functionality of the adapter, run:
```
ctest
```
18 changes: 11 additions & 7 deletions include/coeus/HermesEngine.h
@@ -25,7 +25,7 @@
#include <adios2.h>
#include <adios2/engine/plugin/PluginEngineInterface.h>
#include "ContainerManager.h"

#include "rankConsensus/rankConsensus.h"
#include "coeus/MetadataSerializer.h"
#include "spdlog/sinks/basic_file_sink.h"
#include "spdlog/sinks/stdout_color_sinks.h"
@@ -34,11 +34,12 @@
#include <common/ErrorCodes.h>
#include "common/DbOperation.h"
#include "coeus_mdm/coeus_mdm.h"

#include "common/VariableMetadata.h"
#include <comms/Bucket.h>
#include <comms/Hermes.h>
#include <comms/MPI.h>

#include "common/globalVariable.h"
#include "common/Tracer.h"

namespace coeus {
class HermesEngine : public adios2::plugin::PluginEngineInterface {
@@ -48,9 +49,11 @@ class HermesEngine : public adios2::plugin::PluginEngineInterface {
SQLiteWrapper* db;
std::string db_file;
hrun::coeus_mdm::Client client;
hrun::rankConsensus::Client rank_consensus;
// FileLock* lock;
// DbQueueWorker* db_worker;
int ppn;
GlobalVariable globalData;
/** Construct the HermesEngine */
HermesEngine(adios2::core::IO &io, //NOLINT
const std::string &name,
@@ -102,7 +105,7 @@ class HermesEngine : public adios2::plugin::PluginEngineInterface {
int total_steps = -1;

// std::shared_ptr<coeus::MPI> mpiComm;
- int rank;
+ uint rank;
int comm_size;

YAMLMap variableMap;
@@ -111,7 +114,8 @@ class HermesEngine : public adios2::plugin::PluginEngineInterface {
std::vector<std::string> listOfVars;

std::shared_ptr<spdlog::logger> engine_logger;

std::shared_ptr<spdlog::logger> meta_logger_put;
std::shared_ptr<spdlog::logger> meta_logger_get;
void IncrementCurrentStep();

template<typename T>
@@ -137,7 +141,7 @@ class HermesEngine : public adios2::plugin::PluginEngineInterface {
/** Place data in Hermes */
template<typename T>
void DoPutSync_(const adios2::core::Variable<T> &variable,
- const T *values) {engine_logger->info("rank {}", rank);}
+ const T *values);

/** Place data in Hermes asynchronously */
template<typename T>
@@ -147,7 +151,7 @@ class HermesEngine : public adios2::plugin::PluginEngineInterface {
/** Get data from Hermes (sync) */
template<typename T>
void DoGetSync_(const adios2::core::Variable<T> &variable,
- T *values) {engine_logger->info("rank {}", rank);}
+ T *values);

/** Get data from Hermes (async) */
template<typename T>
Expand Down
11 changes: 10 additions & 1 deletion include/common/SQlite.h
@@ -8,6 +8,7 @@
#include <iostream>
#include <sqlite3.h>
#include <string>
#include <unistd.h>
#include <vector>
#include <utility>
#include <type_traits>
@@ -23,13 +24,15 @@ class SQLiteWrapper {
bool deleteOnDestruction;

bool execute(const std::string& sql, void* data = nullptr, int (*callbackFunc)(void*, int, char**, char**) = nullptr) {

char* errMsg = 0;
int rc = sqlite3_exec(db, sql.c_str(), callbackFunc, data, &errMsg);
if (rc != SQLITE_OK) {
- std::cerr << "SQL error: " << errMsg << std::endl;
+ std::cerr << "MDM: SQL error: " << errMsg << std::endl;
sqlite3_free(errMsg);
return false;
}

return true;
}

@@ -38,6 +41,9 @@ class SQLiteWrapper {
}

public:
std::string getName(){
return dbName;
}
SQLiteWrapper(const std::string& dbName, bool deleteOnDestruction = false)
: dbName(dbName), deleteOnDestruction(deleteOnDestruction) {
if (sqlite3_open(dbName.c_str(), &db)) {
@@ -84,6 +90,7 @@ class SQLiteWrapper {
sqlite3_bind_int(stmt, 2, step);
sqlite3_step(stmt);
sqlite3_finalize(stmt);

}

int GetTotalSteps(const std::string& appName) {
@@ -136,6 +143,7 @@ class SQLiteWrapper {
sqlite3_bind_text(stmt, 5, blobInfo.blob_name.c_str(), -1, SQLITE_STATIC);
sqlite3_step(stmt);
sqlite3_finalize(stmt);

}

BlobInfo GetBlobLocation(int step, int mpi_rank, const std::string& name) {
@@ -173,6 +181,7 @@ class SQLiteWrapper {
}

void InsertVariableMetadata(int step, int mpi_rank, const VariableMetadata& metadata) {

sqlite3_stmt* stmt;
const std::string insertOrUpdateSQL = "INSERT INTO VariableMetadataTable (step, mpi_rank, name, shape, start, count, constantShape, dataType) VALUES (?, ?, ?, ?, ?, ?, ?, ?);";
auto shape = VariableMetadata::serializeVector(metadata.shape);
185 changes: 185 additions & 0 deletions include/common/Tracer.h
@@ -0,0 +1,185 @@
//
// Created by jaime on 1/10/2024.
//

#ifndef DISTRIBUTEDTRACER_TRACER_H
#define DISTRIBUTEDTRACER_TRACER_H

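#include <spdlog/spdlog.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <chrono>       // std::chrono::system_clock
#include <mutex>        // std::mutex, std::scoped_lock
#include <thread>
#include <string>
#include <sstream>
#include <type_traits>  // std::void_t, std::false_type
#include <typeinfo>     // typeid
#include <utility>      // std::forward, std::move, std::declval
#include <unistd.h>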

#ifdef __GNUG__
#include <cstdlib>
#include <memory>
#include <cxxabi.h>

// Demangle a type name. `inline` avoids multiple-definition link errors
// when this header is included from more than one translation unit.
inline std::string demangle(const char* name) {
int status = -1;
std::unique_ptr<char, void(*)(void*)> res{
abi::__cxa_demangle(name, NULL, NULL, &status),
std::free
};
return (status == 0) ? res.get() : name;
}
#else
inline std::string demangle(const char* name) {
return name;
}
#endif

#ifdef debug_mode
#define TRACE_FUNC(...) TraceLogger traceLogger(__func__, ##__VA_ARGS__)
#else
#define TRACE_FUNC(...)
#endif

template<typename T>
class EasySingleton {
protected:
/** static instance. */
static T* obj_;
static std::mutex lock_;

public:
/**
* Lazily builds a single static instance of T (raw pointer, guarded by a mutex).
* @tparam Args constructor argument types forwarded to T
* @return pointer to the instance of T
*/
template<typename ...Args>
static T* GetInstance(Args&& ...args) {
if (obj_ == nullptr) {
std::scoped_lock lock(lock_);
if (obj_ == nullptr) {
obj_ = new T(std::forward<Args>(args)...);
}
}
return obj_;
}
};
template <typename T>
T* EasySingleton<T>::obj_ = nullptr;
template <typename T>
std::mutex EasySingleton<T>::lock_ = std::mutex();

class TraceManager {
public:
explicit TraceManager(int rank) {
init_(rank);
}

explicit TraceManager() {
init_(getpid());
}

~TraceManager() {
auto log = spdlog::get("trace_logger");
log->set_pattern(exitPattern);
log->info("");
spdlog::drop("trace_logger");
}

private:
std::string entryPattern = {"["};
std::string exitPattern = {"]"};
std::string jsonPattern = {"{%v},"};

void init_(int id){
std::string fileName = "trace_" + std::to_string(id) + ".json";
spdlog::basic_logger_mt("trace_logger", fileName);

auto log = spdlog::get("trace_logger");
log->set_pattern(entryPattern);
log->info("");
log->set_pattern(jsonPattern);
}
};



class TraceLogger {
public:
template<typename... Args>
explicit TraceLogger(std::string functionName, Args &&... args)
: functionName_(std::move(functionName)) {
auto logClient = EasySingleton<TraceManager>::GetInstance();
logEvent("B", std::forward<Args>(args)...); // "B" for Begin
}

~TraceLogger() {
logEvent("E"); // "E" for End
}

private:
std::string functionName_;

static auto getTime(){
auto now = std::chrono::system_clock::now();
auto duration_since_epoch = now.time_since_epoch();
return std::chrono::duration_cast<std::chrono::microseconds>(duration_since_epoch).count();
}

static auto getTID(){
std::stringstream ss;
ss << std::this_thread::get_id();
return ss.str();
}

template<typename T, typename = void>
struct is_streamable : std::false_type {};

template<typename T>
struct is_streamable<T, std::void_t<decltype(std::declval<std::ostream&>() << std::declval<T>())>> : std::true_type {};

template<typename T>
std::string stringifyArg(const T& arg) {
if constexpr (is_streamable<T>::value) {
std::ostringstream ss;
ss << arg;
return ss.str();
} else {
return "[non-streamable type]";
}
}

template<typename... Args>
std::string logArguments(Args&&... args) {
std::ostringstream ss;
ss << "{\n";
int unpack[] = {0, (ss << " \"" << demangle(typeid(Args).name()) << "\": " << stringifyArg(args) << ",\n", 0)...};
static_cast<void>(unpack); // To avoid unused variable warning
std::string result = ss.str();
if (sizeof...(Args) > 0) {
// Remove the last comma
result.erase(result.rfind(','), 1);
}
result += "}";
return result;
}

template<typename... Args>
void logEvent(const std::string &phase, Args &&... args) {
auto microseconds = getTime();
auto tid = getTID();

std::string logEntry = "\"name\": \"" + functionName_
+ "\", \"ph\": \"" + phase
+ "\", \"ts\": " + std::to_string(microseconds)
+ ", \"pid\": " + std::to_string(getpid())
+ ", \"tid\": " + tid;

if constexpr (sizeof...(args) != 0) {
std::string argsJson = logArguments(std::forward<Args>(args)...);
logEntry += ", \"args\":" + argsJson;
}

auto log = spdlog::get("trace_logger");
log->info(logEntry);
}
};

#endif //DISTRIBUTEDTRACER_TRACER_H