WIP: Add "update a data structure" feature #23

Open · wants to merge 2 commits into main
2 changes: 0 additions & 2 deletions configure.ac
100755 → 100644
@@ -7,8 +7,6 @@ AC_CONFIG_MACRO_DIR([m4])
LT_INIT

AC_CANONICAL_TARGET
AC_CANONICAL_SYSTEM
AC_CANONICAL_HOST

AM_INIT_AUTOMAKE([foreign subdir-objects -Wall])

7 changes: 6 additions & 1 deletion include/quintain.h
@@ -33,7 +33,12 @@ extern "C" {
#define QTN_ERR_UNKNOWN_PROVIDER (-4) /* can't find provider */

/* flags for workload operations */
#define QTN_WORK_USE_SERVER_POOLSET 1
enum {
    QTN_WORK_USE_SERVER_POOLSET = (1 << 0), /* bit flags: OR-ed into work_flags */
    QTN_WORK_CACHE_UPDATE       = (1 << 1),
    QTN_WORK_CACHE_WRITE        = (1 << 2),
    QTN_WORK_CACHE_READ         = (1 << 3)
};

#ifdef __cplusplus
}
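Since these flags are OR-ed together into the work request's `flags` field (see `quintain-benchmark.c` and `qtn_work_ult()` further down), here is a small illustrative sketch of how a caller might combine them; it is not part of the PR and the function name is made up:

/* illustrative only: build a flags word requesting a concurrent cache write */
#include <stdint.h>
#include "quintain.h"

uint32_t example_work_flags(void)
{
    uint32_t work_flags = 0;
    work_flags |= QTN_WORK_USE_SERVER_POOLSET;                  /* reuse server-side poolset */
    work_flags |= QTN_WORK_CACHE_UPDATE | QTN_WORK_CACHE_WRITE; /* exercise the hoard */
    return work_flags;
}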
6 changes: 5 additions & 1 deletion src/Makefile.subdir
@@ -5,11 +5,15 @@ src_libquintain_client_la_SOURCES += src/quintain-client.c \
src/quintain-rpc.h

src_libquintain_server_la_SOURCES += src/quintain-server.c \
src/quintain-rpc.h
src/quintain-rpc.h \
src/hoard.cc \
src/hoard.hpp \
src/hoard-c.h

dist_bin_SCRIPTS += src/quintain-benchmark-parse.sh

if HAVE_MPI
bin_PROGRAMS += src/quintain-benchmark
src_quintain_benchmark_LDADD = src/libquintain-client.la -lbedrock-client
noinst_HEADERS += src/quintain-macros.h
endif
15 changes: 15 additions & 0 deletions src/hoard-c.h
@@ -0,0 +1,15 @@
#ifndef HOARD_C_H
#define HOARD_C_H

#include <stdint.h>
#include <stddef.h>

#ifdef __cplusplus
extern "C" {
#endif

/* opaque handle to the underlying C++ Hoard instance */
typedef struct Hoard* hoard_t;

hoard_t hoard_init(void);
int hoard_put(hoard_t h, int64_t* src, size_t count, size_t offset);
int hoard_get(hoard_t h, int64_t* dest, size_t count, size_t offset);
void hoard_finalize(hoard_t h);

#ifdef __cplusplus
}
#endif

#endif /* HOARD_C_H */
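For orientation, a minimal usage sketch (not part of the PR) of how this wrapper would be driven from plain C; it assumes the program is linked against the C++ object that implements Hoard:

#include <assert.h>
#include <stdint.h>
#include "hoard-c.h"

int main(void)
{
    int64_t in[4]  = {1, 2, 3, 4};
    int64_t out[4] = {0};

    hoard_t h = hoard_init();
    hoard_put(h, in, 4, 0);  /* store 4 items starting at offset 0 */
    hoard_get(h, out, 4, 0); /* read them back */
    assert(out[3] == 4);
    hoard_finalize(h);
    return 0;
}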
17 changes: 17 additions & 0 deletions src/hoard.cc
@@ -0,0 +1,17 @@
#include "hoard.hpp"
#include "hoard-c.h"

hoard_t hoard_init(void)
{
    return new Hoard();
}
int hoard_put(hoard_t h, int64_t* src, size_t count, size_t offset)
{
return (h->put(src, count, offset));
}
int hoard_get(hoard_t h, int64_t* dest, size_t count, size_t offset)
{
return h->get(dest, count, offset);
}
void hoard_finalize(hoard_t h) { delete h; }
50 changes: 50 additions & 0 deletions src/hoard.hpp
@@ -0,0 +1,50 @@
#pragma once

#include <vector>
#include <cstdint>
#include <cstddef>
#include <iostream>

/* Just a big ol' flat array of data. There is no paging out of excess
 * data, no least-recently-used eviction, nothing like that: just how fast
 * can we update this data structure concurrently? */

class Hoard {
public:
Hoard() = default;
int put(int64_t* src, size_t count, size_t offset);
int get(int64_t* dest, size_t count, size_t offset);

private:
std::vector<int64_t> m_hoard;
void show()
{
for (const auto& x : m_hoard) std::cout << x << " ";
std::cout << std::endl;
}
};

int Hoard::put(int64_t* src, size_t count, size_t offset)
{
    if (m_hoard.size() < offset + count)
        m_hoard.resize((m_hoard.size() + offset + count) * 2);

    // having trouble using insert() correctly concurrently...
    // m_hoard.insert(m_hoard.begin()+offset, src, src+count);
    for (size_t i = 0; i < count; i++) m_hoard[offset + i] = src[i];
#ifdef DEBUG_HOARD
    std::cout << "Hoard::put: " << src[0] << "... " << count << " items at "
              << offset << std::endl;
    show();
#endif
    return (int)count;
}
int Hoard::get(int64_t* dest, size_t count, size_t offset)
{
#ifdef DEBUG_HOARD
    std::cout << "Hoard::get: " << count << " items at " << offset << " "
              << m_hoard[offset] << std::endl;
    show();
#endif
    /* no bounds checking: callers are expected to read only ranges that a
     * previous put() has already populated */
    for (size_t i = 0; i < count; i++) dest[i] = m_hoard[offset + i];
    return (int)count;
}
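The comment at the top of this header frames the goal as measuring concurrent update speed. Since Hoard has no internal locking and put() may resize the vector, concurrent use presumably has to avoid resizes and keep writers on disjoint ranges. A sketch under those assumptions (not part of the PR), using the C wrapper and pthreads; the thread count and helper names are made up:

/* sketch only: concurrent disjoint updates through the C wrapper, assuming
 * the hoard is pre-sized so that no concurrent put() triggers a resize */
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include "hoard-c.h"

#define NTHREADS   4
#define PER_THREAD 1024

static hoard_t g_hoard;

static void* writer(void* arg)
{
    size_t  tid = (size_t)(uintptr_t)arg;
    int64_t vals[PER_THREAD];
    for (size_t i = 0; i < PER_THREAD; i++) vals[i] = (int64_t)(tid * PER_THREAD + i);
    /* each thread writes its own disjoint range of the hoard */
    hoard_put(g_hoard, vals, PER_THREAD, tid * PER_THREAD);
    return NULL;
}

int main(void)
{
    pthread_t t[NTHREADS];
    int64_t   zero = 0;

    g_hoard = hoard_init();
    /* pre-size by touching the last slot so concurrent puts never resize */
    hoard_put(g_hoard, &zero, 1, NTHREADS * PER_THREAD - 1);

    for (size_t i = 0; i < NTHREADS; i++)
        pthread_create(&t[i], NULL, writer, (void*)(uintptr_t)i);
    for (size_t i = 0; i < NTHREADS; i++) pthread_join(t[i], NULL);

    hoard_finalize(g_hoard);
    return 0;
}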
3 changes: 3 additions & 0 deletions src/quintain-benchmark.c
@@ -207,6 +207,9 @@ int main(int argc, char** argv)
if (json_object_get_boolean(
json_object_object_get(json_cfg, "use_server_poolset")))
work_flags |= QTN_WORK_USE_SERVER_POOLSET;
if (json_object_get_boolean(
json_object_object_get(json_cfg, "concurrency_test")))
work_flags |= QTN_WORK_CACHE_UPDATE;
bulk_size
= json_object_get_int(json_object_object_get(json_cfg, "bulk_size"));
if (strcmp("pull", json_object_get_string(
6 changes: 5 additions & 1 deletion src/quintain-rpc.h
@@ -19,7 +19,11 @@ typedef struct {
uint32_t flags; /* flags to modify behavior */
uint32_t bulk_op; /* what type of bulk xfer to do */
hg_bulk_t bulk_handle; /* bulk handle (if set) for bulk xfer */
char* req_buffer; /* dummy buffer */
int64_t scratch; /* "cache update" mode: value to store in cache (TODO: use
req_buffer instead) */
int64_t count; /* "cache update" mode: how many items to update */
int64_t offset; /* "cache update" mode: where to update */
char* req_buffer; /* dummy buffer */
} qtn_work_in_t;
static inline hg_return_t hg_proc_qtn_work_in_t(hg_proc_t proc, void* v_out_p);

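The new `scratch`, `count`, and `offset` fields also need to be encoded by `hg_proc_qtn_work_in_t()` (declared above); its definition is not part of this diff, but the additions would presumably look something like the sketch below, assuming Mercury's standard `hg_proc_int64_t` helper and a hypothetical helper function name:

/* sketch only (not in the PR): serialize the three new int64_t fields,
 * assuming the Mercury proc headers are already pulled in by quintain-rpc.h */
static inline hg_return_t qtn_work_in_proc_new_fields(hg_proc_t proc,
                                                      qtn_work_in_t* in)
{
    hg_return_t hret;

    hret = hg_proc_int64_t(proc, &in->scratch);
    if (hret != HG_SUCCESS) return hret;
    hret = hg_proc_int64_t(proc, &in->count);
    if (hret != HG_SUCCESS) return hret;
    return hg_proc_int64_t(proc, &in->offset);
}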
18 changes: 18 additions & 0 deletions src/quintain-server.c
@@ -21,6 +21,9 @@
#include "quintain-rpc.h"
#include "quintain-macros.h"

#include "hoard-c.h"

DECLARE_MARGO_RPC_HANDLER(qtn_get_server_config_ult)
DECLARE_MARGO_RPC_HANDLER(qtn_work_ult)

static int validate_and_complete_config(struct json_object* _config,
@@ -35,6 +38,8 @@ struct quintain_provider {
hg_id_t qtn_work_rpc_id;

struct json_object* json_cfg;

hoard_t hoard_id;
};

static void quintain_server_finalize_cb(void* data)
@@ -130,6 +135,9 @@ int quintain_provider_register(margo_instance_id mid,
goto error;
}

/* initialize our data structure in case we measure concurrency */
tmp_provider->hoard_id = hoard_init();

/* register RPCs */
rpc_id = MARGO_REGISTER_PROVIDER(mid, "qtn_work_rpc", qtn_work_in_t,
qtn_work_out_t, qtn_work_ult, provider_id,
@@ -159,6 +167,7 @@

int quintain_provider_deregister(quintain_provider_t provider)
{
hoard_finalize(provider->hoard_id);
margo_provider_pop_finalize_callback(provider->mid, provider);
quintain_server_finalize_cb(provider);
return QTN_SUCCESS;
@@ -235,6 +244,15 @@ static void qtn_work_ult(hg_handle_t handle)
= margo_bulk_transfer(mid, in.bulk_op, info->addr, in.bulk_handle,
0, bulk_handle, 0, in.bulk_size);
}

if (in.flags & QTN_WORK_CACHE_UPDATE) {
if (in.flags & QTN_WORK_CACHE_WRITE)
hoard_put(provider->hoard_id, &(in.scratch),
in.count / sizeof(int64_t), in.offset);
else
hoard_get(provider->hoard_id, &(in.scratch),
in.count / sizeof(int64_t), in.offset);
}
if (out.ret != HG_SUCCESS) {
QTN_ERROR(mid, "margo_bulk_transfer: %s", HG_Error_to_string(out.ret));
}
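Note that the handler passes the address of the single `scratch` field while dividing `count` by `sizeof(int64_t)` (the comment in `quintain-rpc.h` instead describes `count` as an item count), so only one element is really available per request; the TODO there says `req_buffer` should eventually be used instead. A purely illustrative sketch of that follow-up, keeping the handler's byte interpretation of `count` and assuming `req_buffer` holds at least `in.count` bytes:

    /* sketch only (not in the PR): stage cache updates through req_buffer
     * instead of the single scratch value, per the TODO in quintain-rpc.h */
    if (in.flags & QTN_WORK_CACHE_UPDATE) {
        int64_t* vals  = (int64_t*)in.req_buffer;
        size_t   items = in.count / sizeof(int64_t);
        if (in.flags & QTN_WORK_CACHE_WRITE)
            hoard_put(provider->hoard_id, vals, items, in.offset);
        else
            hoard_get(provider->hoard_id, vals, items, in.offset);
    }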