From d9d6213833cf76de2697cc59e8351844d4a913fa Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Thu, 8 Feb 2024 08:28:50 -0800 Subject: [PATCH] add working code on host --- src/components/tl/ucp/alltoall/alltoall.c | 3 ++- src/components/tl/ucp/alltoall/alltoall_onesided.c | 2 ++ .../tl/ucp/alltoallv/alltoallv_onesided.c | 4 +++- src/components/tl/ucp/tl_ucp.h | 5 +++++ src/components/tl/ucp/tl_ucp_sendrecv.h | 14 ++++++++++---- 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/components/tl/ucp/alltoall/alltoall.c b/src/components/tl/ucp/alltoall/alltoall.c index 3803d96426..73a744ffe5 100644 --- a/src/components/tl/ucp/alltoall/alltoall.c +++ b/src/components/tl/ucp/alltoall/alltoall.c @@ -80,7 +80,7 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args, ucc_status_t status; ALLTOALL_TASK_CHECK(coll_args->args, tl_team); - +#if 1 if (!(coll_args->args.mask & UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER)) { tl_error(UCC_TL_TEAM_LIB(tl_team), "global work buffer not provided nor associated with team"); @@ -95,6 +95,7 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args, goto out; } } +#endif task = ucc_tl_ucp_init_task(coll_args, team); *task_h = &task->super; task->super.post = ucc_tl_ucp_alltoall_onesided_start; diff --git a/src/components/tl/ucp/alltoall/alltoall_onesided.c b/src/components/tl/ucp/alltoall/alltoall_onesided.c index 3557596f49..5f1c2e8f60 100644 --- a/src/components/tl/ucp/alltoall/alltoall_onesided.c +++ b/src/components/tl/ucp/alltoall/alltoall_onesided.c @@ -40,6 +40,7 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_start(ucc_coll_task_t *ctask) (void *)dest, nelems, start, mtype, team, task), task, out); UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, start, team), task, out); +#if 1 for (peer = (start + 1) % gsize; peer != start; peer = (peer + 1) % gsize) { UCPCHECK_GOTO(ucc_tl_ucp_put_nb((void *)(src + peer * nelems), (void *)dest, nelems, peer, mtype, team, task), @@ -47,6 +48,7 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_start(ucc_coll_task_t *ctask) UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task, out); } +#endif return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); out: return task->super.status; diff --git a/src/components/tl/ucp/alltoallv/alltoallv_onesided.c b/src/components/tl/ucp/alltoallv/alltoallv_onesided.c index bb6fa14b3e..9bb5b2d309 100644 --- a/src/components/tl/ucp/alltoallv/alltoallv_onesided.c +++ b/src/components/tl/ucp/alltoallv/alltoallv_onesided.c @@ -24,11 +24,13 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask) ucc_aint_t *d_disp = TASK_ARGS(task).dst.info_v.displacements; size_t sdt_size = ucc_dt_size(TASK_ARGS(task).src.info_v.datatype); size_t rdt_size = ucc_dt_size(TASK_ARGS(task).dst.info_v.datatype); + ucc_memory_type_t mtype; ucc_rank_t peer; size_t sd_disp, dd_disp, data_size; ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); + mtype = (TASK_ARGS(task).src.info_v.mem_type == UCC_MEMORY_TYPE_DPU) ? UCC_MEMORY_TYPE_HOST : TASK_ARGS(task).src.info_v.mem_type; /* perform a put to each member peer using the peer's index in the * destination displacement. */ for (peer = (grank + 1) % gsize; task->onesided.put_posted < gsize; @@ -46,7 +48,7 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask) UCPCHECK_GOTO(ucc_tl_ucp_put_nb(PTR_OFFSET(src, sd_disp), PTR_OFFSET(dest, dd_disp), - data_size, peer, team, task), + data_size, peer, mtype, team, task), task, out); UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task, out); } diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index 9418c179b8..26c52373d7 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -216,6 +216,9 @@ extern ucc_config_field_t ucc_tl_ucp_lib_config_table[]; #define UCC_TL_UCP_REMOTE_RKEY(_ctx, _rank, _seg) \ ((_ctx)->rkeys[_rank * _ctx->n_rinfo_segs + _seg]) +#define UCC_TL_UCP_DYN_REMOTE_RKEY(_ctx, _rank, _size, _seg) \ + ((_ctx)->rkeys[_size * _ctx->n_rinfo_segs + _rank * _ctx->n_dynrinfo_segs + _seg]) + /* #define UCC_TL_UCP_REMOTE_DYN_RVA(_ctx, _rank, _seg) \ (PTR_OFFSET((_ctx)->dyn_seg.dyn_buff, (_ctx)->dyn_seg.seg_group_start[_seg] + (_ctx)->dyn_seg.seg_groups[_seg] * _rank)) @@ -261,6 +264,7 @@ static inline uint64_t UCC_TL_UCP_REMOTE_DYN_KEY_SIZE(ucc_tl_ucp_context_t *ctx, #if 1 int seg_group_id = ctx->dyn_seg.seg_groups[seg]; uint64_t *pkey_size = PTR_OFFSET(ctx->dyn_seg.dyn_buff, 2 * sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + (seg - ctx->dyn_seg.starting_seg[seg]) * sizeof(uint64_t)); + //printf("pkey_size: %lu\n", *pkey_size); return *pkey_size;//[seg - ctx->dyn_seg.starting_seg[seg]]; #else return 0; @@ -276,6 +280,7 @@ static inline void * UCC_TL_UCP_REMOTE_DYN_KEY(ucc_tl_ucp_context_t *ctx, #if 1 int seg_group_id = ctx->dyn_seg.seg_groups[seg]; void *pkey = PTR_OFFSET(ctx->dyn_seg.dyn_buff, 3 * sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + offset); +// printf("pkey %p for peer %d, calced offset: %lu, group_size %lu\n", pkey, rank, (3 * sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + offset), ctx->dyn_seg.seg_group_size[seg_group_id]); return pkey; #else return 0; diff --git a/src/components/tl/ucp/tl_ucp_sendrecv.h b/src/components/tl/ucp/tl_ucp_sendrecv.h index 4d1862ec65..84c9f3751a 100644 --- a/src/components/tl/ucp/tl_ucp_sendrecv.h +++ b/src/components/tl/ucp/tl_ucp_sendrecv.h @@ -240,6 +240,7 @@ ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, size_t msglen, u void *offset; ptrdiff_t base_offset; int i; + //ucc_rank_t rank = UCC_TL_TEAM_RANK(team); *segment = -1; core_rank = ucc_ep_map_eval(UCC_TL_TEAM_MAP(team), peer); @@ -283,27 +284,32 @@ ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, size_t msglen, u uint64_t check_end = check_base + msglen; size_t num_keys = 0; void *packed_key = NULL; + size_t team_size = UCC_TL_TEAM_SIZE(team); +// printf("[%d] base: %lx end: %lx : check_base: %lx check_end: %lx\n", i, base, end, check_base, check_end); if (check_base >= base && check_base < end && - check_end < end) { + check_end <= end) { *segment = i; *rva = UCC_TL_UCP_REMOTE_DYN_RVA(ctx, peer, i); num_keys = *segment - ctx->dyn_seg.starting_seg[*segment]; for (int j = 0; j < num_keys; j++) { key_offset += UCC_TL_UCP_REMOTE_DYN_KEY_SIZE(ctx, peer, ctx->dyn_seg.starting_seg[*segment] + j); } +// printf("[%d] offset: %lu, *segment + n_segs: %lu\n", rank, key_offset, *segment + ctx->n_rinfo_segs); packed_key = UCC_TL_UCP_REMOTE_DYN_KEY(ctx, peer, key_offset, *segment); /* dynamic segment keys should be placed AFTER * the ctx's keys (i.e., num_static_segs + segment_number) */ - if (ucc_unlikely(NULL == UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment + ctx->n_rinfo_segs))) { + if (ucc_unlikely(NULL == UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, peer, team_size, *segment))) { +// printf("[%d] unpacking %p for peer %d\n", rank, packed_key, peer); ucs_status_t ucs_status = ucp_ep_rkey_unpack(*ep, packed_key, - &UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment + ctx->n_rinfo_segs)); + &UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, peer, team_size, *segment)); if (UCS_OK != ucs_status) { return ucs_status_to_ucc_status(ucs_status); } } - *rkey = UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment + ctx->n_rinfo_segs); + *rkey = UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, peer, team_size, *segment); +// printf("[%d] rkey: %p peer %d ep: %p\n", rank, *rkey, peer, *ep); *packed_memh = ctx->dynamic_remote_info[i].packed_memh; return UCC_OK; }