
Commit

hermes_posix.so compiles
lukemartinlogan committed Dec 22, 2024
1 parent 025eb5d commit fedf95b
Showing 7 changed files with 44 additions and 347 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -281,7 +281,7 @@ set(Hermes_RUNTIME_DEPS hermes_runtime)

set(TEST_MAIN ${CMAKE_SOURCE_DIR}/test/unit)
add_subdirectory(src)
-# add_subdirectory(hermes_adapters)
+add_subdirectory(hermes_adapters)
add_subdirectory(tasks)
# add_subdirectory(benchmark)
# add_subdirectory(wrapper)
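
Re-enabling add_subdirectory(hermes_adapters) is what brings hermes_posix.so into the build, per the commit message. As a usage sketch only — the LD_PRELOAD mechanism and the paths below are assumptions about how POSIX interposition libraries are typically consumed, not something this diff shows — an unmodified application like the following would have its POSIX calls intercepted:

// demo.cc: hypothetical unmodified application; nothing here is part of
// this commit. With LD_PRELOAD pointing at hermes_posix.so, the open/
// write/close calls below would be routed through the adapter rather
// than going straight to libc.
#include <cstring>
#include <fcntl.h>
#include <unistd.h>

int main() {
  int fd = open("/tmp/demo.txt", O_CREAT | O_WRONLY | O_TRUNC, 0644);
  const char *msg = "hello hermes\n";
  write(fd, msg, strlen(msg));
  close(fd);
  return 0;
}

A hypothetical invocation: LD_PRELOAD=/path/to/hermes_posix.so ./demo
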
301 changes: 0 additions & 301 deletions benchmark/test_latency.cc
@@ -13,307 +13,6 @@
#include "hermes_shm/util/timer.h"
#include "small_message/small_message.h"

/** The performance of getting a queue */
TEST_CASE("TestGetQueue") {
/*chi::QueueId qid(0, 3);
CHI_ADMIN->CreateQueue(chi::DomainId::GetLocal(), qid,
16, 16, 256,
hshm::bitfield32_t(0));
CHI_CLIENT->GetQueue(qid);
hshm::Timer t;
t.Resume();
size_t ops = (1 << 20);
for (size_t i = 0; i < ops; ++i) {
chi::MultiQueue *queue = CHI_CLIENT->GetQueue(qid);
REQUIRE(queue->id_ == qid);
}
t.Pause();
HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());*/
}

/** Single-thread performance of allocating + freeing tasks */
TEST_CASE("TestHshmAllocateFree") {
hshm::Timer t;
t.Resume();
size_t ops = (1 << 20);
size_t count = (1 << 8);
size_t reps = ops / count;
for (size_t i = 0; i < reps; ++i) {
std::vector<chi::Task *> tasks(count);
for (size_t j = 0; j < count; ++j) {
tasks[j] = CHI_CLIENT->NewTask<chi::Task>().ptr_;
}
for (size_t j = 0; j < count; ++j) {
CHI_CLIENT->DelTask(HSHM_DEFAULT_MEM_CTX, tasks[j]);
}
}
t.Pause();
HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());
}
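
A note on the figure these tests log: t.GetUsec() returns elapsed microseconds, so ops / t.GetUsec() is operations per microsecond, i.e. millions of operations per second — a throughput, despite the "Latency" label. A worked example with illustrative (not measured) numbers:

// If ops = 1 << 20 (1048576) operations finish in 500000 usec, then
// 1048576 / 500000 ~= 2.1 ops/usec, i.e. ~2.1 million ops/sec (MOps).
double ops = static_cast<double>(1 << 20);
double usec = 500000.0;    // assumed elapsed time, for illustration only
double mops = ops / usec;  // ~2.1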

/** Single-thread performance of emplacing and popping an mpsc_ptr_queue */
TEST_CASE("TestPointerQueueEmplacePop") {
size_t ops = (1 << 20);
auto queue_ptr = hipc::make_uptr<hipc::mpsc_ptr_queue<hipc::Pointer>>(ops);
auto queue = queue_ptr.get();
hipc::Pointer p;

hshm::Timer t;
t.Resume();
for (size_t i = 0; i < ops; ++i) {
queue->emplace(p);
queue->pop(p);
}
t.Pause();

HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());
}

/** Single-thread performance of emplacing + popping a vec<mpsc_ptr_queue> */
TEST_CASE("TestPointerQueueVecEmplacePop") {
auto queues_ptr =
hipc::make_uptr<hipc::vector<hipc::mpsc_ptr_queue<hipc::Pointer>>>(16);
auto queues = queues_ptr.get();
hipc::Pointer p;

hshm::Timer t;
size_t ops = (1 << 20);
for (size_t i = 0; i < ops; ++i) {
t.Resume();
auto &queue = (*queues)[0];
queue.emplace(p);
queue.pop(p);
t.Pause();
}

HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());
}

/** Single-thread performance of getting, emplacing, and popping a queue */
TEST_CASE("TestHshmQueueEmplacePop") {
chi::QueueId qid(0, 3);
u32 ops = (1 << 20);
std::vector<PriorityInfo> queue_info = {
{TaskPrioOpt::kAdmin, 16, 16, ops, 0}};
auto queue = hipc::make_uptr<chi::MultiQueue>(qid, queue_info);
chi::LaneData entry;
auto task = CHI_CLIENT->NewTask<chi::Task>();
entry.p_ = task.shm_;

hshm::Timer t;
t.Resume();
chi::Lane &lane = queue->GetLane(0, 0);
for (size_t i = 0; i < ops; ++i) {
queue->Emplace(0, 0, entry);
lane.pop();
}
t.Pause();

CHI_CLIENT->DelTask(HSHM_DEFAULT_MEM_CTX, task);
HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());
}

/** Single-thread performance of getting a lane from a queue */
TEST_CASE("TestHshmQueueGetLane") {
chi::QueueId qid(0, 3);
std::vector<PriorityInfo> queue_info = {
{TaskPrioOpt::kAdmin, 16, 16, 256, 0}};
auto queue = hipc::make_uptr<chi::MultiQueue>(qid, queue_info);
chi::LaneGroup group = queue->GetGroup(0);

hshm::Timer t;
size_t ops = (1 << 20);
t.Resume();
for (size_t i = 0; i < ops; ++i) {
queue->GetLane(0, i % group.num_lanes_);
}
t.Pause();

HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());
}

/** Single-thread performance of allocating, emplacing, and popping a queue */
TEST_CASE("TestHshmQueueAllocateEmplacePop") {
TRANSPARENT_HERMES();
chi::QueueId qid(0, 3);
std::vector<PriorityInfo> queue_info = {
{TaskPrioOpt::kAdmin, 16, 16, 256, 0}};
auto queue = hipc::make_uptr<chi::MultiQueue>(qid, queue_info);
chi::Lane &lane = queue->GetLane(0, 0);

hshm::Timer t;
size_t ops = (1 << 20);
t.Resume();
for (size_t i = 0; i < ops; ++i) {
chi::LaneData entry;
auto task = CHI_CLIENT->NewTask<chi::Task>();
entry.p_ = task.shm_;
queue->Emplace(0, 0, entry);
lane.pop();
CHI_CLIENT->DelTask(HSHM_DEFAULT_MEM_CTX, task);
}
t.Pause();

HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());
}

/** Time to spawn and join a thread */
TEST_CASE("TestSpawnJoinThread") {
hshm::Timer t;
t.Resume();
size_t count = 16 * (1 << 10);
for (size_t i = 0; i < count; ++i) {
std::thread thread([]() {});
thread.join();
}
t.Pause();
HILOG(kInfo, "Latency: {} MOps", count / t.GetUsec());
}

/** Time to spawn and join an Argobots thread */
TEST_CASE("TestSpawnJoinArgoThread") {
hshm::Timer t;
t.Resume();
ABT_xstream xstream;
ABT_thread tl_thread_;
size_t count = 16 * (1 << 10);
ABT_init(0, nullptr);
int ret = ABT_xstream_create(ABT_SCHED_NULL, &xstream);
if (ret != ABT_SUCCESS) {
HELOG(kFatal, "Could not create argobots xstream");
}
for (size_t i = 0; i < count; ++i) {
int ret = ABT_thread_create_on_xstream(
xstream, [](void *args) {}, nullptr, ABT_THREAD_ATTR_NULL, &tl_thread_);
if (ret != ABT_SUCCESS) {
HELOG(kFatal, "Couldn't spawn worker");
}
ABT_thread_join(tl_thread_);
}
t.Pause();
HILOG(kInfo, "Latency: {} MOps", count / t.GetUsec());
}

void TestWorkerIterationLatency(u32 num_queues, u32 num_lanes) {
HRUN_RUNTIME->Create();

chi::Worker worker(0);
std::vector<hipc::uptr<chi::MultiQueue>> queues;
for (u32 i = 0; i < num_queues; ++i) {
chi::QueueId qid(0, i + 1);
std::vector<PriorityInfo> queue_info = {
{TaskPrioOpt::kAdmin, num_lanes, num_lanes, 256, 0}};
auto queue = hipc::make_uptr<chi::MultiQueue>(qid, queue_info);
queues.emplace_back(std::move(queue));
for (u32 j = 0; j < num_lanes; ++j) {
// `queue` was moved into `queues` above; poll the stored pointer instead.
worker.PollQueues({{0, j, queues.back().get()}});
}
}

chi::small_message::Client client;
CHI_ADMIN->RegisterModule(chi::DomainId::GetLocal(), "small_message");
client.Create(chi::DomainId::GetLocal(), "ipc_test");

hshm::Timer t;
t.Resume();
// size_t ops = (1 << 20);
size_t ops = 256;
for (size_t i = 0; i < ops; ++i) {
hipc::FullPtr<chi::small_message::MdPushTask> task;
chi::TaskNode task_node(chi::TaskId((u32)0, (u64)i));
task = client.AsyncMdPushEmplace(queues[num_queues - 1].get(), task_node,
chi::DomainId::GetLocal());
worker.Run(false);
CHI_CLIENT->DelTask(HSHM_DEFAULT_MEM_CTX, task);
}
t.Pause();

HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());
}

/** Time for worker to process a request */
TEST_CASE("TestWorkerLatency") {
CHIMAERA_CLIENT_INIT();
TestWorkerIterationLatency(1, 16);
TestWorkerIterationLatency(5, 16);
TestWorkerIterationLatency(10, 16);
TestWorkerIterationLatency(15, 16);
}

/** Time to process a request */
TEST_CASE("TestRoundTripLatency") {
TRANSPARENT_HERMES();
chi::small_message::Client client;
CHI_ADMIN->RegisterModule(chi::DomainId::GetLocal(), "small_message");
// int count = 25;
// for (int i = 0; i < count; ++i) {
// chi::small_message::Client client2;
// client2.Create(chi::DomainId::GetLocal(), "ipc_test" +
// std::to_string(i));
// }
client.Create(chi::DomainId::GetLocal(), "ipc_test");
hshm::Timer t;

// int pid = getpid();
// ProcessAffiner::SetCpuAffinity(pid, 8);

t.Resume();
size_t ops = (1 << 20);
// size_t ops = 1024;
for (size_t i = 0; i < ops; ++i) {
// client.Md(chi::DomainId::GetLocal());
client.MdPush(chi::DomainId::GetLocal());
}
t.Pause();

HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());
}

TEST_CASE("TestTimespecLatency") {
size_t ops = (1 << 20);
hshm::Timer t;

t.Resume();
for (size_t i = 0; i < ops; ++i) {
struct timespec ts;
timespec_get(&ts, TIME_UTC);
}
t.Pause();

HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());
}

TEST_CASE("TestTimerLatency") {
size_t ops = (1 << 20);
hshm::Timer t, tmp;

t.Resume();
double usec;
for (size_t i = 0; i < ops; ++i) {
usec = tmp.GetUsecFromStart();
}
t.Pause();

HILOG(kInfo, "Latency: {} MOps (usec={})", ops / t.GetUsec(), usec);
}

TEST_CASE("TestTimepointLatency") {
size_t ops = (1 << 20);
hshm::Timer t;
hshm::Timepoint tmp;

t.Resume();
double usec;
for (size_t i = 0; i < ops; ++i) {
usec = tmp.GetUsecFromStart();
}
t.Pause();

HILOG(kInfo, "Latency: {} MOps (usec={})", ops / t.GetUsec(), usec);
}

/** Time to process a request */
// TEST_CASE("TestHermesGetBlobIdLatency") {
// HERMES->ClientInit();
18 changes: 9 additions & 9 deletions hermes_adapters/CMakeLists.txt
@@ -3,15 +3,15 @@
#------------------------------------------------------------------------------
add_subdirectory(filesystem)
add_subdirectory(posix)
-if (HERMES_ENABLE_STDIO_ADAPTER)
-add_subdirectory(stdio)
-endif()
-if (HERMES_ENABLE_MPIIO_ADAPTER)
-add_subdirectory(mpiio)
-endif()
-if (HERMES_ENABLE_VFD)
-add_subdirectory(vfd)
-endif()
+# if (HERMES_ENABLE_STDIO_ADAPTER)
+# add_subdirectory(stdio)
+# endif()
+# if (HERMES_ENABLE_MPIIO_ADAPTER)
+# add_subdirectory(mpiio)
+# endif()
+# if (HERMES_ENABLE_VFD)
+# add_subdirectory(vfd)
+# endif()

#-----------------------------------------------------------------------------
# Install HRUN Admin Task Library Headers
4 changes: 2 additions & 2 deletions hermes_adapters/filesystem/CMakeLists.txt
@@ -9,10 +9,10 @@ include_directories(
add_library(hermes_fs_base SHARED
${CMAKE_CURRENT_SOURCE_DIR}/filesystem_mdm_singleton.cc)
add_dependencies(hermes_fs_base
-hermes)
+hermes_client)
target_link_libraries(hermes_fs_base
MPI::MPI_CXX
-hermes)
+hermes_client)

#-----------------------------------------------------------------------------
# Add Target(s) to CMake Install
12 changes: 6 additions & 6 deletions hermes_adapters/filesystem/filesystem.h
@@ -28,8 +28,8 @@
#include "filesystem_io_client.h"
#include "filesystem_mdm.h"
#include "hermes/bucket.h"
#include "hermes/data_stager/binary_stager.h"
#include "hermes/hermes.h"
#include "hermes/staging/binary_stager.h"
#include "hermes_adapters/mapper/mapper_factory.h"

namespace hermes::adapter {
@@ -80,12 +80,12 @@ class Filesystem : public FilesystemIoClient {
HILOG(kDebug, "File not opened before by adapter");
// Normalize path strings
stat.path_ = stdfs::absolute(path).string();
-auto path_shm = hipc::make_uptr<chi::string>(stat.path_);
+chi::string path_shm(stat.path_);
// Verify the bucket exists if not in CREATE mode
if (stat.adapter_mode_ == AdapterMode::kScratch &&
!stat.hflags_.Any(HERMES_FS_EXISTS) &&
!stat.hflags_.Any(HERMES_FS_CREATE)) {
-TagId bkt_id = HERMES->GetTagId(stat.path_);
+TagId bkt_id = HERMES->GetTagId(HSHM_DEFAULT_MEM_CTX, stat.path_);
if (bkt_id.IsNull()) {
f.status_ = false;
return;
@@ -94,8 +94,8 @@
// Update page size
stat.page_size_ = mdm->GetAdapterPageSize(path);
// Bucket parameters
-ctx.bkt_params_ = hermes::data_stager::BinaryFileStager::BuildFileParams(
-stat.page_size_);
+ctx.bkt_params_ =
+hermes::BinaryFileStager::BuildFileParams(stat.page_size_);
// Get or create the bucket
if (stat.hflags_.Any(HERMES_FS_TRUNC)) {
// The file was opened with TRUNCATION
@@ -104,7 +104,7 @@
stat.bkt_id_.Clear();
} else {
// The file was opened regularly
-stat.file_size_ = GetBackendSize(*path_shm);
+stat.file_size_ = GetBackendSize(path_shm);
stat.bkt_id_ = HERMES->GetBucket(HSHM_DEFAULT_MEM_CTX, stat.path_, ctx,
stat.file_size_, HERMES_SHOULD_STAGE);
}
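
The pattern running through this file's changes: BinaryFileStager moved from the hermes::data_stager namespace (hermes/data_stager/binary_stager.h) into hermes (hermes/staging/binary_stager.h), and client entry points now take an explicit memory context as their first argument. A minimal sketch composed from the calls visible in this hunk — the helper name, the hermes::Context type, and the auto return types are assumptions, not confirmed by the diff:

// Hypothetical helper illustrating the updated calling convention.
#include <string>
#include "hermes/hermes.h"
#include "hermes/staging/binary_stager.h"

void OpenBucketSketch(const std::string &path, size_t page_size,
                      size_t file_size) {
  hermes::Context ctx;  // assumed type of the `ctx` used above
  // Stager parameters now come from hermes::BinaryFileStager.
  ctx.bkt_params_ = hermes::BinaryFileStager::BuildFileParams(page_size);
  // The memory context is now the first argument to client lookups.
  auto bkt_id = HERMES->GetTagId(HSHM_DEFAULT_MEM_CTX, path);
  if (bkt_id.IsNull()) {
    // Create-or-get with staging enabled, mirroring Filesystem::Open.
    auto bkt = HERMES->GetBucket(HSHM_DEFAULT_MEM_CTX, path, ctx,
                                 file_size, HERMES_SHOULD_STAGE);
    (void)bkt;  // unused in this sketch
  }
}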
