Skip to content

Commit

Permalink
CORE: add progress throttle (#924)
Browse files Browse the repository at this point in the history
* CORE: add progress throttle

* TL/UCP: fix dbt progress

* REVIEW: fix review comments
  • Loading branch information
Sergei-Lebedev authored Feb 23, 2024
1 parent 31a75e5 commit c8fd6aa
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 14 deletions.
5 changes: 5 additions & 0 deletions src/components/tl/ucp/bcast/bcast_dbt.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ void ucc_tl_ucp_bcast_dbt_progress(ucc_coll_task_t *coll_task)
task->bcast_dbt.state = SEND_T1;

SEND_T1:
/* test_recv is needed to progress ucp_worker */
ucc_tl_ucp_test_recv(task);
if ((coll_root == rank) || (task->bcast_dbt.t1.recv > 0)) {
for (i = 0; i < 2; i++) {
if ((t1.children[i] != UCC_RANK_INVALID) &&
Expand All @@ -122,6 +124,8 @@ void ucc_tl_ucp_bcast_dbt_progress(ucc_coll_task_t *coll_task)
task->bcast_dbt.state = SEND_T2;

SEND_T2:
/* test_recv is needed to progress ucp_worker */
ucc_tl_ucp_test_recv(task);
if ((coll_root == rank) || (task->bcast_dbt.t2.recv > 0)) {
for (i = 0; i < 2; i++) {
if ((t2.children[i] != UCC_RANK_INVALID) &&
Expand Down Expand Up @@ -231,6 +235,7 @@ ucc_status_t ucc_tl_ucp_bcast_dbt_init(
task->super.post = ucc_tl_ucp_bcast_dbt_start;
task->super.progress = ucc_tl_ucp_bcast_dbt_progress;
task->super.finalize = ucc_tl_ucp_bcast_dbt_finalize;
task->n_polls = ucc_max(1, task->n_polls);
tl_team = TASK_TEAM(task);
rank = UCC_TL_TEAM_RANK(tl_team);
size = UCC_TL_TEAM_SIZE(tl_team);
Expand Down
4 changes: 4 additions & 0 deletions src/components/tl/ucp/reduce/reduce_dbt.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ void ucc_tl_ucp_reduce_dbt_progress(ucc_coll_task_t *coll_task)
task->reduce_dbt.state = REDUCE;

REDUCE:
/* test_recv is needed to progress ucp_worker */
ucc_tl_ucp_test_recv(task);
for (i = 0; i < 2; i++) {
if (trees[i].recv == trees[i].n_children &&
!task->reduce_dbt.reduction_comp[i]) {
Expand Down Expand Up @@ -216,6 +218,8 @@ void ucc_tl_ucp_reduce_dbt_progress(ucc_coll_task_t *coll_task)
}

TEST_ROOT:
/* test_recv is needed to progress ucp_worker */
ucc_tl_ucp_test_recv(task);
if (UCC_INPROGRESS == ucc_tl_ucp_test_send(task) ||
task->reduce_dbt.reduction_comp[0] != trees[0].recv ||
task->reduce_dbt.reduction_comp[1] != trees[1].recv) {
Expand Down
33 changes: 26 additions & 7 deletions src/core/ucc_context.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand Down Expand Up @@ -45,6 +45,11 @@ static ucc_config_field_t ucc_context_config_table[] = {
"is configured with OOB (global mode). 0 - disable, 1 - try, 2 - force.",
ucc_offsetof(ucc_context_config_t, internal_oob), UCC_CONFIG_TYPE_UINT},

{"THROTTLE_PROGRESS", "1000",
"Throttle UCC progress to every <n>th invocation",
ucc_offsetof(ucc_context_config_t, throttle_progress),
UCC_CONFIG_TYPE_UINT},

{NULL}};
UCC_CONFIG_REGISTER_TABLE(ucc_context_config_table, "UCC context", NULL,
ucc_context_config_t, &ucc_config_global_list);
Expand Down Expand Up @@ -614,9 +619,10 @@ ucc_status_t ucc_context_create_proc_info(ucc_lib_h lib,
status = UCC_ERR_NO_MEMORY;
goto error;
}
ctx->rank = UCC_RANK_MAX;
ctx->lib = lib;
ctx->ids.pool_size = config->team_ids_pool_size;
ctx->throttle_progress = config->throttle_progress;
ctx->rank = UCC_RANK_MAX;
ctx->lib = lib;
ctx->ids.pool_size = config->team_ids_pool_size;
ucc_list_head_init(&ctx->progress_list);
ucc_copy_context_params(&ctx->params, params);
ucc_copy_context_params(&b_params.params, params);
Expand Down Expand Up @@ -957,12 +963,25 @@ ucc_status_t ucc_context_progress_deregister(ucc_context_t *ctx,

ucc_status_t ucc_context_progress(ucc_context_h context)
{
static int call_num = 0;
ucc_status_t status;
ucc_context_progress_entry_t *entry;
/* progress registered progress fns */
ucc_list_for_each(entry, &context->progress_list, list_elem) {
entry->fn(entry->arg);
int is_empty;

is_empty = ucc_progress_queue_is_empty(context->pq);
if (ucc_likely(is_empty)) {
call_num--;
if (ucc_likely(call_num >= 0)) {
return UCC_OK;
}
/* progress registered progress fns */
ucc_list_for_each(entry, &context->progress_list, list_elem) {
entry->fn(entry->arg);
}
call_num = context->throttle_progress;
return UCC_OK;
}

/* the fn below returns int - number of completed tasks.
TODO : do we need to handle it ? Maybe return to user
as int as well? */
Expand Down
4 changes: 3 additions & 1 deletion src/core/ucc_context.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand Down Expand Up @@ -77,6 +77,7 @@ typedef struct ucc_context {
ucc_context_topo_t *topo;
uint64_t cl_flags;
ucc_tl_team_t *service_team;
int32_t throttle_progress;
} ucc_context_t;

typedef struct ucc_context_config {
Expand All @@ -90,6 +91,7 @@ typedef struct ucc_context_config {
uint32_t estimated_num_ppn;
uint32_t lock_free_progress_q;
uint32_t internal_oob;
uint32_t throttle_progress;
} ucc_context_config_t;

/* Internal function for context creation that takes explicit
Expand Down
9 changes: 8 additions & 1 deletion src/core/ucc_progress_queue.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

Expand All @@ -14,6 +15,7 @@ struct ucc_progress_queue {
void (*enqueue)(ucc_progress_queue_t *pq, ucc_coll_task_t *task);
void (*dequeue)(ucc_progress_queue_t *pq, ucc_coll_task_t **task);
int (*progress)(ucc_progress_queue_t *pq);
int (*is_empty)(ucc_progress_queue_t *pq);
void (*finalize)(ucc_progress_queue_t *pq);
};

Expand Down Expand Up @@ -46,6 +48,11 @@ static inline int ucc_progress_queue(ucc_progress_queue_t *pq)
return pq->progress(pq);
}

static inline int ucc_progress_queue_is_empty(ucc_progress_queue_t *pq)
{
return pq->is_empty(pq);
}

void ucc_progress_queue_finalize(ucc_progress_queue_t *pq);

#endif
25 changes: 21 additions & 4 deletions src/core/ucc_progress_queue_mt.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

Expand All @@ -25,7 +26,7 @@ typedef struct ucc_pq_mt_locked {
} ucc_pq_mt_locked_t;

static void ucc_pq_locked_mt_enqueue(ucc_progress_queue_t *pq,
ucc_coll_task_t * task)
ucc_coll_task_t *task)
{
ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t);

Expand All @@ -42,7 +43,7 @@ static void ucc_pq_mt_enqueue(ucc_progress_queue_t *pq, ucc_coll_task_t *task)
}

static void ucc_pq_locked_mt_dequeue(ucc_progress_queue_t *pq,
ucc_coll_task_t ** popped_task)
ucc_coll_task_t **popped_task)
{
ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t);
*popped_task = NULL;
Expand All @@ -56,7 +57,7 @@ static void ucc_pq_locked_mt_dequeue(ucc_progress_queue_t *pq,
}

static void ucc_pq_mt_dequeue(ucc_progress_queue_t *pq,
ucc_coll_task_t ** popped_task)
ucc_coll_task_t **popped_task)
{
ucc_pq_mt_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_t);
ucc_lf_queue_elem_t *elem = ucc_lf_queue_dequeue(&pq_mt->lf_queue, 1);
Expand Down Expand Up @@ -100,6 +101,20 @@ static int ucc_pq_mt_progress(ucc_progress_queue_t *pq)
return n_progressed;
}

static int ucc_pq_locked_mt_is_empty(ucc_progress_queue_t *pq)
{
ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t);

/* this function should not be very accurate for the purpose of progress throttling */
return ucc_list_is_empty(&pq_mt->queue);
}

static int ucc_pq_mt_is_empty(ucc_progress_queue_t *pq) //NOLINT: pq is unused
{
/* lock free progress queue never use throttling */
return 0;
}

static void ucc_pq_locked_mt_finalize(ucc_progress_queue_t *pq)
{
ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t);
Expand Down Expand Up @@ -128,6 +143,7 @@ ucc_status_t ucc_pq_mt_init(ucc_progress_queue_t **pq,
pq_mt->super.dequeue = ucc_pq_mt_dequeue;
pq_mt->super.progress = ucc_pq_mt_progress;
pq_mt->super.finalize = ucc_pq_mt_finalize;
pq_mt->super.is_empty = ucc_pq_mt_is_empty;
*pq = &pq_mt->super;
} else {
ucc_pq_mt_locked_t *pq_mt = ucc_malloc(sizeof(*pq_mt), "pq_mt");
Expand All @@ -141,6 +157,7 @@ ucc_status_t ucc_pq_mt_init(ucc_progress_queue_t **pq,
pq_mt->super.dequeue = ucc_pq_locked_mt_dequeue;
pq_mt->super.progress = ucc_pq_mt_progress;
pq_mt->super.finalize = ucc_pq_locked_mt_finalize;
pq_mt->super.is_empty = ucc_pq_locked_mt_is_empty;
*pq = &pq_mt->super;
}
return UCC_OK;
Expand Down
12 changes: 11 additions & 1 deletion src/core/ucc_progress_queue_st.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* Copyright (c) 2020, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

Expand Down Expand Up @@ -67,6 +68,13 @@ static void ucc_pq_st_finalize(ucc_progress_queue_t *pq)
ucc_free(pq_st);
}

static int ucc_pq_st_is_empty(ucc_progress_queue_t *pq)
{
ucc_pq_st_t *pq_st = ucc_derived_of(pq, ucc_pq_st_t);

return ucc_list_is_empty(&pq_st->list);
}

ucc_status_t ucc_pq_st_init(ucc_progress_queue_t **pq)
{
ucc_pq_st_t *pq_st = ucc_malloc(sizeof(*pq_st), "pq_st");
Expand All @@ -79,6 +87,8 @@ ucc_status_t ucc_pq_st_init(ucc_progress_queue_t **pq)
pq_st->super.dequeue = NULL;
pq_st->super.progress = ucc_pq_st_progress;
pq_st->super.finalize = ucc_pq_st_finalize;
pq_st->super.is_empty = ucc_pq_st_is_empty;

*pq = &pq_st->super;
return UCC_OK;
}

0 comments on commit c8fd6aa

Please sign in to comment.