Skip to content

Commit

Permalink
TL/CUDA: refactor bar init
Browse files Browse the repository at this point in the history
  • Loading branch information
ikryukov committed Nov 18, 2024
1 parent 8d60d00 commit fe44c02
Showing 1 changed file with 98 additions and 56 deletions.
154 changes: 98 additions & 56 deletions src/components/tl/cuda/bcast/bcast_linear.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,76 @@ static inline ucc_status_t ecopy(void *dst, void *src, size_t size,
return ucc_ee_executor_task_post(exec, &exec_args, etask);
}

static inline ucc_status_t root_find_free_barrier(ucc_tl_cuda_task_t *task)
{
ucc_tl_cuda_team_t *team = TASK_TEAM(task);
uint32_t max_concurrent = UCC_TL_CUDA_TEAM_LIB(team)->cfg.max_concurrent;
bool found = false;
ucc_tl_cuda_shm_barrier_t *curr_bar;
int i;
ucc_status_t st;

/* search first free barrier in active set pool */
for (i = 0; i < max_concurrent; ++i) {
curr_bar = UCC_TL_CUDA_TEAM_BARRIER(team, max_concurrent + i);
// try to set user specified tag to mark that this barrier is used by this task
if (ucc_atomic_cswap64(&curr_bar->tag, UCC_TAG_FREE,
task->bcast_linear.key) == UCC_TAG_FREE) {
ucc_print("found free barrier: %d marked with tag: %ld", i,
curr_bar->tag);
// free
task->bar = curr_bar;
st = ucc_tl_cuda_shm_barrier_init_root(
task->subset.map.ep_num, task->subset.myrank,
task->bcast_linear.root, task->bar);
if (ucc_unlikely(st != UCC_OK)) {
ucc_error("failed to init root barrier");
return UCC_ERR_NO_RESOURCE;
}
found = true;
task->coll_id = i + max_concurrent;
break;
}
}
if (!found) {
// try next time
return UCC_ERR_NOT_FOUND;
}
return UCC_OK;
}

static inline ucc_status_t peer_find_free_barrier(ucc_tl_cuda_task_t *task)
{
ucc_tl_cuda_team_t *team = TASK_TEAM(task);
uint32_t max_concurrent = UCC_TL_CUDA_TEAM_LIB(team)->cfg.max_concurrent;
bool found = false;
ucc_tl_cuda_shm_barrier_t *curr_bar;
int i;
ucc_status_t st;
for (i = 0; i < max_concurrent; ++i) {
curr_bar = UCC_TL_CUDA_TEAM_BARRIER(team, max_concurrent + i);
if (curr_bar->tag == task->bcast_linear.key) {
task->bar = curr_bar;
st = ucc_tl_cuda_shm_barrier_init_root(
task->subset.map.ep_num, task->subset.myrank,
task->bcast_linear.root, task->bar);
if (ucc_unlikely(st != UCC_OK)) {
ucc_error("failed to init peer barrier");
return UCC_ERR_NO_RESOURCE;
}
found = true;
task->coll_id = i + max_concurrent;
task->bcast_linear.stage = STAGE_SYNC;
break;
}
}
if (!found) {
// try next time
return UCC_ERR_NOT_FOUND;
}
return UCC_OK;
}

ucc_status_t ucc_tl_cuda_bcast_linear_finalize(ucc_coll_task_t *coll_task)
{
ucc_tl_cuda_task_t *task = ucc_derived_of(coll_task, ucc_tl_cuda_task_t);
Expand All @@ -81,7 +151,6 @@ void ucc_tl_cuda_bcast_linear_progress(ucc_coll_task_t *coll_task)
ucc_tl_cuda_task_t *task = ucc_derived_of(coll_task, ucc_tl_cuda_task_t);
ucc_tl_cuda_team_t *team = TASK_TEAM(task);
ucc_rank_t trank = UCC_TL_TEAM_RANK(team);
uint32_t max_concurrent = UCC_TL_CUDA_TEAM_LIB(team)->cfg.max_concurrent;
size_t half_scratch_size = get_raw_scratch_size(team) / 2;
ucc_rank_t tsize = (ucc_rank_t)task->subset.map.ep_num;
size_t chunk_size =
Expand All @@ -96,9 +165,6 @@ void ucc_tl_cuda_bcast_linear_progress(ucc_coll_task_t *coll_task)
void *sbuf, *dbuf;
int i;
ucc_rank_t peer;
ucc_tl_cuda_shm_barrier_t *curr_bar;
bool found;


task->super.status = UCC_INPROGRESS;

Expand All @@ -110,61 +176,32 @@ void ucc_tl_cuda_bcast_linear_progress(ucc_coll_task_t *coll_task)

switch (task->bcast_linear.stage) {
case STAGE_INIT_BAR_ROOT:
if (UCC_COLL_ARGS_ACTIVE_SET(&TASK_ARGS(task))) {
found = false;
/* search first free barrier in active set pool */
for (i = 0; i < max_concurrent; ++i) {
curr_bar = UCC_TL_CUDA_TEAM_BARRIER(team, max_concurrent + i);
if (ucc_atomic_cswap64(&curr_bar->tag, UCC_TAG_FREE, task->bcast_linear.key) == UCC_TAG_FREE) {
ucc_print("found free barrier: %d marked with tag: %ld", i, curr_bar->tag);
// free
task->bar = curr_bar;
// set user specified tag to mark that this barrier is used by this task
task->bar->tag = task->bcast_linear.key;
st = ucc_tl_cuda_shm_barrier_init_root(task->subset.map.ep_num, task->subset.myrank, task->bcast_linear.root, task->bar);
if (ucc_unlikely(st != UCC_OK)) {
ucc_error("failed to init root barrier");
task->super.status = UCC_ERR_NO_RESOURCE;
return;
}
found = true;
task->coll_id = i + max_concurrent;
break;
}
}
if (!found)
{
// try next time
return;
}
st = root_find_free_barrier(task);
if (st == UCC_ERR_NOT_FOUND) {
// no free barriers found, try next time
return;
}
if (st == UCC_OK) {
task->bcast_linear.stage = STAGE_SYNC;
break; // TODO: move all logic to separate functions
break;
} else {
task->super.status = UCC_ERR_NO_RESOURCE;
return;
}
break;
case STAGE_FIND_BAR_PEER:
found = false;
for (i = 0; i < max_concurrent; ++i) {
curr_bar = UCC_TL_CUDA_TEAM_BARRIER(team, max_concurrent + i);
if (curr_bar->tag == task->bcast_linear.key) {
task->bar = curr_bar;
// TODO: pass root rank???
st = ucc_tl_cuda_shm_barrier_init_root(
task->subset.map.ep_num, task->subset.myrank,
task->bcast_linear.root, task->bar);
if (ucc_unlikely(st != UCC_OK)) {
ucc_error("failed to init peer barrier");
task->super.status = UCC_ERR_NO_RESOURCE;
return;
}
found = true;
task->coll_id = i + max_concurrent;
task->bcast_linear.stage = STAGE_SYNC;
break;
}
st = peer_find_free_barrier(task);
if (st == UCC_ERR_NOT_FOUND) {
// no free barriers found, wait for root
return;
}
if (!found)
{
// try next time;
return;
if (st == UCC_OK) {
// barrier found, continue to next stages
task->bcast_linear.stage = STAGE_SYNC;
break;
} else {
task->super.status = UCC_ERR_NO_RESOURCE;
return;
}
case STAGE_SYNC:
if (ucc_tl_cuda_get_sync_root(task, task->bcast_linear.root) != UCC_OK) {
Expand Down Expand Up @@ -319,8 +356,13 @@ ucc_status_t ucc_tl_cuda_bcast_linear_start(ucc_coll_task_t *coll_task)
ucc_datatype_t dt = task->bcast_linear.dt;
size_t half_scratch_size = get_raw_scratch_size(team) / 2;

task->bcast_linear.stage = UCC_TL_TEAM_RANK(team) == task->bcast_linear.root ? STAGE_INIT_BAR_ROOT : STAGE_FIND_BAR_PEER;
task->bcast_linear.stage = STAGE_SYNC;

// in case of active set bcast we need to do additional steps to find free barriers
if (UCC_COLL_ARGS_ACTIVE_SET(&TASK_ARGS(task))) {
task->bcast_linear.stage = UCC_TL_TEAM_RANK(team) == task->bcast_linear.root ? STAGE_INIT_BAR_ROOT : STAGE_FIND_BAR_PEER;
}

task->bcast_linear.size = ucc_dt_size(dt) * args->src.info.count;
task->bcast_linear.num_steps =
ucc_div_round_up(task->bcast_linear.size, half_scratch_size);
Expand Down

0 comments on commit fe44c02

Please sign in to comment.