Skip to content

Commit

Permalink
REVIEW: fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergei-Lebedev committed Nov 15, 2023
1 parent 03d0052 commit 11b9b9b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 58 deletions.
8 changes: 4 additions & 4 deletions src/components/tl/ucp/alltoall/alltoall.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ ucc_status_t ucc_tl_ucp_alltoall_init(ucc_tl_ucp_task_t *task)
}

ucc_status_t ucc_tl_ucp_alltoall_pairwise_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h)
ucc_base_team_t *team,
ucc_coll_task_t **task_h)
{
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_tl_ucp_task_t *task;
Expand All @@ -72,8 +72,8 @@ ucc_status_t ucc_tl_ucp_alltoall_pairwise_init(ucc_base_coll_args_t *coll_args,
}

ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h)
ucc_base_team_t *team,
ucc_coll_task_t **task_h)
{
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_tl_ucp_task_t *task;
Expand Down
95 changes: 41 additions & 54 deletions src/components/tl/ucp/alltoall/alltoall_bruck.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
#include "coll_patterns/bruck_alltoall.h"

#define RADIX 2
#define SAVE_STATE(_phase) \
do { \
task->alltoall_bruck.phase = _phase; \
} while (0)

enum {
PHASE_MERGE,
Expand Down Expand Up @@ -114,32 +118,23 @@ void ucc_tl_ucp_alltoall_bruck_progress(ucc_coll_task_t *coll_task)
const ucc_rank_t nrecv_segs = tsize / 2;
const size_t seg_size = ucc_dt_size(args->src.info.datatype) *
args->src.info.count / tsize;
void *data;
ucc_memory_type_t smtype = args->src.info.mem_type;
ucc_memory_type_t dmtype = args->dst.info.mem_type;
ucc_rank_t sendto, recvfrom, step, index;
void *data;
ucc_rank_t level, snd_count;
int send_buffer_index;
ucc_status_t st;
ucc_status_t status;
ucc_ee_executor_t *exec;
ucc_ee_executor_task_args_t eargs;

if (task->alltoall_bruck.etask != NULL) {
st = ucc_ee_executor_task_test(task->alltoall_bruck.etask);
if (st == UCC_OK) {
ucc_ee_executor_task_finalize(task->alltoall_bruck.etask);
task->alltoall_bruck.etask = NULL;
} else {
if (ucc_unlikely(st < 0)) {
task->super.status = st;
}
return;
}
}

if (task->alltoall_bruck.phase == PHASE_SENDRECV) {
EXEC_TASK_TEST(task->alltoall_bruck.phase,
"failed to copy data from user buffer to scratch",
task->alltoall_bruck.etask);
switch (task->alltoall_bruck.phase) {
case PHASE_SENDRECV:
goto ALLTOALL_BRUCK_PHASE_SENDRECV;
} else if (task->alltoall_bruck.phase == PHASE_BCOPY) {
case PHASE_BCOPY:
task->super.status = UCC_OK;
goto out;
}
Expand All @@ -161,11 +156,11 @@ void ucc_tl_ucp_alltoall_bruck_progress(ucc_coll_task_t *coll_task)
} else {
data = PTR_OFFSET(scratch, send_buffer_index * seg_size);
}
st = ucc_mc_memcpy(PTR_OFFSET(mergebuf, seg_size * snd_count),
data, seg_size, UCC_MEMORY_TYPE_HOST,
UCC_MEMORY_TYPE_HOST);
if (ucc_unlikely(UCC_OK != st)) {
task->super.status = st;
status = ucc_mc_memcpy(PTR_OFFSET(mergebuf, seg_size * snd_count),
data, seg_size, UCC_MEMORY_TYPE_HOST,
UCC_MEMORY_TYPE_HOST);
if (ucc_unlikely(UCC_OK != status)) {
task->super.status = status;
return;
}
snd_count++;
Expand All @@ -181,56 +176,48 @@ void ucc_tl_ucp_alltoall_bruck_progress(ucc_coll_task_t *coll_task)
task, out);
ALLTOALL_BRUCK_PHASE_SENDRECV:
if (ucc_tl_ucp_test(task) == UCC_INPROGRESS) {
task->alltoall_bruck.phase = PHASE_SENDRECV;
SAVE_STATE(PHASE_SENDRECV);
return;
}
task->alltoall_bruck.iteration++;
step = 1 << (task->alltoall_bruck.iteration - 1);
}

st = ucc_mc_memcpy(PTR_OFFSET(task->alltoall_bruck.dst, trank * seg_size),
PTR_OFFSET(task->alltoall_bruck.src, trank * seg_size),
seg_size, UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_HOST);
if (ucc_unlikely(st != UCC_OK)) {
task->super.status = st;
status = ucc_mc_memcpy(PTR_OFFSET(task->alltoall_bruck.dst, trank * seg_size),
PTR_OFFSET(task->alltoall_bruck.src, trank * seg_size),
seg_size, UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_HOST);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return;
}
st = ucc_tl_ucp_alltoall_bruck_backward_rotation(task->alltoall_bruck.dst,
scratch, trank, tsize,
seg_size);
if (ucc_unlikely(st != UCC_OK)) {
task->super.status = st;
status = ucc_tl_ucp_alltoall_bruck_backward_rotation(mergebuf, scratch,
trank, tsize,
seg_size);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return;
}

if (smtype != UCC_MEMORY_TYPE_HOST || dmtype != UCC_MEMORY_TYPE_HOST) {
task->alltoall_bruck.phase = PHASE_BCOPY;
st = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(st != UCC_OK)) {
task->super.status = st;
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return;
}

eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.src = task->alltoall_bruck.dst;
eargs.copy.src = mergebuf;
eargs.copy.dst = args->dst.info.buffer;
eargs.copy.len = seg_size * tsize;
st = ucc_ee_executor_task_post(exec, &eargs,
&task->alltoall_bruck.etask);
if (ucc_unlikely(st != UCC_OK)) {
task->super.status = st;
return;
}
st = ucc_ee_executor_task_test(task->alltoall_bruck.etask);
if (st == UCC_OK) {
ucc_ee_executor_task_finalize(task->alltoall_bruck.etask);
task->alltoall_bruck.etask = NULL;
} else {
if (ucc_unlikely(st < 0)) {
task->super.status = st;
}
status = ucc_ee_executor_task_post(exec, &eargs,
&task->alltoall_bruck.etask);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return;
}
EXEC_TASK_TEST(PHASE_BCOPY, "failed to copy data to user buffer",
task->alltoall_bruck.etask);
}

task->super.status = UCC_OK;
Expand Down Expand Up @@ -285,7 +272,7 @@ ucc_status_t ucc_tl_ucp_alltoall_bruck_init(ucc_base_coll_args_t *coll_args,
size_t ssize = ucc_dt_size(args->src.info.datatype) *
args->src.info.count;
size_t seg_size = ssize / tsize;
int bcopy = 0;
int is_bcopy = 0;
size_t scratch_size;
ucc_tl_ucp_task_t *task;
ucc_status_t status;
Expand All @@ -300,7 +287,7 @@ ucc_status_t ucc_tl_ucp_alltoall_bruck_init(ucc_base_coll_args_t *coll_args,
scratch_size = lognum(tsize) * ucc_div_round_up(tsize, 2) * seg_size;
if ((coll_args->args.src.info.mem_type != UCC_MEMORY_TYPE_HOST) ||
(coll_args->args.dst.info.mem_type != UCC_MEMORY_TYPE_HOST)) {
bcopy = 1;
is_bcopy = 1;
scratch_size += 2 * ssize;
}

Expand All @@ -312,7 +299,7 @@ ucc_status_t ucc_tl_ucp_alltoall_bruck_init(ucc_base_coll_args_t *coll_args,
return status;
}

if (bcopy) {
if (is_bcopy) {
task->alltoall_bruck.src =
PTR_OFFSET(task->alltoall_bruck.scratch_mc_header->addr,
lognum(tsize) * ucc_div_round_up(tsize, 2) * seg_size);
Expand Down
1 change: 1 addition & 0 deletions src/components/tl/ucp/tl_ucp_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ void ucc_tl_ucp_team_default_score_str_free(
return; \
} \
ucc_ee_executor_task_finalize(_etask); \
_etask = NULL; \
if (ucc_unlikely(status < 0)) { \
tl_error(UCC_TASK_LIB(task), _errmsg); \
task->super.status = status; \
Expand Down

0 comments on commit 11b9b9b

Please sign in to comment.