From c8fd6aa4491fa1294d3b40650e0d1e36ef680f41 Mon Sep 17 00:00:00 2001 From: Sergey Lebedev Date: Fri, 23 Feb 2024 08:08:39 +0100 Subject: [PATCH] CORE: add progress throttle (#924) * CORE: add progress throttle * TL/UCP: fix dbt progress * REVIEW: fix review comments --- src/components/tl/ucp/bcast/bcast_dbt.c | 5 ++++ src/components/tl/ucp/reduce/reduce_dbt.c | 4 +++ src/core/ucc_context.c | 33 ++++++++++++++++++----- src/core/ucc_context.h | 4 ++- src/core/ucc_progress_queue.h | 9 ++++++- src/core/ucc_progress_queue_mt.c | 25 ++++++++++++++--- src/core/ucc_progress_queue_st.c | 12 ++++++++- 7 files changed, 78 insertions(+), 14 deletions(-) diff --git a/src/components/tl/ucp/bcast/bcast_dbt.c b/src/components/tl/ucp/bcast/bcast_dbt.c index 4e1f77594f..36394edc57 100644 --- a/src/components/tl/ucp/bcast/bcast_dbt.c +++ b/src/components/tl/ucp/bcast/bcast_dbt.c @@ -107,6 +107,8 @@ void ucc_tl_ucp_bcast_dbt_progress(ucc_coll_task_t *coll_task) task->bcast_dbt.state = SEND_T1; SEND_T1: +/* test_recv is needed to progress ucp_worker */ + ucc_tl_ucp_test_recv(task); if ((coll_root == rank) || (task->bcast_dbt.t1.recv > 0)) { for (i = 0; i < 2; i++) { if ((t1.children[i] != UCC_RANK_INVALID) && @@ -122,6 +124,8 @@ void ucc_tl_ucp_bcast_dbt_progress(ucc_coll_task_t *coll_task) task->bcast_dbt.state = SEND_T2; SEND_T2: +/* test_recv is needed to progress ucp_worker */ + ucc_tl_ucp_test_recv(task); if ((coll_root == rank) || (task->bcast_dbt.t2.recv > 0)) { for (i = 0; i < 2; i++) { if ((t2.children[i] != UCC_RANK_INVALID) && @@ -231,6 +235,7 @@ ucc_status_t ucc_tl_ucp_bcast_dbt_init( task->super.post = ucc_tl_ucp_bcast_dbt_start; task->super.progress = ucc_tl_ucp_bcast_dbt_progress; task->super.finalize = ucc_tl_ucp_bcast_dbt_finalize; + task->n_polls = ucc_max(1, task->n_polls); tl_team = TASK_TEAM(task); rank = UCC_TL_TEAM_RANK(tl_team); size = UCC_TL_TEAM_SIZE(tl_team); diff --git a/src/components/tl/ucp/reduce/reduce_dbt.c b/src/components/tl/ucp/reduce/reduce_dbt.c 
index 08e8774974..08b8a7aed5 100644 --- a/src/components/tl/ucp/reduce/reduce_dbt.c +++ b/src/components/tl/ucp/reduce/reduce_dbt.c @@ -155,6 +155,8 @@ void ucc_tl_ucp_reduce_dbt_progress(ucc_coll_task_t *coll_task) task->reduce_dbt.state = REDUCE; REDUCE: +/* test_recv is needed to progress ucp_worker */ + ucc_tl_ucp_test_recv(task); for (i = 0; i < 2; i++) { if (trees[i].recv == trees[i].n_children && !task->reduce_dbt.reduction_comp[i]) { @@ -216,6 +218,8 @@ void ucc_tl_ucp_reduce_dbt_progress(ucc_coll_task_t *coll_task) } TEST_ROOT: +/* test_recv is needed to progress ucp_worker */ + ucc_tl_ucp_test_recv(task); if (UCC_INPROGRESS == ucc_tl_ucp_test_send(task) || task->reduce_dbt.reduction_comp[0] != trees[0].recv || task->reduce_dbt.reduction_comp[1] != trees[1].recv) { diff --git a/src/core/ucc_context.c b/src/core/ucc_context.c index 7c5fd3c3ca..4d4f90f160 100644 --- a/src/core/ucc_context.c +++ b/src/core/ucc_context.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -45,6 +45,11 @@ static ucc_config_field_t ucc_context_config_table[] = { "is configured with OOB (global mode). 
0 - disable, 1 - try, 2 - force.", ucc_offsetof(ucc_context_config_t, internal_oob), UCC_CONFIG_TYPE_UINT}, + {"THROTTLE_PROGRESS", "1000", + "Throttle UCC progress to every <n>th invocation", + ucc_offsetof(ucc_context_config_t, throttle_progress), + UCC_CONFIG_TYPE_UINT}, + {NULL}}; UCC_CONFIG_REGISTER_TABLE(ucc_context_config_table, "UCC context", NULL, ucc_context_config_t, &ucc_config_global_list); @@ -614,9 +619,10 @@ ucc_status_t ucc_context_create_proc_info(ucc_lib_h lib, status = UCC_ERR_NO_MEMORY; goto error; } - ctx->rank = UCC_RANK_MAX; - ctx->lib = lib; - ctx->ids.pool_size = config->team_ids_pool_size; + ctx->throttle_progress = config->throttle_progress; + ctx->rank = UCC_RANK_MAX; + ctx->lib = lib; + ctx->ids.pool_size = config->team_ids_pool_size; ucc_list_head_init(&ctx->progress_list); ucc_copy_context_params(&ctx->params, params); ucc_copy_context_params(&b_params.params, params); @@ -957,12 +963,25 @@ ucc_status_t ucc_context_progress_deregister(ucc_context_t *ctx, ucc_status_t ucc_context_progress(ucc_context_h context) { + static int call_num = 0; ucc_status_t status; ucc_context_progress_entry_t *entry; - /* progress registered progress fns */ - ucc_list_for_each(entry, &context->progress_list, list_elem) { - entry->fn(entry->arg); + int is_empty; + + is_empty = ucc_progress_queue_is_empty(context->pq); + if (ucc_likely(is_empty)) { + call_num--; + if (ucc_likely(call_num >= 0)) { + return UCC_OK; + } + /* progress registered progress fns */ + ucc_list_for_each(entry, &context->progress_list, list_elem) { + entry->fn(entry->arg); + } + call_num = context->throttle_progress; + return UCC_OK; } + /* the fn below returns int - number of completed tasks. TODO : do we need to handle it ? Maybe return to user as int as well? 
*/ diff --git a/src/core/ucc_context.h b/src/core/ucc_context.h index a95dd2b920..3944d5675b 100644 --- a/src/core/ucc_context.h +++ b/src/core/ucc_context.h @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -77,6 +77,7 @@ typedef struct ucc_context { ucc_context_topo_t *topo; uint64_t cl_flags; ucc_tl_team_t *service_team; + int32_t throttle_progress; } ucc_context_t; typedef struct ucc_context_config { @@ -90,6 +91,7 @@ typedef struct ucc_context_config { uint32_t estimated_num_ppn; uint32_t lock_free_progress_q; uint32_t internal_oob; + uint32_t throttle_progress; } ucc_context_config_t; /* Internal function for context creation that takes explicit diff --git a/src/core/ucc_progress_queue.h b/src/core/ucc_progress_queue.h index ba3d20b297..d4ede0c8c3 100644 --- a/src/core/ucc_progress_queue.h +++ b/src/core/ucc_progress_queue.h @@ -1,5 +1,6 @@ /** - * Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * * See file LICENSE for terms. 
*/ @@ -14,6 +15,7 @@ struct ucc_progress_queue { void (*enqueue)(ucc_progress_queue_t *pq, ucc_coll_task_t *task); void (*dequeue)(ucc_progress_queue_t *pq, ucc_coll_task_t **task); int (*progress)(ucc_progress_queue_t *pq); + int (*is_empty)(ucc_progress_queue_t *pq); void (*finalize)(ucc_progress_queue_t *pq); }; @@ -46,6 +48,11 @@ static inline int ucc_progress_queue(ucc_progress_queue_t *pq) return pq->progress(pq); } +static inline int ucc_progress_queue_is_empty(ucc_progress_queue_t *pq) +{ + return pq->is_empty(pq); +} + void ucc_progress_queue_finalize(ucc_progress_queue_t *pq); #endif diff --git a/src/core/ucc_progress_queue_mt.c b/src/core/ucc_progress_queue_mt.c index 466628e27c..7da2171f03 100644 --- a/src/core/ucc_progress_queue_mt.c +++ b/src/core/ucc_progress_queue_mt.c @@ -1,5 +1,6 @@ /** - * Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * * See file LICENSE for terms. 
*/ @@ -25,7 +26,7 @@ typedef struct ucc_pq_mt_locked { } ucc_pq_mt_locked_t; static void ucc_pq_locked_mt_enqueue(ucc_progress_queue_t *pq, - ucc_coll_task_t * task) + ucc_coll_task_t *task) { ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t); @@ -42,7 +43,7 @@ static void ucc_pq_mt_enqueue(ucc_progress_queue_t *pq, ucc_coll_task_t *task) } static void ucc_pq_locked_mt_dequeue(ucc_progress_queue_t *pq, - ucc_coll_task_t ** popped_task) + ucc_coll_task_t **popped_task) { ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t); *popped_task = NULL; @@ -56,7 +57,7 @@ static void ucc_pq_locked_mt_dequeue(ucc_progress_queue_t *pq, } static void ucc_pq_mt_dequeue(ucc_progress_queue_t *pq, - ucc_coll_task_t ** popped_task) + ucc_coll_task_t **popped_task) { ucc_pq_mt_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_t); ucc_lf_queue_elem_t *elem = ucc_lf_queue_dequeue(&pq_mt->lf_queue, 1); @@ -100,6 +101,20 @@ static int ucc_pq_mt_progress(ucc_progress_queue_t *pq) return n_progressed; } +static int ucc_pq_locked_mt_is_empty(ucc_progress_queue_t *pq) +{ + ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t); + + /* this function does not need to be very accurate for the purpose of progress throttling */ + return ucc_list_is_empty(&pq_mt->queue); +} + +static int ucc_pq_mt_is_empty(ucc_progress_queue_t *pq) //NOLINT: pq is unused +{ + /* lock-free progress queue never uses throttling */ + return 0; +} + static void ucc_pq_locked_mt_finalize(ucc_progress_queue_t *pq) { ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t); @@ -128,6 +143,7 @@ ucc_status_t ucc_pq_mt_init(ucc_progress_queue_t **pq, pq_mt->super.dequeue = ucc_pq_mt_dequeue; pq_mt->super.progress = ucc_pq_mt_progress; pq_mt->super.finalize = ucc_pq_mt_finalize; + pq_mt->super.is_empty = ucc_pq_mt_is_empty; *pq = &pq_mt->super; } else { ucc_pq_mt_locked_t *pq_mt = ucc_malloc(sizeof(*pq_mt), "pq_mt"); @@ -141,6 +157,7 @@ ucc_status_t ucc_pq_mt_init(ucc_progress_queue_t **pq, 
pq_mt->super.dequeue = ucc_pq_locked_mt_dequeue; pq_mt->super.progress = ucc_pq_mt_progress; pq_mt->super.finalize = ucc_pq_locked_mt_finalize; + pq_mt->super.is_empty = ucc_pq_locked_mt_is_empty; *pq = &pq_mt->super; } return UCC_OK; diff --git a/src/core/ucc_progress_queue_st.c b/src/core/ucc_progress_queue_st.c index 048d7313dd..e9842a70d4 100644 --- a/src/core/ucc_progress_queue_st.c +++ b/src/core/ucc_progress_queue_st.c @@ -1,5 +1,6 @@ /** - * Copyright (c) 2020, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * * See file LICENSE for terms. */ @@ -67,6 +68,13 @@ static void ucc_pq_st_finalize(ucc_progress_queue_t *pq) ucc_free(pq_st); } +static int ucc_pq_st_is_empty(ucc_progress_queue_t *pq) +{ + ucc_pq_st_t *pq_st = ucc_derived_of(pq, ucc_pq_st_t); + + return ucc_list_is_empty(&pq_st->list); +} + ucc_status_t ucc_pq_st_init(ucc_progress_queue_t **pq) { ucc_pq_st_t *pq_st = ucc_malloc(sizeof(*pq_st), "pq_st"); @@ -79,6 +87,8 @@ ucc_status_t ucc_pq_st_init(ucc_progress_queue_t **pq) pq_st->super.dequeue = NULL; pq_st->super.progress = ucc_pq_st_progress; pq_st->super.finalize = ucc_pq_st_finalize; + pq_st->super.is_empty = ucc_pq_st_is_empty; + *pq = &pq_st->super; return UCC_OK; }