Skip to content

Commit

Permalink
add working code on host
Browse files Browse the repository at this point in the history
  • Loading branch information
ferrol aderholdt committed Feb 8, 2024
1 parent 822ad7f commit d9d6213
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 6 deletions.
3 changes: 2 additions & 1 deletion src/components/tl/ucp/alltoall/alltoall.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
ucc_status_t status;

ALLTOALL_TASK_CHECK(coll_args->args, tl_team);

#if 1
if (!(coll_args->args.mask & UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER)) {
tl_error(UCC_TL_TEAM_LIB(tl_team),
"global work buffer not provided nor associated with team");
Expand All @@ -95,6 +95,7 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
goto out;
}
}
#endif
task = ucc_tl_ucp_init_task(coll_args, team);
*task_h = &task->super;
task->super.post = ucc_tl_ucp_alltoall_onesided_start;
Expand Down
2 changes: 2 additions & 0 deletions src/components/tl/ucp/alltoall/alltoall_onesided.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_start(ucc_coll_task_t *ctask)
(void *)dest, nelems, start, mtype, team, task),
task, out);
UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, start, team), task, out);
#if 1
for (peer = (start + 1) % gsize; peer != start; peer = (peer + 1) % gsize) {
UCPCHECK_GOTO(ucc_tl_ucp_put_nb((void *)(src + peer * nelems),
(void *)dest, nelems, peer, mtype, team, task),
task, out);
UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task,
out);
}
#endif
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
out:
return task->super.status;
Expand Down
4 changes: 3 additions & 1 deletion src/components/tl/ucp/alltoallv/alltoallv_onesided.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)
ucc_aint_t *d_disp = TASK_ARGS(task).dst.info_v.displacements;
size_t sdt_size = ucc_dt_size(TASK_ARGS(task).src.info_v.datatype);
size_t rdt_size = ucc_dt_size(TASK_ARGS(task).dst.info_v.datatype);
ucc_memory_type_t mtype;
ucc_rank_t peer;
size_t sd_disp, dd_disp, data_size;

ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);

mtype = (TASK_ARGS(task).src.info_v.mem_type == UCC_MEMORY_TYPE_DPU) ? UCC_MEMORY_TYPE_HOST : TASK_ARGS(task).src.info_v.mem_type;
/* perform a put to each member peer using the peer's index in the
* destination displacement. */
for (peer = (grank + 1) % gsize; task->onesided.put_posted < gsize;
Expand All @@ -46,7 +48,7 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)

UCPCHECK_GOTO(ucc_tl_ucp_put_nb(PTR_OFFSET(src, sd_disp),
PTR_OFFSET(dest, dd_disp),
data_size, peer, team, task),
data_size, peer, mtype, team, task),
task, out);
UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task, out);
}
Expand Down
5 changes: 5 additions & 0 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ extern ucc_config_field_t ucc_tl_ucp_lib_config_table[];
#define UCC_TL_UCP_REMOTE_RKEY(_ctx, _rank, _seg) \
((_ctx)->rkeys[_rank * _ctx->n_rinfo_segs + _seg])

#define UCC_TL_UCP_DYN_REMOTE_RKEY(_ctx, _rank, _size, _seg) \
((_ctx)->rkeys[_size * _ctx->n_rinfo_segs + _rank * _ctx->n_dynrinfo_segs + _seg])

/*
#define UCC_TL_UCP_REMOTE_DYN_RVA(_ctx, _rank, _seg) \
(PTR_OFFSET((_ctx)->dyn_seg.dyn_buff, (_ctx)->dyn_seg.seg_group_start[_seg] + (_ctx)->dyn_seg.seg_groups[_seg] * _rank))
Expand Down Expand Up @@ -261,6 +264,7 @@ static inline uint64_t UCC_TL_UCP_REMOTE_DYN_KEY_SIZE(ucc_tl_ucp_context_t *ctx,
#if 1
int seg_group_id = ctx->dyn_seg.seg_groups[seg];
uint64_t *pkey_size = PTR_OFFSET(ctx->dyn_seg.dyn_buff, 2 * sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + (seg - ctx->dyn_seg.starting_seg[seg]) * sizeof(uint64_t));
//printf("pkey_size: %lu\n", *pkey_size);
return *pkey_size;//[seg - ctx->dyn_seg.starting_seg[seg]];
#else
return 0;
Expand All @@ -276,6 +280,7 @@ static inline void * UCC_TL_UCP_REMOTE_DYN_KEY(ucc_tl_ucp_context_t *ctx,
#if 1
int seg_group_id = ctx->dyn_seg.seg_groups[seg];
void *pkey = PTR_OFFSET(ctx->dyn_seg.dyn_buff, 3 * sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + offset);
// printf("pkey %p for peer %d, calced offset: %lu, group_size %lu\n", pkey, rank, (3 * sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + offset), ctx->dyn_seg.seg_group_size[seg_group_id]);
return pkey;
#else
return 0;
Expand Down
14 changes: 10 additions & 4 deletions src/components/tl/ucp/tl_ucp_sendrecv.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, size_t msglen, u
void *offset;
ptrdiff_t base_offset;
int i;
//ucc_rank_t rank = UCC_TL_TEAM_RANK(team);

*segment = -1;
core_rank = ucc_ep_map_eval(UCC_TL_TEAM_MAP(team), peer);
Expand Down Expand Up @@ -283,27 +284,32 @@ ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, size_t msglen, u
uint64_t check_end = check_base + msglen;
size_t num_keys = 0;
void *packed_key = NULL;
size_t team_size = UCC_TL_TEAM_SIZE(team);
// printf("[%d] base: %lx end: %lx : check_base: %lx check_end: %lx\n", i, base, end, check_base, check_end);
if (check_base >= base &&
check_base < end &&
check_end < end) {
check_end <= end) {
*segment = i;
*rva = UCC_TL_UCP_REMOTE_DYN_RVA(ctx, peer, i);
num_keys = *segment - ctx->dyn_seg.starting_seg[*segment];
for (int j = 0; j < num_keys; j++) {
key_offset += UCC_TL_UCP_REMOTE_DYN_KEY_SIZE(ctx, peer, ctx->dyn_seg.starting_seg[*segment] + j);
}
// printf("[%d] offset: %lu, *segment + n_segs: %lu\n", rank, key_offset, *segment + ctx->n_rinfo_segs);
packed_key = UCC_TL_UCP_REMOTE_DYN_KEY(ctx, peer, key_offset, *segment);
/* dynamic segment keys should be placed AFTER
* the ctx's keys (i.e., num_static_segs + segment_number) */
if (ucc_unlikely(NULL == UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment + ctx->n_rinfo_segs))) {
if (ucc_unlikely(NULL == UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, peer, team_size, *segment))) {
// printf("[%d] unpacking %p for peer %d\n", rank, packed_key, peer);
ucs_status_t ucs_status =
ucp_ep_rkey_unpack(*ep, packed_key,
&UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment + ctx->n_rinfo_segs));
&UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, peer, team_size, *segment));
if (UCS_OK != ucs_status) {
return ucs_status_to_ucc_status(ucs_status);
}
}
*rkey = UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment + ctx->n_rinfo_segs);
*rkey = UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, peer, team_size, *segment);
// printf("[%d] rkey: %p peer %d ep: %p\n", rank, *rkey, peer, *ep);
*packed_memh = ctx->dynamic_remote_info[i].packed_memh;
return UCC_OK;
}
Expand Down

0 comments on commit d9d6213

Please sign in to comment.