Skip to content

Commit

Permalink
TL/UCP: add onesided alltoallv
Browse files Browse the repository at this point in the history
TEST: add onesided alltoallv mpi test

REVIEW: address feedback

REVIEW: address feedback

REVIEW: address feedback

REVIEW: address feedback

REVIEW: address feedback
  • Loading branch information
ferrol aderholdt committed Nov 1, 2023
1 parent 483b91b commit 551f03c
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 30 deletions.
3 changes: 2 additions & 1 deletion src/components/tl/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ alltoallv = \
alltoallv/alltoallv.h \
alltoallv/alltoallv.c \
alltoallv/alltoallv_pairwise.c \
alltoallv/alltoallv_hybrid.c
alltoallv/alltoallv_hybrid.c \
alltoallv/alltoallv_onesided.c

allreduce = \
allreduce/allreduce.h \
Expand Down
4 changes: 4 additions & 0 deletions src/components/tl/ucp/alltoallv/alltoallv.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ ucc_base_coll_alg_info_t
{.id = UCC_TL_UCP_ALLTOALLV_ALG_HYBRID,
.name = "hybrid",
.desc = "hybrid a2av alg "},
[UCC_TL_UCP_ALLTOALLV_ALG_ONESIDED] =
{.id = UCC_TL_UCP_ALLTOALLV_ALG_ONESIDED,
.name = "onesided",
.desc = "O(N) onesided alltoallv"},
[UCC_TL_UCP_ALLTOALLV_ALG_LAST] = {
.id = 0, .name = NULL, .desc = NULL}};

Expand Down
4 changes: 4 additions & 0 deletions src/components/tl/ucp/alltoallv/alltoallv.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
enum {
UCC_TL_UCP_ALLTOALLV_ALG_PAIRWISE,
UCC_TL_UCP_ALLTOALLV_ALG_HYBRID,
UCC_TL_UCP_ALLTOALLV_ALG_ONESIDED,
UCC_TL_UCP_ALLTOALLV_ALG_LAST
};

Expand All @@ -32,6 +33,9 @@ ucc_status_t ucc_tl_ucp_alltoallv_hybrid_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h);

/* Creates the task for the one-sided alltoallv algorithm.
 *
 * coll_args  collective arguments; must carry a global work buffer
 *            (UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER set in the mask)
 * team       team the collective runs on
 * task_h     receives the newly created task on success
 *
 * Returns UCC_OK on success, or UCC_ERR_NOT_SUPPORTED when the global work
 * buffer is missing or the flags field is present without
 * UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS. */
ucc_status_t ucc_tl_ucp_alltoallv_onesided_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h);

ucc_status_t ucc_tl_ucp_alltoallv_pairwise_init_common(ucc_tl_ucp_task_t *task);

Expand Down
104 changes: 104 additions & 0 deletions src/components/tl/ucp/alltoallv/alltoallv_onesided.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#include "config.h"
#include "tl_ucp.h"
#include "alltoallv.h"
#include "core/ucc_progress_queue.h"
#include "utils/ucc_math.h"
#include "tl_ucp_sendrecv.h"

/* Post callback of the one-sided alltoallv task.
 *
 * Walks the team starting at the rank right after the caller and wrapping
 * around. For every visited peer it issues a non-blocking put of the block
 * addressed by the source displacement/count of that peer, followed by an
 * atomic increment on the global work buffer towards the same peer.
 *
 * Returns the result of enqueueing the task on the progress queue, or
 * task->super.status when one of the checked calls jumps to `out`. */
ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)
{
    ucc_tl_ucp_task_t *task        = ucc_derived_of(ctask, ucc_tl_ucp_task_t);
    ucc_tl_ucp_team_t *team        = TASK_TEAM(task);
    ptrdiff_t          send_base   = (ptrdiff_t)TASK_ARGS(task).src.info_v.buffer;
    ptrdiff_t          recv_base   = (ptrdiff_t)TASK_ARGS(task).dst.info_v.buffer;
    ucc_rank_t         my_rank     = UCC_TL_TEAM_RANK(team);
    ucc_rank_t         n_ranks     = UCC_TL_TEAM_SIZE(team);
    long              *sync_cnt    = TASK_ARGS(task).global_work_buffer;
    ucc_aint_t        *send_displs = TASK_ARGS(task).src.info_v.displacements;
    ucc_aint_t        *recv_displs = TASK_ARGS(task).dst.info_v.displacements;
    size_t             send_elem   = ucc_dt_size(TASK_ARGS(task).src.info_v.datatype);
    size_t             recv_elem   = ucc_dt_size(TASK_ARGS(task).dst.info_v.datatype);
    ucc_rank_t         target;
    size_t             send_off, recv_off, nbytes;

    ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);

    /* The loop ends once put_posted reaches the team size.
     * NOTE(review): presumably ucc_tl_ucp_put_nb advances
     * task->onesided.put_posted - confirm against tl_ucp_sendrecv.h. */
    target = (my_rank + 1) % n_ranks;
    while (task->onesided.put_posted < n_ranks) {
        /* byte offset of the outgoing block inside the local source buffer */
        send_off = ucc_coll_args_get_displacement(&TASK_ARGS(task),
                                                  send_displs, target) *
                   send_elem;
        /* byte offset applied to the destination address of the put; taken
         * from the destination displacement entry of this peer */
        recv_off = ucc_coll_args_get_displacement(&TASK_ARGS(task),
                                                  recv_displs, target) *
                   recv_elem;
        /* payload size is derived from the source count for this peer */
        nbytes = ucc_coll_args_get_count(&TASK_ARGS(task),
                                         TASK_ARGS(task).src.info_v.counts,
                                         target) *
                 send_elem;

        UCPCHECK_GOTO(ucc_tl_ucp_put_nb(PTR_OFFSET(send_base, send_off),
                                        PTR_OFFSET(recv_base, recv_off),
                                        nbytes, target, team, task),
                      task, out);
        UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(sync_cnt, target, team), task,
                      out);

        target = (target + 1) % n_ranks;
    }
    return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
out:
    return task->super.status;
}

/* Progress callback of the one-sided alltoallv task.
 *
 * Polls ucc_tl_ucp_test_onesided with the team size as the expected value
 * of the synchronization counter. While that reports UCC_INPROGRESS the
 * task is left untouched; otherwise the first element of the global work
 * buffer is zeroed and the task is marked UCC_OK. */
void ucc_tl_ucp_alltoallv_onesided_progress(ucc_coll_task_t *ctask)
{
    ucc_tl_ucp_task_t *task     = ucc_derived_of(ctask, ucc_tl_ucp_task_t);
    ucc_tl_ucp_team_t *team     = TASK_TEAM(task);
    ucc_rank_t         n_ranks  = UCC_TL_TEAM_SIZE(team);
    long              *sync_cnt = TASK_ARGS(task).global_work_buffer;

    if (ucc_tl_ucp_test_onesided(task, n_ranks) != UCC_INPROGRESS) {
        /* reset the counter so the work buffer starts from zero again */
        *sync_cnt          = 0;
        task->super.status = UCC_OK;
    }
}

/* Builds the task object for the one-sided alltoallv algorithm.
 *
 * coll_args  collective arguments supplied by the caller
 * team       base team; used as a TL/UCP team
 * task_h     set to the new task on success
 *
 * Returns UCC_OK on success. Returns UCC_ERR_NOT_SUPPORTED when the global
 * work buffer field is not set in the mask, or when the flags field is set
 * but UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS is not among the flags. If the
 * flags field is absent from the mask, the memory-mapped check is skipped. */
ucc_status_t ucc_tl_ucp_alltoallv_onesided_init(ucc_base_coll_args_t *coll_args,
                                                ucc_base_team_t      *team,
                                                ucc_coll_task_t     **task_h)
{
    ucc_tl_ucp_team_t *ucp_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
    ucc_tl_ucp_task_t *task;
    ucc_status_t       status;

    /* generic alltoallv argument validation; `status` and the `out` label
     * are kept available for it */
    ALLTOALLV_TASK_CHECK(coll_args->args, ucp_team);

    /* the algorithm synchronizes through the global work buffer */
    if (0 == (coll_args->args.mask & UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER)) {
        tl_error(UCC_TL_TEAM_LIB(ucp_team),
                 "global work buffer not provided nor associated with team");
        status = UCC_ERR_NOT_SUPPORTED;
        goto out;
    }

    /* flags are only inspected when the caller marked the field as valid */
    if ((coll_args->args.mask & UCC_COLL_ARGS_FIELD_FLAGS) &&
        !(coll_args->args.flags & UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS)) {
        tl_error(UCC_TL_TEAM_LIB(ucp_team),
                 "non memory mapped buffers are not supported");
        status = UCC_ERR_NOT_SUPPORTED;
        goto out;
    }

    task                 = ucc_tl_ucp_init_task(coll_args, team);
    task->super.post     = ucc_tl_ucp_alltoallv_onesided_start;
    task->super.progress = ucc_tl_ucp_alltoallv_onesided_progress;
    *task_h              = &task->super;
    status               = UCC_OK;
out:
    return status;
}
3 changes: 3 additions & 0 deletions src/components/tl/ucp/tl_ucp_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str,
case UCC_TL_UCP_ALLTOALLV_ALG_HYBRID:
*init = ucc_tl_ucp_alltoallv_hybrid_init;
break;
case UCC_TL_UCP_ALLTOALLV_ALG_ONESIDED:
*init = ucc_tl_ucp_alltoallv_onesided_init;
break;
default:
status = UCC_ERR_INVALID_PARAM;
break;
Expand Down
26 changes: 26 additions & 0 deletions src/components/tl/ucp/tl_ucp_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,32 @@ static inline ucc_status_t ucc_tl_ucp_test_ring(ucc_tl_ucp_task_t *task)
return UCC_INPROGRESS;
}

/* Evaluates to true when the task's posted and completed counters agree for
 * both puts and gets. */
#define UCC_TL_UCP_TASK_ONESIDED_P2P_COMPLETE(_task) \
(((_task)->onesided.put_posted == (_task)->onesided.put_completed) && \
((_task)->onesided.get_posted == (_task)->onesided.get_completed))

/* Evaluates to true when the first long stored in the task's global work
 * buffer equals _end. */
#define UCC_TL_UCP_TASK_ONESIDED_SYNC_COMPLETE(_task, _end) \
(*((long *)(TASK_ARGS(_task).global_work_buffer)) == _end)

/* Tests a one-sided task for completion.
 *
 * The task counts as complete when its put/get counters match and the
 * synchronization counter in the global work buffer equals sync_end.
 * The condition is checked once up front; after that, up to task->n_polls
 * rounds re-check it and progress the UCP worker while it is still pending.
 *
 * Returns UCC_OK once the condition holds, UCC_INPROGRESS otherwise. */
static inline ucc_status_t ucc_tl_ucp_test_onesided(ucc_tl_ucp_task_t *task,
                                                    int sync_end)
{
    int round;

    /* fast path: nothing left to wait for */
    if (UCC_TL_UCP_TASK_ONESIDED_P2P_COMPLETE(task) &&
        UCC_TL_UCP_TASK_ONESIDED_SYNC_COMPLETE(task, sync_end)) {
        return UCC_OK;
    }

    for (round = 0; round < task->n_polls; round++) {
        if (!UCC_TL_UCP_TASK_ONESIDED_P2P_COMPLETE(task) ||
            !UCC_TL_UCP_TASK_ONESIDED_SYNC_COMPLETE(task, sync_end)) {
            /* still pending: drive the worker, then look again next round */
            ucp_worker_progress(UCC_TL_UCP_TASK_TEAM(task)->worker->ucp_worker);
            continue;
        }
        return UCC_OK;
    }
    return UCC_INPROGRESS;
}

ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str,
ucc_coll_type_t coll_type,
ucc_memory_type_t mem_type,
Expand Down
12 changes: 7 additions & 5 deletions src/components/tl/ucp/tl_ucp_sendrecv.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) Meta Platforms, Inc. and affiliates. 2022.
*
* See file LICENSE for terms.
Expand Down Expand Up @@ -254,16 +254,18 @@ ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, ucp_ep_h *ep,
keys = PTR_OFFSET(base_offset, (section_offset * 3));

for (int i = 0; i < ctx->n_rinfo_segs; i++) {
if ((uint64_t)va >= (uint64_t)team->va_base[i] &&
(uint64_t)va < (uint64_t)team->va_base[i] + team->base_length[i]) {
uint64_t base = (uint64_t)team->va_base[i];
uint64_t end = base + team->base_length[i];
if ((uint64_t)va >= base &&
(uint64_t)va < end) {
*segment = i;
break;
}
key_offset += key_sizes[i];
}
if (0 > *segment) {
if (ucc_unlikely(0 > *segment)) {
tl_error(UCC_TL_TEAM_LIB(team),
"attempt to perform one-sided operation on non-registered memory");
"attempt to perform one-sided operation on non-registered memory %p", va);
return UCC_ERR_NOT_FOUND;
}
if (ucc_unlikely(NULL == UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment))) {
Expand Down
2 changes: 1 addition & 1 deletion test/mpi/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ static std::vector<ucc_coll_type_t> colls = {
UCC_COLL_TYPE_SCATTER, UCC_COLL_TYPE_SCATTERV};

static std::vector<ucc_coll_type_t> onesided_colls = {
UCC_COLL_TYPE_ALLTOALL};
UCC_COLL_TYPE_ALLTOALL, UCC_COLL_TYPE_ALLTOALLV};

static std::vector<ucc_memory_type_t> mtypes = {
UCC_MEMORY_TYPE_HOST};
Expand Down
80 changes: 59 additions & 21 deletions test/mpi/test_alltoallv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,26 @@ TestAlltoallv::TestAlltoallv(ucc_test_team_t &_team, TestCaseParams &params) :
std::default_random_engine eng;
size_t dt_size, count;
int rank, nprocs, rank_count;

dt = params.dt;
dt_size = ucc_dt_size(dt);
count = msgsize / dt_size;
sncounts = 0;
rncounts = 0;
scounts = NULL;
sdispls = NULL;
rcounts = NULL;
rdispls = NULL;
scounts64 = NULL;
sdispls64 = NULL;
rcounts64 = NULL;
rdispls64 = NULL;
count_bits = params.count_bits;
displ_bits = params.displ_bits;
bool is_onesided;
void *work_buf;

dt = params.dt;
dt_size = ucc_dt_size(dt);
count = msgsize / dt_size;
sncounts = 0;
rncounts = 0;
scounts = NULL;
sdispls = NULL;
rcounts = NULL;
rdispls = NULL;
scounts64 = NULL;
sdispls64 = NULL;
rcounts64 = NULL;
rdispls64 = NULL;
count_bits = params.count_bits;
displ_bits = params.displ_bits;
is_onesided = (params.buffers != NULL);
work_buf = NULL;

std::uniform_int_distribution<int> urd(count / 2, count);
eng.seed(test_rand_seed);
Expand All @@ -56,6 +60,10 @@ TestAlltoallv::TestAlltoallv(ucc_test_team_t &_team, TestCaseParams &params) :
args.mask = UCC_COLL_ARGS_FIELD_FLAGS;
args.flags |= UCC_COLL_ARGS_FLAG_CONTIG_SRC_BUFFER |
UCC_COLL_ARGS_FLAG_CONTIG_DST_BUFFER;
if (is_onesided) {
args.mask |= UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER;
args.flags |= UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS;
}
if (count_bits == TEST_FLAG_VSIZE_64BIT) {
args.flags |= UCC_COLL_ARGS_FLAG_COUNT_64BIT;
}
Expand Down Expand Up @@ -92,14 +100,21 @@ TestAlltoallv::TestAlltoallv(ucc_test_team_t &_team, TestCaseParams &params) :
if (TEST_SKIP_NONE != skip_reduce(test_skip, team.comm)) {
return;
}

UCC_CHECK(ucc_mc_alloc(&sbuf_mc_header, sncounts * dt_size, mem_type));
UCC_CHECK(ucc_mc_alloc(&rbuf_mc_header, rncounts * dt_size, mem_type));
sbuf = sbuf_mc_header->addr;
rbuf = rbuf_mc_header->addr;
check_buf = ucc_malloc((sncounts + rncounts) * dt_size, "check buf");
UCC_MALLOC_CHECK(check_buf);

if (!is_onesided) {
UCC_CHECK(ucc_mc_alloc(&sbuf_mc_header, sncounts * dt_size, mem_type));
UCC_CHECK(ucc_mc_alloc(&rbuf_mc_header, rncounts * dt_size, mem_type));
sbuf = sbuf_mc_header->addr;
rbuf = rbuf_mc_header->addr;
} else {
sbuf = params.buffers[MEM_SEND_SEGMENT];
rbuf = params.buffers[MEM_RECV_SEGMENT];
work_buf = params.buffers[MEM_WORK_SEGMENT];
args.global_work_buffer = work_buf;
}

args.src.info_v.buffer = sbuf;
args.src.info_v.datatype = dt;
args.src.info_v.mem_type = mem_type;
Expand Down Expand Up @@ -140,6 +155,29 @@ TestAlltoallv::TestAlltoallv(ucc_test_team_t &_team, TestCaseParams &params) :
args.src.info_v.displacements = (ucc_aint_t*)sdispls;
args.dst.info_v.displacements = (ucc_aint_t*)rdispls;
}
if (is_onesided) {
MPI_Datatype datatype;
size_t disp_size;
void *ldisp;
int alltoall_status;

if (TEST_FLAG_VSIZE_64BIT == displ_bits) {
datatype = MPI_LONG;
disp_size = sizeof(uint64_t);
} else {
datatype = MPI_INT;
disp_size = sizeof(uint32_t);
}
ldisp = ucc_calloc(nprocs, disp_size, "displacements");
UCC_MALLOC_CHECK(ldisp);
alltoall_status = MPI_Alltoall(args.dst.info_v.displacements, 1,
datatype, ldisp, 1, datatype, team.comm);
if (MPI_SUCCESS != alltoall_status) {
std::cerr << "*** MPI ALLTOALL FAILED" << std::endl;
MPI_Abort(MPI_COMM_WORLD, -1);
}
args.dst.info_v.displacements = (ucc_aint_t *)ldisp;
}
UCC_CHECK(set_input());
UCC_CHECK_SKIP(ucc_collective_init(&args, &req, team.team), test_skip);
}
Expand Down
6 changes: 4 additions & 2 deletions test/mpi/test_mpi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ UccTestMpi::UccTestMpi(int argc, char *argv[], ucc_thread_mode_t _tm,
ucc_context_config_release(ctx_config);
if (with_onesided) {
prev_env = getenv("UCC_TL_UCP_TUNE");
setenv("UCC_TL_UCP_TUNE", "alltoall:0-inf:@onesided", 1);
setenv("UCC_TL_UCP_TUNE", "alltoall:0-inf:@onesided#alltoallv:0-inf:@onesided", 1);
UCC_CHECK(ucc_lib_config_read(NULL, NULL, &lib_config));
UCC_CHECK(ucc_init(&lib_params, lib_config, &onesided_lib));
ucc_lib_config_release(lib_config);
Expand Down Expand Up @@ -590,7 +590,9 @@ void UccTestMpi::run_all_at_team(ucc_test_team_t &team,
continue;
}

if (c == UCC_COLL_TYPE_ALLTOALL && team.ctx != ctx) {
if ((c == UCC_COLL_TYPE_ALLTOALL ||
c == UCC_COLL_TYPE_ALLTOALLV) &&
team.ctx != ctx) {
/* onesided alltoall and alltoallv */
if (mt != UCC_MEMORY_TYPE_HOST) {
continue;
Expand Down

0 comments on commit 551f03c

Please sign in to comment.