diff --git a/src/components/tl/cuda/bcast/bcast_linear.c b/src/components/tl/cuda/bcast/bcast_linear.c
index d9958ba00d..7b98674316 100644
--- a/src/components/tl/cuda/bcast/bcast_linear.c
+++ b/src/components/tl/cuda/bcast/bcast_linear.c
@@ -67,6 +67,92 @@ static inline ucc_status_t ecopy(void *dst, void *src, size_t size,
     return ucc_ee_executor_task_post(exec, &exec_args, etask);
 }
 
+/*
+ * Root side of active-set barrier selection: scan the active-set barrier pool
+ * (team barriers max_concurrent .. 2 * max_concurrent - 1) and atomically claim
+ * the first one whose tag is UCC_TAG_FREE by swapping in the task key.
+ *
+ * On success task->bar and task->coll_id refer to the claimed barrier. The
+ * caller is responsible for advancing task->bcast_linear.stage.
+ *
+ * Returns UCC_OK when a barrier was claimed and initialized,
+ * UCC_ERR_NOT_FOUND when no barrier is free (retry on the next progress
+ * call) and UCC_ERR_NO_RESOURCE when barrier initialization failed.
+ */
+static inline ucc_status_t root_find_free_barrier(ucc_tl_cuda_task_t *task)
+{
+    ucc_tl_cuda_team_t *team = TASK_TEAM(task);
+    uint32_t max_concurrent = UCC_TL_CUDA_TEAM_LIB(team)->cfg.max_concurrent;
+    ucc_tl_cuda_shm_barrier_t *curr_bar;
+    uint32_t i;
+    ucc_status_t st;
+
+    for (i = 0; i < max_concurrent; ++i) {
+        curr_bar = UCC_TL_CUDA_TEAM_BARRIER(team, max_concurrent + i);
+        /* the compare-and-swap both detects a free barrier and tags it */
+        if (ucc_atomic_cswap64(&curr_bar->tag, UCC_TAG_FREE,
+                               task->bcast_linear.key) != UCC_TAG_FREE) {
+            continue;
+        }
+        ucc_debug("found free barrier: %u marked with tag: %ld", i,
+                  (long)task->bcast_linear.key);
+        st = ucc_tl_cuda_shm_barrier_init_root(
+            task->subset.map.ep_num, task->subset.myrank,
+            task->bcast_linear.root, curr_bar);
+        if (ucc_unlikely(st != UCC_OK)) {
+            ucc_error("failed to init root barrier");
+            /* hand the barrier back: nothing else on this path releases it */
+            curr_bar->tag = UCC_TAG_FREE;
+            return UCC_ERR_NO_RESOURCE;
+        }
+        task->bar = curr_bar;
+        task->coll_id = i + max_concurrent;
+        return UCC_OK;
+    }
+    /* every barrier is in use, try next time */
+    return UCC_ERR_NOT_FOUND;
+}
+
+/*
+ * Peer side of active-set barrier selection: scan the active-set barrier pool
+ * for the barrier whose tag equals the task key (set by the root in
+ * root_find_free_barrier()) and attach to it.
+ *
+ * On success task->bar and task->coll_id refer to that barrier. The caller is
+ * responsible for advancing task->bcast_linear.stage.
+ *
+ * Returns UCC_OK when the barrier was found and initialized,
+ * UCC_ERR_NOT_FOUND when no barrier carries the key yet (retry on the next
+ * progress call) and UCC_ERR_NO_RESOURCE when barrier initialization failed.
+ */
+static inline ucc_status_t peer_find_free_barrier(ucc_tl_cuda_task_t *task)
+{
+    ucc_tl_cuda_team_t *team = TASK_TEAM(task);
+    uint32_t max_concurrent = UCC_TL_CUDA_TEAM_LIB(team)->cfg.max_concurrent;
+    ucc_tl_cuda_shm_barrier_t *curr_bar;
+    uint32_t i;
+    ucc_status_t st;
+
+    for (i = 0; i < max_concurrent; ++i) {
+        curr_bar = UCC_TL_CUDA_TEAM_BARRIER(team, max_concurrent + i);
+        if (curr_bar->tag != task->bcast_linear.key) {
+            continue;
+        }
+        st = ucc_tl_cuda_shm_barrier_init_root(
+            task->subset.map.ep_num, task->subset.myrank,
+            task->bcast_linear.root, curr_bar);
+        if (ucc_unlikely(st != UCC_OK)) {
+            ucc_error("failed to init peer barrier");
+            return UCC_ERR_NO_RESOURCE;
+        }
+        task->bar = curr_bar;
+        task->coll_id = i + max_concurrent;
+        return UCC_OK;
+    }
+    /* no barrier is tagged with this key yet, try next time */
+    return UCC_ERR_NOT_FOUND;
+}
+
 ucc_status_t ucc_tl_cuda_bcast_linear_finalize(ucc_coll_task_t *coll_task)
 {
     ucc_tl_cuda_task_t *task = ucc_derived_of(coll_task, ucc_tl_cuda_task_t);
@@ -81,7 +167,6 @@ void ucc_tl_cuda_bcast_linear_progress(ucc_coll_task_t *coll_task)
     ucc_tl_cuda_task_t *task = ucc_derived_of(coll_task, ucc_tl_cuda_task_t);
     ucc_tl_cuda_team_t *team = TASK_TEAM(task);
     ucc_rank_t trank = UCC_TL_TEAM_RANK(team);
-    uint32_t max_concurrent = UCC_TL_CUDA_TEAM_LIB(team)->cfg.max_concurrent;
     size_t half_scratch_size = get_raw_scratch_size(team) / 2;
     ucc_rank_t tsize = (ucc_rank_t)task->subset.map.ep_num;
     size_t chunk_size =
@@ -96,9 +181,6 @@ void ucc_tl_cuda_bcast_linear_progress(ucc_coll_task_t *coll_task)
     void *sbuf, *dbuf;
     int i;
     ucc_rank_t peer;
-    ucc_tl_cuda_shm_barrier_t *curr_bar;
-    bool found;
-
 
     task->super.status = UCC_INPROGRESS;
 
@@ -110,61 +192,32 @@ void ucc_tl_cuda_bcast_linear_progress(ucc_coll_task_t *coll_task)
 
     switch (task->bcast_linear.stage) {
     case STAGE_INIT_BAR_ROOT:
-        if (UCC_COLL_ARGS_ACTIVE_SET(&TASK_ARGS(task))) {
-            found = false;
-            /* search first free barrier in active set pool */
-            for (i = 0; i < max_concurrent; ++i) {
-                curr_bar = UCC_TL_CUDA_TEAM_BARRIER(team, max_concurrent + i);
-                if (ucc_atomic_cswap64(&curr_bar->tag, UCC_TAG_FREE, task->bcast_linear.key) == UCC_TAG_FREE) {
-                    ucc_print("found free barrier: %d marked with tag: %ld", i, curr_bar->tag);
-                    // free
-                    task->bar = curr_bar;
-                    // set user specified tag to mark that this barrier is used by this task
-                    task->bar->tag = task->bcast_linear.key;
-                    st = ucc_tl_cuda_shm_barrier_init_root(task->subset.map.ep_num, task->subset.myrank, task->bcast_linear.root, task->bar);
-                    if (ucc_unlikely(st != UCC_OK)) {
-                        ucc_error("failed to init root barrier");
-                        task->super.status = UCC_ERR_NO_RESOURCE;
-                        return;
-                    }
-                    found = true;
-                    task->coll_id = i + max_concurrent;
-                    break;
-                }
-            }
-            if (!found)
-            {
-                // try next time
-                return;
-            }
+        st = root_find_free_barrier(task);
+        if (st == UCC_ERR_NOT_FOUND) {
+            // no free barriers found, try next time
+            return;
+        }
+        if (st == UCC_OK) {
             task->bcast_linear.stage = STAGE_SYNC;
-            break; // TODO: move all logic to separate functions
+            break;
+        } else {
+            task->super.status = UCC_ERR_NO_RESOURCE;
+            return;
         }
+        break;
     case STAGE_FIND_BAR_PEER:
-        found = false;
-        for (i = 0; i < max_concurrent; ++i) {
-            curr_bar = UCC_TL_CUDA_TEAM_BARRIER(team, max_concurrent + i);
-            if (curr_bar->tag == task->bcast_linear.key) {
-                task->bar = curr_bar;
-                // TODO: pass root rank???
-                st = ucc_tl_cuda_shm_barrier_init_root(
-                    task->subset.map.ep_num, task->subset.myrank,
-                    task->bcast_linear.root, task->bar);
-                if (ucc_unlikely(st != UCC_OK)) {
-                    ucc_error("failed to init peer barrier");
-                    task->super.status = UCC_ERR_NO_RESOURCE;
-                    return;
-                }
-                found = true;
-                task->coll_id = i + max_concurrent;
-                task->bcast_linear.stage = STAGE_SYNC;
-                break;
-            }
+        st = peer_find_free_barrier(task);
+        if (st == UCC_ERR_NOT_FOUND) {
+            // no free barriers found, wait for root
+            return;
         }
-        if (!found)
-        {
-            // try next time;
-            return;
+        if (st == UCC_OK) {
+            // barrier found, continue to next stages
+            task->bcast_linear.stage = STAGE_SYNC;
+            break;
+        } else {
+            task->super.status = UCC_ERR_NO_RESOURCE;
+            return;
         }
     case STAGE_SYNC:
         if (ucc_tl_cuda_get_sync_root(task, task->bcast_linear.root) != UCC_OK) {
@@ -319,8 +372,13 @@ ucc_status_t ucc_tl_cuda_bcast_linear_start(ucc_coll_task_t *coll_task)
     ucc_datatype_t dt = task->bcast_linear.dt;
     size_t half_scratch_size = get_raw_scratch_size(team) / 2;
 
-    task->bcast_linear.stage = UCC_TL_TEAM_RANK(team) == task->bcast_linear.root ? STAGE_INIT_BAR_ROOT : STAGE_FIND_BAR_PEER;
+    task->bcast_linear.stage = STAGE_SYNC;
+    // in case of active set bcast we need to do additional steps to find free barriers
+    if (UCC_COLL_ARGS_ACTIVE_SET(&TASK_ARGS(task))) {
+        task->bcast_linear.stage = UCC_TL_TEAM_RANK(team) == task->bcast_linear.root ? STAGE_INIT_BAR_ROOT : STAGE_FIND_BAR_PEER;
+    }
 
+
     task->bcast_linear.size = ucc_dt_size(dt) * args->src.info.count;
     task->bcast_linear.num_steps =
         ucc_div_round_up(task->bcast_linear.size, half_scratch_size);