Skip to content

Commit

Permalink
CL/DOCA_UROM: Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
nsarka committed Jun 18, 2024
1 parent 20f37d9 commit 4d79eae
Show file tree
Hide file tree
Showing 14 changed files with 453 additions and 1,149 deletions.
37 changes: 12 additions & 25 deletions contrib/doca_urom_ucc_plugin/dpu/worker_ucc_p2p.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "worker_ucc.h"
#include "../common/urom_ucc.h"

DOCA_LOG_REGISTER(UROM::WORKER::UCC::P2P);
DOCA_LOG_REGISTER(UCC::DOCA_CL : WORKER_UCC_P2P);

void urom_ep_err_cb(void *arg, ucp_ep_h ep, ucs_status_t ucs_status)
{
Expand Down Expand Up @@ -439,24 +439,18 @@ doca_error_t ucc_rma_get_host(void *buffer,
ucs_status_t ucs_status;
ucs_status_ptr_t ucp_status;
uint64_t rva = (uint64_t)target;
//static ucp_rkey_h prev_rkey = NULL;
//static void *prev_packed_key = NULL;
ucp_request_param_t req_param = {0};

if (packed_key == NULL)
return DOCA_ERROR_INVALID_VALUE;

ep = ucc_worker->ucc_data[ctx_id].host;
//if (prev_packed_key != packed_key) {
ucs_status = ucp_ep_rkey_unpack(ep, packed_key, &rkey);
if (ucs_status != UCS_OK) {
DOCA_LOG_ERR("Failed to unpack rkey");
return DOCA_ERROR_NOT_FOUND;
}
//prev_rkey = rkey;
//prev_packed_key = packed_key;
//} else
//rkey = prev_rkey;

ucs_status = ucp_ep_rkey_unpack(ep, packed_key, &rkey);
if (ucs_status != UCS_OK) {
DOCA_LOG_ERR("Failed to unpack rkey");
return DOCA_ERROR_NOT_FOUND;
}

ucp_status = ucp_get_nbx(ep, buffer, msglen, rva, rkey, &req_param);
if (UCS_OK != ucp_status) {
Expand Down Expand Up @@ -494,24 +488,17 @@ doca_error_t ucc_rma_put_host(void *buffer,
ucs_status_ptr_t ucp_status;
uint64_t rva = (uint64_t)target;
ucp_request_param_t req_param = {0};
//static void *prev_packed_dst_key = NULL;
//static ucp_rkey_h prev_dst_rkey = NULL;

if (packed_key == NULL)
return DOCA_ERROR_INVALID_VALUE;

ep = ucc_worker->ucc_data[ctx_id].host;

//if (prev_packed_dst_key != packed_key) {
ucs_status = ucp_ep_rkey_unpack(ep, packed_key, &rkey);
if (ucs_status != UCS_OK) {
DOCA_LOG_ERR("Failed to unpack rkey");
return DOCA_ERROR_NOT_FOUND;
}
//prev_dst_rkey = rkey;
//prev_packed_dst_key = packed_key;
//} else
//rkey = prev_dst_rkey;
ucs_status = ucp_ep_rkey_unpack(ep, packed_key, &rkey);
if (ucs_status != UCS_OK) {
DOCA_LOG_ERR("Failed to unpack rkey");
return DOCA_ERROR_NOT_FOUND;
}

ucp_status = ucp_put_nbx(ep, buffer, msglen, rva, rkey, &req_param);
if (UCS_OK != ucp_status) {
Expand Down
6 changes: 2 additions & 4 deletions src/components/cl/doca_urom/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ sources = \
cl_doca_urom_lib.c \
cl_doca_urom_context.c \
cl_doca_urom_team.c \
cl_doca_urom_common_doca.c \
cl_doca_urom_common_doca.h \
cl_doca_urom_common_doca_urom.c \
cl_doca_urom_common_doca_urom.h \
cl_doca_urom_common.c \
cl_doca_urom_common.h \
cl_doca_urom_worker_ucc.c \
cl_doca_urom_worker_ucc.h \
cl_doca_urom_coll.c
Expand Down
15 changes: 7 additions & 8 deletions src/components/cl/doca_urom/cl_doca_urom.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
#include <doca_ctx.h>
#include <doca_buf.h>

#include "cl_doca_urom_common_doca.h"
#include "cl_doca_urom_common_doca_urom.h"
#include "cl_doca_urom_common.h"
#include "cl_doca_urom_worker_ucc.h"

#include <urom_ucc.h>
Expand Down Expand Up @@ -85,12 +84,12 @@ UCC_CLASS_DECLARE(ucc_cl_doca_urom_context_t, const ucc_base_context_params_t *,
const ucc_base_config_t *);

typedef struct ucc_cl_doca_urom_team {
ucc_cl_team_t super;
ucc_team_h **teams;
unsigned n_teams;
ucc_coll_score_t *score;
ucc_score_map_t *score_map;
struct ucc_result res; // used for the cookie
ucc_cl_team_t super;
ucc_team_h **teams;
unsigned n_teams;
ucc_coll_score_t *score;
ucc_score_map_t *score_map;
struct ucc_cl_doca_urom_result res; // used for the cookie
} ucc_cl_doca_urom_team_t;
UCC_CLASS_DECLARE(ucc_cl_doca_urom_team_t, ucc_base_context_t *,
const ucc_base_team_params_t *);
Expand Down
74 changes: 44 additions & 30 deletions src/components/cl/doca_urom/cl_doca_urom_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ static ucc_status_t ucc_cl_doca_urom_triggered_post_setup(ucc_coll_task_t *task)

static ucc_status_t ucc_cl_doca_urom_coll_full_start(ucc_coll_task_t *task)
{
doca_error_t result;
ucc_worker_key_buf keys;
ucc_cl_doca_urom_team_t *cl_team = ucc_derived_of(task->team,
ucc_cl_doca_urom_team_t);
ucc_cl_doca_urom_context_t *ctx = UCC_CL_DOCA_UROM_TEAM_CTX(cl_team);
Expand All @@ -38,6 +36,8 @@ static ucc_status_t ucc_cl_doca_urom_coll_full_start(ucc_coll_task_t *task)
ucc_cl_doca_urom_schedule_t);
struct export_buf *src_ebuf = &schedule->src_ebuf;
struct export_buf *dst_ebuf = &schedule->dst_ebuf;
doca_error_t result;
ucc_worker_key_buf keys;

src_ebuf->memh = NULL;
dst_ebuf->memh = NULL;
Expand All @@ -49,56 +49,71 @@ static ucc_status_t ucc_cl_doca_urom_coll_full_start(ucc_coll_task_t *task)
in_place = 1;
}

if (!in_place) {
ucc_cl_doca_urom_buffer_export_ucc(
tl_ctx->worker.ucp_context,
coll_args->src.info.buffer,
coll_args->src.info.count *
ucc_dt_size(coll_args->src.info.datatype),
src_ebuf);
}

ucc_cl_doca_urom_buffer_export_ucc(
tl_ctx->worker.ucp_context,
coll_args->dst.info.buffer,
coll_args->dst.info.count *
ucc_dt_size(coll_args->dst.info.datatype),
dst_ebuf);

switch (coll_args->coll_type) {
case UCC_COLL_TYPE_ALLTOALL:
{
if (!in_place) {
ucc_cl_doca_urom_buffer_export_ucc(tl_ctx->worker.ucp_context, coll_args->src.info.buffer, coll_args->src.info.count * ucc_cl_doca_urom_dt_size(coll_args->src.info.datatype), src_ebuf);
keys.src_len = src_ebuf->packed_key_len;
memcpy(keys.rkeys, src_ebuf->packed_key, keys.src_len);
} else {
keys.src_len = 0;
}
ucc_cl_doca_urom_buffer_export_ucc(tl_ctx->worker.ucp_context, coll_args->dst.info.buffer, coll_args->dst.info.count * ucc_cl_doca_urom_dt_size(coll_args->dst.info.datatype), dst_ebuf);
keys.dst_len = dst_ebuf->packed_key_len;
memcpy(keys.rkeys + keys.src_len, dst_ebuf->packed_key, keys.dst_len);
memcpy(keys.rkeys + keys.src_len,
dst_ebuf->packed_key, keys.dst_len);
use_xgvmi = 0;
} break;
case UCC_COLL_TYPE_ALLREDUCE:
{
if (!in_place) {
ucc_cl_doca_urom_buffer_export_ucc(tl_ctx->worker.ucp_context, coll_args->src.info.buffer, coll_args->src.info.count * ucc_cl_doca_urom_dt_size(coll_args->src.info.datatype), src_ebuf);
keys.src_len = src_ebuf->packed_memh_len;
memcpy(keys.rkeys, src_ebuf->packed_memh, keys.src_len);
} else {
keys.src_len = 0;
}
ucc_cl_doca_urom_buffer_export_ucc(tl_ctx->worker.ucp_context, coll_args->dst.info.buffer, coll_args->dst.info.count * ucc_cl_doca_urom_dt_size(coll_args->dst.info.datatype), dst_ebuf);
keys.dst_len = dst_ebuf->packed_memh_len;
memcpy(keys.rkeys + keys.src_len, dst_ebuf->packed_memh, keys.dst_len);
memcpy(keys.rkeys + keys.src_len,
dst_ebuf->packed_memh, keys.dst_len);
use_xgvmi = 1;
} break;
case UCC_COLL_TYPE_ALLGATHER:
{
if (!in_place) {
ucc_cl_doca_urom_buffer_export_ucc(tl_ctx->worker.ucp_context, coll_args->src.info.buffer, coll_args->src.info.count * ucc_cl_doca_urom_dt_size(coll_args->src.info.datatype), src_ebuf);
keys.src_len = src_ebuf->packed_memh_len;
memcpy(keys.rkeys, src_ebuf->packed_memh, keys.src_len);
} else {
keys.src_len = 0;
}
ucc_cl_doca_urom_buffer_export_ucc(tl_ctx->worker.ucp_context, coll_args->dst.info.buffer, coll_args->dst.info.count * ucc_cl_doca_urom_dt_size(coll_args->dst.info.datatype), dst_ebuf);
keys.dst_len = dst_ebuf->packed_memh_len;
memcpy(keys.rkeys + keys.src_len, dst_ebuf->packed_memh, keys.dst_len);
memcpy(keys.rkeys + keys.src_len,
dst_ebuf->packed_memh,
keys.dst_len);
use_xgvmi = 1;
} break;
default:
cl_error(&cl_lib->super, "coll_type %s is not supported", ucc_coll_type_str(coll_args->coll_type));
cl_error(&cl_lib->super, "coll_type %s is not supported",
ucc_coll_type_str(coll_args->coll_type));
}

coll_args->mask |= UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER;

result = doca_urom_ucc_task_collective(cl_lib->urom_ctx.urom_worker,
result = ucc_cl_doca_urom_task_collective(cl_lib->urom_ctx.urom_worker,
cookie,
rank,
coll_args,
Expand All @@ -107,7 +122,7 @@ static ucc_status_t ucc_cl_doca_urom_coll_full_start(ucc_coll_task_t *task)
&keys,
sizeof(ucc_worker_key_buf),
0,
urom_ucc_collective_finished);
ucc_cl_doca_urom_collective_finished);
if (result != DOCA_SUCCESS) {
cl_error(&cl_lib->super, "Failed to create UCC collective task");
}
Expand All @@ -120,7 +135,6 @@ static ucc_status_t ucc_cl_doca_urom_coll_full_start(ucc_coll_task_t *task)

static ucc_status_t ucc_cl_doca_urom_coll_full_finalize(ucc_coll_task_t *task)
{
ucc_status_t status;
ucc_cl_doca_urom_schedule_t *schedule = ucc_derived_of(task,
ucc_cl_doca_urom_schedule_t);
ucc_cl_doca_urom_team_t *cl_team = ucc_derived_of(task->team,
Expand All @@ -134,6 +148,7 @@ static ucc_status_t ucc_cl_doca_urom_coll_full_finalize(ucc_coll_task_t *task)
ucc_tl_ucp_context_t);
struct export_buf *src_ebuf = &schedule->src_ebuf;
struct export_buf *dst_ebuf = &schedule->dst_ebuf;
ucc_status_t status;

if (src_ebuf->memh) {
ucp_mem_unmap(tl_ctx->worker.ucp_context, src_ebuf->memh);
Expand All @@ -148,20 +163,20 @@ static ucc_status_t ucc_cl_doca_urom_coll_full_finalize(ucc_coll_task_t *task)

static void ucc_cl_doca_urom_coll_full_progress(ucc_coll_task_t *ctask)
{
int ret;
ucc_cl_doca_urom_team_t *cl_team = ucc_derived_of(ctask->team,
ucc_cl_doca_urom_team_t *cl_team = ucc_derived_of(ctask->team,
ucc_cl_doca_urom_team_t);
ucc_cl_doca_urom_context_t *ctx = UCC_CL_DOCA_UROM_TEAM_CTX(cl_team);
ucc_cl_doca_urom_lib_t *cl_lib = ucc_derived_of(
ucc_cl_doca_urom_context_t *ctx = UCC_CL_DOCA_UROM_TEAM_CTX(cl_team);
ucc_cl_doca_urom_lib_t *cl_lib = ucc_derived_of(
ctx->super.super.lib,
ucc_cl_doca_urom_lib_t);
ucc_cl_doca_urom_schedule_t *schedule = ucc_derived_of(ctask,
ucc_cl_doca_urom_schedule_t *schedule = ucc_derived_of(ctask,
ucc_cl_doca_urom_schedule_t);
int ucp_index = cl_lib->tl_ucp_index;
ucc_tl_ucp_context_t *tl_ctx = ucc_derived_of(
ctx->super.tl_ctxs[ucp_index],
ucc_tl_ucp_context_t);
struct ucc_result *res = &schedule->res;
int ucp_index = cl_lib->tl_ucp_index;
ucc_tl_ucp_context_t *tl_ctx = ucc_derived_of(
ctx->super.tl_ctxs[ucp_index],
ucc_tl_ucp_context_t);
struct ucc_cl_doca_urom_result *res = &schedule->res;
int ret;

if (res == NULL) {
cl_error(cl_lib, "Error in UROM");
Expand Down Expand Up @@ -189,15 +204,15 @@ ucc_status_t ucc_cl_doca_urom_coll_full_init(
ucc_base_coll_args_t *coll_args, ucc_base_team_t *team,
ucc_coll_task_t **task)
{
ucc_status_t status;
ucc_cl_doca_urom_schedule_t *cl_schedule;
ucc_base_coll_args_t args;
ucc_schedule_t *schedule;
ucc_cl_doca_urom_team_t *cl_team = ucc_derived_of(team,
ucc_cl_doca_urom_team_t);
ucc_cl_doca_urom_context_t *ctx = UCC_CL_DOCA_UROM_TEAM_CTX(cl_team);
ucc_cl_doca_urom_lib_t *cl_lib = ucc_derived_of(ctx->super.super.lib,
ucc_cl_doca_urom_lib_t);
ucc_status_t status;
ucc_cl_doca_urom_schedule_t *cl_schedule;
ucc_base_coll_args_t args;
ucc_schedule_t *schedule;

cl_schedule = ucc_cl_doca_urom_get_schedule(cl_team);
if (ucc_unlikely(!cl_schedule)) {
Expand Down Expand Up @@ -245,4 +260,3 @@ ucc_status_t ucc_cl_doca_urom_coll_init(ucc_base_coll_args_t *coll_args,

return UCC_OK;
}

9 changes: 4 additions & 5 deletions src/components/cl/doca_urom/cl_doca_urom_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ extern const char
*ucc_cl_doca_urom_default_alg_select_str[UCC_CL_DOCA_UROM_N_DEFAULT_ALG_SELECT_STR];

typedef struct ucc_cl_doca_urom_schedule_t {
ucc_schedule_pipelined_t super;
struct ucc_result res;
struct export_buf src_ebuf;
struct export_buf dst_ebuf;
ucc_schedule_pipelined_t super;
struct ucc_cl_doca_urom_result res;
struct export_buf src_ebuf;
struct export_buf dst_ebuf;
} ucc_cl_doca_urom_schedule_t;

static inline ucc_cl_doca_urom_schedule_t *
Expand All @@ -39,4 +39,3 @@ static inline void ucc_cl_doca_urom_put_schedule(ucc_schedule_t *schedule)
}

#endif

Loading

0 comments on commit 4d79eae

Please sign in to comment.