diff --git a/tasks/coeus_mdm/CMakeLists.txt b/tasks/coeus_mdm/CMakeLists.txt new file mode 100644 index 00000000..d4861178 --- /dev/null +++ b/tasks/coeus_mdm/CMakeLists.txt @@ -0,0 +1,10 @@ +#------------------------------------------------------------------------------ +# Build Hrun Admin Task Library +#------------------------------------------------------------------------------ +include_directories(include) +add_subdirectory(src) + +#----------------------------------------------------------------------------- +# Install HRUN Admin Task Library Headers +#----------------------------------------------------------------------------- +install(DIRECTORY include DESTINATION ${CMAKE_INSTALL_PREFIX}) diff --git a/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm.h b/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm.h new file mode 100644 index 00000000..68143537 --- /dev/null +++ b/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm.h @@ -0,0 +1,83 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#ifndef HRUN_coeus_mdm_H_ +#define HRUN_coeus_mdm_H_ + +#include "coeus_mdm_tasks.h" + +namespace hrun::coeus_mdm { + +/** Create coeus_mdm requests */ +class Client : public TaskLibClient { + + public: + /** Default constructor */ + Client() = default; + + /** Destructor */ + ~Client() = default; + + /** Async create a task state */ + HSHM_ALWAYS_INLINE + LPointer AsyncCreate(const TaskNode &task_node, + const DomainId &domain_id, + const std::string &state_name, + const std::string &db_path) { + id_ = TaskStateId::GetNull(); + QueueManagerInfo &qm = HRUN_CLIENT->server_config_.queue_manager_; + std::vector queue_info = { + {1, 1, qm.queue_depth_, 0}, + {1, 1, qm.queue_depth_, QUEUE_LONG_RUNNING}, + {qm.max_lanes_, qm.max_lanes_, qm.queue_depth_, QUEUE_LOW_LATENCY} + }; + return HRUN_ADMIN->AsyncCreateTaskState( + task_node, domain_id, state_name, id_, queue_info, db_path); + } + HRUN_TASK_NODE_ROOT(AsyncCreate) + template + HSHM_ALWAYS_INLINE + void CreateRoot(Args&& ...args) { + LPointer task = + AsyncCreateRoot(std::forward(args)...); + task->Wait(); + id_ = task->id_; + queue_id_ = QueueId(id_); + HRUN_CLIENT->DelTask(task); + } + + /** Destroy task state + queue */ + HSHM_ALWAYS_INLINE + void DestroyRoot(const DomainId &domain_id) { + HRUN_ADMIN->DestroyTaskStateRoot(domain_id, id_); + } + + /** Call a custom method */ + HSHM_ALWAYS_INLINE + void AsyncMdm_insertConstruct(Mdm_insertTask *task, + const TaskNode &task_node, + const DomainId &domain_id, + DbOperation db_op) { + HRUN_CLIENT->ConstructTask( + task, task_node, domain_id, id_, db_op); + } + HSHM_ALWAYS_INLINE + void Mdm_insertRoot(const DomainId &domain_id, DbOperation db_op) { + LPointer> task = AsyncMdm_insertRoot(domain_id, db_op); +// task.ptr_->Wait(); + } + HRUN_TASK_NODE_PUSH_ROOT(Mdm_insert); +}; + +} // namespace hrun + +#endif // HRUN_coeus_mdm_H_ diff --git a/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_lib_exec.h b/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_lib_exec.h new file mode 100644 index 00000000..ea7a4412 --- /dev/null +++ b/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_lib_exec.h @@ -0,0 +1,197 @@ +#ifndef HRUN_COEUS_MDM_LIB_EXEC_H_ +#define HRUN_COEUS_MDM_LIB_EXEC_H_ + +/** Execute a task */ +void Run(u32 method, Task *task, RunContext &rctx) override { + switch (method) { + case Method::kConstruct: { + Construct(reinterpret_cast(task), rctx); + break; + } + case Method::kDestruct: { + Destruct(reinterpret_cast(task), rctx); + break; + } + case Method::kMdm_insert: { + Mdm_insert(reinterpret_cast(task), rctx); + break; + } + } +} +/** Delete a task */ +void Del(u32 method, Task *task) override { + switch (method) { + case Method::kConstruct: { + HRUN_CLIENT->DelTask(reinterpret_cast(task)); + break; + } + case Method::kDestruct: { + HRUN_CLIENT->DelTask(reinterpret_cast(task)); + break; + } + case Method::kMdm_insert: { + HRUN_CLIENT->DelTask(reinterpret_cast(task)); + break; + } + } +} +/** Duplicate a task */ +void Dup(u32 method, Task *orig_task, std::vector> &dups) override { + switch (method) { + case Method::kConstruct: { + hrun::CALL_DUPLICATE(reinterpret_cast(orig_task), dups); + break; + } + case Method::kDestruct: { + hrun::CALL_DUPLICATE(reinterpret_cast(orig_task), dups); + break; + } + case Method::kMdm_insert: { + hrun::CALL_DUPLICATE(reinterpret_cast(orig_task), dups); + break; + } + } +} +/** Register the duplicate output with the origin task */ +void DupEnd(u32 method, u32 replica, Task *orig_task, Task *dup_task) override { + switch (method) { + case Method::kConstruct: { + hrun::CALL_DUPLICATE_END(replica, reinterpret_cast(orig_task), reinterpret_cast(dup_task)); + break; + } + case Method::kDestruct: { + hrun::CALL_DUPLICATE_END(replica, reinterpret_cast(orig_task), reinterpret_cast(dup_task)); + break; + } + case Method::kMdm_insert: { + hrun::CALL_DUPLICATE_END(replica, reinterpret_cast(orig_task), reinterpret_cast(dup_task)); + break; + } + } +} +/** Ensure there is space to store replicated outputs */ +void ReplicateStart(u32 method, u32 count, Task *task) override { + switch (method) { + case Method::kConstruct: { + hrun::CALL_REPLICA_START(count, reinterpret_cast(task)); + break; + } + case Method::kDestruct: { + hrun::CALL_REPLICA_START(count, reinterpret_cast(task)); + break; + } + case Method::kMdm_insert: { + hrun::CALL_REPLICA_START(count, reinterpret_cast(task)); + break; + } + } +} +/** Determine success and handle failures */ +void ReplicateEnd(u32 method, Task *task) override { + switch (method) { + case Method::kConstruct: { + hrun::CALL_REPLICA_END(reinterpret_cast(task)); + break; + } + case Method::kDestruct: { + hrun::CALL_REPLICA_END(reinterpret_cast(task)); + break; + } + case Method::kMdm_insert: { + hrun::CALL_REPLICA_END(reinterpret_cast(task)); + break; + } + } +} +/** Serialize a task when initially pushing into remote */ +std::vector SaveStart(u32 method, BinaryOutputArchive &ar, Task *task) override { + switch (method) { + case Method::kConstruct: { + ar << *reinterpret_cast(task); + break; + } + case Method::kDestruct: { + ar << *reinterpret_cast(task); + break; + } + case Method::kMdm_insert: { + ar << *reinterpret_cast(task); + break; + } + } + return ar.Get(); +} +/** Deserialize a task when popping from remote queue */ +TaskPointer LoadStart(u32 method, BinaryInputArchive &ar) override { + TaskPointer task_ptr; + switch (method) { + case Method::kConstruct: { + task_ptr.ptr_ = HRUN_CLIENT->NewEmptyTask(task_ptr.shm_); + ar >> *reinterpret_cast(task_ptr.ptr_); + break; + } + case Method::kDestruct: { + task_ptr.ptr_ = HRUN_CLIENT->NewEmptyTask(task_ptr.shm_); + ar >> *reinterpret_cast(task_ptr.ptr_); + break; + } + case Method::kMdm_insert: { + task_ptr.ptr_ = HRUN_CLIENT->NewEmptyTask(task_ptr.shm_); + ar >> *reinterpret_cast(task_ptr.ptr_); + break; + } + } + return task_ptr; +} +/** Serialize a task when returning from remote queue */ +std::vector SaveEnd(u32 method, BinaryOutputArchive &ar, Task *task) override { + switch (method) { + case Method::kConstruct: { + ar << *reinterpret_cast(task); + break; + } + case Method::kDestruct: { + ar << *reinterpret_cast(task); + break; + } + case Method::kMdm_insert: { + ar << *reinterpret_cast(task); + break; + } + } + return ar.Get(); +} +/** Deserialize a task when returning from remote queue */ +void LoadEnd(u32 replica, u32 method, BinaryInputArchive &ar, Task *task) override { + switch (method) { + case Method::kConstruct: { + ar.Deserialize(replica, *reinterpret_cast(task)); + break; + } + case Method::kDestruct: { + ar.Deserialize(replica, *reinterpret_cast(task)); + break; + } + case Method::kMdm_insert: { + ar.Deserialize(replica, *reinterpret_cast(task)); + break; + } + } +} +/** Get the grouping of the task */ +u32 GetGroup(u32 method, Task *task, hshm::charbuf &group) override { + switch (method) { + case Method::kConstruct: { + return reinterpret_cast(task)->GetGroup(group); + } + case Method::kDestruct: { + return reinterpret_cast(task)->GetGroup(group); + } + case Method::kMdm_insert: { + return reinterpret_cast(task)->GetGroup(group); + } + } + return -1; +} + +#endif // HRUN_COEUS_MDM_METHODS_H_ \ No newline at end of file diff --git a/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_methods.h b/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_methods.h new file mode 100644 index 00000000..07ee7f6e --- /dev/null +++ b/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_methods.h @@ -0,0 +1,9 @@ +#ifndef HRUN_COEUS_MDM_METHODS_H_ +#define HRUN_COEUS_MDM_METHODS_H_ + +/** The set of methods in the admin task */ +struct Method : public TaskMethod { + TASK_METHOD_T kMdm_insert = kLast + 0; +}; + +#endif // HRUN_COEUS_MDM_METHODS_H_ \ No newline at end of file diff --git a/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_methods.yaml b/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_methods.yaml new file mode 100644 index 00000000..afe1c1a2 --- /dev/null +++ b/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_methods.yaml @@ -0,0 +1 @@ +kMdm_insert: 0 \ No newline at end of file diff --git a/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_tasks.h b/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_tasks.h new file mode 100644 index 00000000..29795d3e --- /dev/null +++ b/tasks/coeus_mdm/include/coeus_mdm/coeus_mdm_tasks.h @@ -0,0 +1,134 @@ +// +// Created by lukemartinlogan on 8/11/23. +// + +#ifndef HRUN_TASKS_TASK_TEMPL_INCLUDE_coeus_mdm_coeus_mdm_TASKS_H_ +#define HRUN_TASKS_TASK_TEMPL_INCLUDE_coeus_mdm_coeus_mdm_TASKS_H_ + +#include "hrun/api/hrun_client.h" +#include "hrun/task_registry/task_lib.h" +#include "hrun_admin/hrun_admin.h" +#include "hrun/queue_manager/queue_manager_client.h" +#include "proc_queue/proc_queue.h" + +#include "common/DbOperation.h" + +namespace hrun::coeus_mdm { + +#include "coeus_mdm_methods.h" +#include "hrun/hrun_namespace.h" + +using hrun::proc_queue::TypedPushTask; +using hrun::proc_queue::PushTask; + +/** + * A task to create coeus_mdm + * */ +using hrun::Admin::CreateTaskStateTask; +struct ConstructTask : public CreateTaskStateTask { + IN hipc::ShmArchive db_path_; + /** SHM default constructor */ + HSHM_ALWAYS_INLINE explicit + ConstructTask(hipc::Allocator *alloc) + : CreateTaskStateTask(alloc) {} + + /** Emplace constructor */ + HSHM_ALWAYS_INLINE explicit + ConstructTask(hipc::Allocator *alloc, + const TaskNode &task_node, + const DomainId &domain_id, + const std::string &state_name, + const TaskStateId &id, + const std::vector &queue_info, + const std::string &db_path) + : CreateTaskStateTask(alloc, task_node, domain_id, state_name, + "coeus_mdm", id, queue_info) { + // Custom params + HSHM_MAKE_AR(db_path_, alloc, db_path); + } + + HSHM_ALWAYS_INLINE + ~ConstructTask() { + // Custom params + } +}; + +/** A task to destroy coeus_mdm */ +using hrun::Admin::DestroyTaskStateTask; +struct DestructTask : public DestroyTaskStateTask { + /** SHM default constructor */ + HSHM_ALWAYS_INLINE explicit + DestructTask(hipc::Allocator *alloc) + : DestroyTaskStateTask(alloc) {} + + /** Emplace constructor */ + HSHM_ALWAYS_INLINE explicit + DestructTask(hipc::Allocator *alloc, + const TaskNode &task_node, + const DomainId &domain_id, + TaskStateId &state_id) + : DestroyTaskStateTask(alloc, task_node, domain_id, state_id) {} + + /** Create group */ + HSHM_ALWAYS_INLINE + u32 GetGroup(hshm::charbuf &group) { + return TASK_UNORDERED; + } +}; + +/** + * A custom task in coeus_mdm + * */ +struct Mdm_insertTask : public Task, TaskFlags { + IN hipc::ShmArchive db_op_; + + /** SHM default constructor */ + HSHM_ALWAYS_INLINE explicit + Mdm_insertTask(hipc::Allocator *alloc) : Task(alloc) {} + + /** Emplace constructor */ + HSHM_ALWAYS_INLINE explicit + Mdm_insertTask(hipc::Allocator *alloc, + const TaskNode &task_node, + const DomainId &domain_id, + const TaskStateId &state_id, + const DbOperation &db_op) : Task(alloc) { + // Initialize task + task_node_ = task_node; + lane_hash_ = 0; + prio_ = TaskPrio::kLowLatency; + task_state_ = state_id; + method_ = Method::kMdm_insert; + task_flags_.SetBits(TASK_FIRE_AND_FORGET); + domain_id_ = domain_id; + + // Store DbOperation + std::stringstream ss; + cereal::BinaryOutputArchive ar(ss); + ar << db_op; + std::string db_op_ser = ss.str(); + HSHM_MAKE_AR(db_op_, alloc, db_op_ser); + } + + ~Mdm_insertTask(){ + HSHM_DESTROY_AR(db_op_); + } + + DbOperation GetDbOp(){ + DbOperation db_op; + std::stringstream ss(db_op_->str()); + cereal::BinaryInputArchive ar(ss); + ar >> db_op; + return db_op; + } + + /** Create group */ + HSHM_ALWAYS_INLINE + u32 GetGroup(hshm::charbuf &group) { + return TASK_UNORDERED; + } +}; + +} // namespace hrun::coeus_mdm + +#endif // HRUN_TASKS_TASK_TEMPL_INCLUDE_coeus_mdm_coeus_mdm_TASKS_H_ diff --git a/tasks/coeus_mdm/src/CMakeLists.txt b/tasks/coeus_mdm/src/CMakeLists.txt new file mode 100644 index 00000000..3e8d8970 --- /dev/null +++ b/tasks/coeus_mdm/src/CMakeLists.txt @@ -0,0 +1,54 @@ +#------------------------------------------------------------------------------ +# Build Small Message Task Library +#------------------------------------------------------------------------------ +add_library(coeus_mdm SHARED + coeus_mdm.cc) +target_link_libraries(coeus_mdm ${Hermes_RUNTIME_LIBRARIES}) +target_include_directories(coeus_mdm PRIVATE ${PROJECT_SOURCE_DIR}/include) + +#------------------------------------------------------------------------------ +# Install Small Message Task Library +#------------------------------------------------------------------------------ +#install( +# TARGETS +# coeus_mdm +# EXPORT +# ${HERMES_EXPORTED_TARGETS} +# LIBRARY DESTINATION ${HERMES_INSTALL_LIB_DIR} +# ARCHIVE DESTINATION ${HERMES_INSTALL_LIB_DIR} +# RUNTIME DESTINATION ${HERMES_INSTALL_BIN_DIR} +#) + +#----------------------------------------------------------------------------- +# Add Target(s) to CMake Install for import into other projects +#----------------------------------------------------------------------------- +#install( +# EXPORT +# ${HERMES_EXPORTED_TARGETS} +# DESTINATION +# ${HERMES_INSTALL_DATA_DIR}/cmake/hermes +# FILE +# ${HERMES_EXPORTED_TARGETS}.cmake +#) + +#----------------------------------------------------------------------------- +# Export all exported targets to the build tree for use by parent project +#----------------------------------------------------------------------------- +set(HERMES_EXPORTED_LIBS + coeus_mdm + ${HERMES_EXPORTED_LIBS}) +if(NOT HERMES_EXTERNALLY_CONFIGURED) + EXPORT ( + TARGETS + ${HERMES_EXPORTED_LIBS} + FILE + ${HERMES_EXPORTED_TARGETS}.cmake + ) +endif() + +#------------------------------------------------------------------------------ +# Coverage +#------------------------------------------------------------------------------ +if(HERMES_ENABLE_COVERAGE) + set_coverage_flags(coeus_mdm) +endif() diff --git a/tasks/coeus_mdm/src/coeus_mdm.cc b/tasks/coeus_mdm/src/coeus_mdm.cc new file mode 100644 index 00000000..38c18d8b --- /dev/null +++ b/tasks/coeus_mdm/src/coeus_mdm.cc @@ -0,0 +1,56 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#include "hrun_admin/hrun_admin.h" +#include "hrun/api/hrun_runtime.h" +#include "coeus_mdm/coeus_mdm.h" + +#include "common/SQlite.h" + +namespace hrun::coeus_mdm { + +class Server : public TaskLib { +private: + std::unique_ptr db; + public: + Server() = default; + + void Construct(ConstructTask *task, RunContext &rctx) { + db = std::make_unique(task->db_path_->str()); + + task->SetModuleComplete(); + } + + void Destruct(DestructTask *task, RunContext &rctx) { + task->SetModuleComplete(); + } + + void Mdm_insert(Mdm_insertTask *task, RunContext &rctx) { + DbOperation db_op = task->GetDbOp(); + + if (db_op.type == OperationType::InsertData) { + db->InsertVariableMetadata(db_op.step, db_op.rank, db_op.metadata); + db->InsertBlobLocation(db_op.step, db_op.rank, db_op.name, db_op.blobInfo); + } else if (db_op.type == OperationType::UpdateSteps) { + db->UpdateTotalSteps(db_op.uid, db_op.currentStep); + } + + task->SetModuleComplete(); + } + + public: +#include "coeus_mdm/coeus_mdm_lib_exec.h" +}; + +} // namespace hrun::coeus_mdm + +HRUN_TASK_CC(hrun::coeus_mdm::Server, "coeus_mdm");