diff --git a/src/components/tl/ucp/Makefile.am b/src/components/tl/ucp/Makefile.am index bf8e40aa6c..721a2ea156 100644 --- a/src/components/tl/ucp/Makefile.am +++ b/src/components/tl/ucp/Makefile.am @@ -36,10 +36,12 @@ alltoallv = \ alltoallv/alltoallv_onesided.c allreduce = \ - allreduce/allreduce.h \ - allreduce/allreduce.c \ - allreduce/allreduce_knomial.c \ - allreduce/allreduce_sra_knomial.c \ + allreduce/allreduce.h \ + allreduce/allreduce.c \ + allreduce/allreduce_knomial.c \ + allreduce/allreduce_sra_knomial.c \ + allreduce/allreduce_sliding_window.c \ + allreduce/allreduce_sliding_window_setup.c \ allreduce/allreduce_dbt.c barrier = \ diff --git a/src/components/tl/ucp/allreduce/allreduce.c b/src/components/tl/ucp/allreduce/allreduce.c index ef3f6f54ad..ed09399638 100644 --- a/src/components/tl/ucp/allreduce/allreduce.c +++ b/src/components/tl/ucp/allreduce/allreduce.c @@ -25,6 +25,10 @@ ucc_base_coll_alg_info_t .name = "dbt", .desc = "alreduce over double binary tree where a leaf in one tree " "will be intermediate in other (optimized for BW)"}, + [UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW] = + {.id = UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW, + .name = "sliding_window", + .desc = "sliding window allreduce (optimized for running on DPU)"}, [UCC_TL_UCP_ALLREDUCE_ALG_LAST] = { .id = 0, .name = NULL, .desc = NULL}}; @@ -51,3 +55,11 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args, out: return status; } + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_coll_task_t ** task_h) +{ + return UCC_OK; +} diff --git a/src/components/tl/ucp/allreduce/allreduce.h b/src/components/tl/ucp/allreduce/allreduce.h index 8eb75fb999..bd33bd2f30 100644 --- a/src/components/tl/ucp/allreduce/allreduce.h +++ b/src/components/tl/ucp/allreduce/allreduce.h @@ -11,6 +11,7 @@ enum { UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL, UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL, + UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW, UCC_TL_UCP_ALLREDUCE_ALG_DBT, UCC_TL_UCP_ALLREDUCE_ALG_LAST }; @@ -36,16 +37,105 @@ ucc_status_t ucc_tl_ucp_allreduce_init(ucc_tl_ucp_task_t *task); #define ALLREDUCE_TASK_CHECK(_args, _team) \ CHECK_SAME_MEMTYPE((_args), (_team)); +#define ALLREDUCE_PACKED_KEY_MAX_LEN 1024 + +typedef struct ucc_tl_ucp_allreduce_sw_global_work_buf_info { + void *packed_src_memh; + void *packed_dst_memh; +} ucc_tl_ucp_allreduce_sw_global_work_buf_info_t; + +typedef enum ucc_tl_ucp_allreduce_sw_buf_state +{ + FREE, + RECVING, + REDUCING, + REDUCED, + SENDING, + IDLE, +} ucc_tl_ucp_allreduce_sw_buf_state_t; + +typedef struct ucc_tl_ucp_allreduce_sw_buf { + void *buf; + ucc_tl_ucp_allreduce_sw_buf_state_t state; + ucs_status_ptr_t ucp_req; + size_t count; + size_t bytes; +} ucc_tl_ucp_allreduce_sw_buf_t; + +typedef struct ucc_tl_ucp_allreduce_sw_pipeline { + ucc_tl_ucp_allreduce_sw_buf_t accbuf; + ucc_tl_ucp_allreduce_sw_buf_t *getbuf; + ucs_status_ptr_t *put_requests; + size_t buffer_size; + size_t num_buffers; + size_t avail_buffs; + size_t my_count; + size_t my_offset; + size_t count_issued; + size_t count_received; + size_t count_reduced; + size_t count_serviced; + size_t get_idx; + size_t red_idx; + ucc_rank_t src_rank; + ucc_rank_t dst_rank; + int done_get; + int done_red; + int done_put; + int posted_put; +} ucc_tl_ucp_allreduce_sw_pipeline_t; + +struct ucc_tl_ucp_allreduce_sw_export_buf { + ucp_context_h ucp_context; + ucp_mem_h memh; + void *packed_memh; + size_t packed_memh_len; + void *packed_key; + size_t packed_key_len; + uint64_t memh_id; +}; + +typedef struct ucc_tl_ucp_allreduce_sw_host_allgather { + void *src_buf; + void *dst_buf; + char packed_src_key[ALLREDUCE_PACKED_KEY_MAX_LEN]; + char packed_dst_key[ALLREDUCE_PACKED_KEY_MAX_LEN]; +} ucc_tl_ucp_allreduce_sw_host_allgather_t; + ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team, ucc_coll_task_t **task_h); +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_coll_task_t ** task_h); + ucc_status_t ucc_tl_ucp_allreduce_knomial_init_common(ucc_tl_ucp_task_t *task); +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task); + +ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize( + ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task); + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task); + ucc_status_t ucc_tl_ucp_allreduce_knomial_start(ucc_coll_task_t *task); void ucc_tl_ucp_allreduce_knomial_progress(ucc_coll_task_t *task); +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task); + +void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *task); + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *task); + ucc_status_t ucc_tl_ucp_allreduce_knomial_finalize(ucc_coll_task_t *task); ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args, diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c new file mode 100644 index 0000000000..b994f818fb --- /dev/null +++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c @@ -0,0 +1,73 @@ +/** + * Copyright(c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "allreduce.h" +#include "../allgather/allgather.h" +#include "../barrier/barrier.h" +#include "utils/ucc_dt_reduce.h" +#include "tl_ucp_ep.h" + +// NOLINTBEGIN +static inline void +ucc_tl_ucp_allreduce_sliding_window_reset_buf(ucc_tl_ucp_allreduce_sw_buf_t *buf) +{ +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_reset_pipeline( + ucc_tl_ucp_allreduce_sw_pipeline_t *pipe, ucc_rank_t rank, + size_t put_window_size) +{ +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task) +{ + return UCC_OK; +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *coll_task) +{ + return UCC_OK; +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_reduction( + ucc_coll_task_t *coll_task, ucc_tl_ucp_allreduce_sw_buf_t *accbuf, + ucc_tl_ucp_allreduce_sw_buf_t *getbuf) +{ +} + +static inline void +ucc_tl_ucp_allreduce_sliding_window_test_reduction(ucc_tl_ucp_task_t *task) +{ +} + +static inline ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_req_test(ucs_status_ptr_t request, + ucc_tl_ucp_task_t *task) +{ + return UCC_OK; +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_info_test( + ucc_coll_task_t *coll_task) +{ +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys( + ucc_coll_task_t *coll_task) +{ +} + +static inline void +ucc_tl_ucp_allreduce_sliding_window_barrier(ucc_coll_task_t *coll_task) +{ +} + +void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task) +{ +} +// NOLINTEND diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c new file mode 100644 index 0000000000..64362cf940 --- /dev/null +++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c @@ -0,0 +1,38 @@ +/** + * Copyright(c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "allreduce.h" +#include "../allgather/allgather.h" +#include "utils/ucc_dt_reduce.h" +#include "tl_ucp_ep.h" + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task) +{ + return UCC_OK; +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task) +{ + return UCC_OK; +} + +ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize( + ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task) +{ + return UCC_OK; +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task) +{ + return UCC_OK; +} diff --git a/src/components/tl/ucp/barrier/barrier.c b/src/components/tl/ucp/barrier/barrier.c index 84170dce7b..19f82d6021 100644 --- a/src/components/tl/ucp/barrier/barrier.c +++ b/src/components/tl/ucp/barrier/barrier.c @@ -7,9 +7,6 @@ #include "tl_ucp.h" #include "barrier.h" -ucc_status_t ucc_tl_ucp_barrier_knomial_start(ucc_coll_task_t *task); -void ucc_tl_ucp_barrier_knomial_progress(ucc_coll_task_t *task); - ucc_base_coll_alg_info_t ucc_tl_ucp_barrier_algs[UCC_TL_UCP_BARRIER_ALG_LAST + 1] = { [UCC_TL_UCP_BARRIER_ALG_KNOMIAL] = diff --git a/src/components/tl/ucp/barrier/barrier.h b/src/components/tl/ucp/barrier/barrier.h index de16490621..a6c7c65336 100644 --- a/src/components/tl/ucp/barrier/barrier.h +++ b/src/components/tl/ucp/barrier/barrier.h @@ -18,4 +18,7 @@ extern ucc_base_coll_alg_info_t ucc_status_t ucc_tl_ucp_barrier_init(ucc_tl_ucp_task_t *task); +ucc_status_t ucc_tl_ucp_barrier_knomial_start(ucc_coll_task_t *task); +void ucc_tl_ucp_barrier_knomial_progress(ucc_coll_task_t *task); + #endif diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c index 11a8952173..b5de40ec21 100644 --- a/src/components/tl/ucp/tl_ucp.c +++ b/src/components/tl/ucp/tl_ucp.c @@ -104,6 +104,23 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = { ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_kn_radix), UCC_CONFIG_TYPE_UINT_RANGED}, + {"ALLREDUCE_SLIDING_WIN_BUF_SIZE", "65536", + "Buffer size of the sliding window allreduce algorithm", + ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sliding_window_buf_size), + UCC_CONFIG_TYPE_MEMUNITS}, + + {"ALLREDUCE_SLIDING_WIN_PUT_WINDOW_SIZE", "0", + "Max concurrent puts in SW Allreduce. <= 0 means set to team size", + ucc_offsetof(ucc_tl_ucp_lib_config_t, + allreduce_sliding_window_put_window_size), + UCC_CONFIG_TYPE_UINT}, + + {"ALLREDUCE_SLIDING_WIN_NUM_GET_BUFS", "0", + "Number of get buffers for sliding window AR. <= 0 means set to team size", + ucc_offsetof(ucc_tl_ucp_lib_config_t, + allreduce_sliding_window_num_get_bufs), + UCC_CONFIG_TYPE_UINT}, + {"ALLREDUCE_SRA_KN_RADIX", "auto", "Radix of the scatter-reduce-allgather (SRA) knomial allreduce algorithm", ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sra_kn_radix), diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index eac2303443..894a33bc6f 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -48,6 +48,9 @@ typedef struct ucc_tl_ucp_lib_config { uint32_t fanin_kn_radix; uint32_t fanout_kn_radix; uint32_t barrier_kn_radix; + size_t allreduce_sliding_window_buf_size; + uint32_t allreduce_sliding_window_put_window_size; + uint32_t allreduce_sliding_window_num_get_bufs; ucc_mrange_uint_t allreduce_kn_radix; ucc_mrange_uint_t allreduce_sra_kn_radix; uint32_t reduce_scatter_kn_radix; diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index 23c254b00e..f7d9202db7 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -278,6 +278,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str, case UCC_TL_UCP_ALLREDUCE_ALG_DBT: *init = ucc_tl_ucp_allreduce_dbt_init; break; + case UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW: + *init = ucc_tl_ucp_allreduce_sliding_window_init; + break; default: status = UCC_ERR_INVALID_PARAM; break; diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index 6ab2c661dd..31e7394db8 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -88,6 +88,11 @@ enum ucc_tl_ucp_task_flags { UCC_TL_UCP_TASK_FLAG_SUBSET = UCC_BIT(0), }; +typedef struct ucc_tl_ucp_allreduce_sw_pipeline + ucc_tl_ucp_allreduce_sw_pipeline; +typedef struct ucc_tl_ucp_allreduce_sw_host_allgather + ucc_tl_ucp_allreduce_sw_host_allgather; + typedef struct ucc_tl_ucp_task { ucc_coll_task_t super; uint32_t flags; @@ -121,6 +126,27 @@ typedef struct ucc_tl_ucp_task { ucc_ee_executor_task_t *etask; ucc_ee_executor_t *executor; } allreduce_kn; + struct { + int reduce_in_progress; + ucp_rkey_h * src_rkeys; //unpacked + ucp_rkey_h * dst_rkeys; //unpacked + ucp_ep_h * eps; + void ** sbufs; + void ** rbufs; + ucc_coll_task_t * allreduce_task_h; + ucc_tl_ucp_allreduce_sw_pipeline * pipe; + ucc_ee_executor_task_t * etask; + ucc_ee_executor_t * executor; + int put_window_size; + int num_get_bufs; + ucs_status_ptr_t * put_requests; + ucc_service_coll_req_t * allgather_scoll_req; + ucc_tl_ucp_allreduce_sw_host_allgather * allgather_data; + ucc_coll_task_t * barrier_task; + struct ucc_tl_ucp_allreduce_sw_export_buf *src_ebuf; + struct ucc_tl_ucp_allreduce_sw_export_buf *dst_ebuf; + int inplace; + } allreduce_sliding_window; struct { int phase; ucc_knomial_pattern_t p; diff --git a/test/gtest/Makefile.am b/test/gtest/Makefile.am index 591b2cf005..9f3f4a9ebe 100644 --- a/test/gtest/Makefile.am +++ b/test/gtest/Makefile.am @@ -64,47 +64,48 @@ gtest_CXXFLAGS = -std=gnu++11 \ -DGTEST_UCM_HOOK_LIB_DIR="\"${abs_builddir}/ucm/test_dlopen/.libs\"" \ -DGTEST_UCC_TOP_SRCDIR="\"${UCC_TOP_SRCDIR}\"" -gtest_SOURCES = \ - common/gtest-all.cc \ - common/test_obj_size.cc \ - common/main.cc \ - common/test_ucc.cc \ - tl/tl_test.cc \ - core/test_lib_config.cc \ - core/test_lib.cc \ - core/test_context_config.cc \ - core/test_context.cc \ - core/test_mc.cc \ - core/test_mc_reduce.cc \ - core/test_team.cc \ - core/test_schedule.cc \ - core/test_topo.cc \ - core/test_service_coll.cc \ - core/test_timeout.cc \ - core/test_utils.cc \ - coll/test_barrier.cc \ - coll/test_alltoall.cc \ - coll/test_alltoallv.cc \ - coll/test_allgather.cc \ - coll/test_allgatherv.cc \ - coll/test_gather.cc \ - coll/test_gatherv.cc \ - coll/test_bcast.cc \ - coll/test_reduce.cc \ - coll/test_allreduce.cc \ - coll/test_reduce_scatter.cc \ - coll/test_reduce_scatterv.cc \ - coll/test_scatter.cc \ - coll/test_scatterv.cc \ - utils/test_string.cc \ - utils/test_ep_map.cc \ - utils/test_lock_free_queue.cc \ - utils/test_math.cc \ - utils/test_cfg_file.cc \ - utils/test_parser.cc \ - coll_score/test_score.cc \ - coll_score/test_score_str.cc \ - coll_score/test_score_update.cc \ +gtest_SOURCES = \ + common/gtest-all.cc \ + common/test_obj_size.cc \ + common/main.cc \ + common/test_ucc.cc \ + tl/tl_test.cc \ + core/test_lib_config.cc \ + core/test_lib.cc \ + core/test_context_config.cc \ + core/test_context.cc \ + core/test_mc.cc \ + core/test_mc_reduce.cc \ + core/test_team.cc \ + core/test_schedule.cc \ + core/test_topo.cc \ + core/test_service_coll.cc \ + core/test_timeout.cc \ + core/test_utils.cc \ + coll/test_barrier.cc \ + coll/test_alltoall.cc \ + coll/test_alltoallv.cc \ + coll/test_allgather.cc \ + coll/test_allgatherv.cc \ + coll/test_gather.cc \ + coll/test_gatherv.cc \ + coll/test_bcast.cc \ + coll/test_reduce.cc \ + coll/test_allreduce_sliding_window.cc \ + coll/test_allreduce.cc \ + coll/test_reduce_scatter.cc \ + coll/test_reduce_scatterv.cc \ + coll/test_scatter.cc \ + coll/test_scatterv.cc \ + utils/test_string.cc \ + utils/test_ep_map.cc \ + utils/test_lock_free_queue.cc \ + utils/test_math.cc \ + utils/test_cfg_file.cc \ + utils/test_parser.cc \ + coll_score/test_score.cc \ + coll_score/test_score_str.cc \ + coll_score/test_score_update.cc \ active_set/test_active_set.cc if TL_MLX5_ENABLED @@ -134,13 +135,18 @@ gtest_LDFLAGS += $(HIP_LDFLAGS) gtest_LDADD += $(HIP_LIBS) endif - -noinst_HEADERS = \ - common/gtest.h \ - common/test.h \ - common/test_ucc.h \ - core/test_context.h \ - core/test_mc_reduce.h \ +gtest_CXXFLAGS += $(UCX_CXXFLAGS) +gtest_CPPFLAGS += $(UCX_CPPFLAGS) +gtest_LDFLAGS += $(UCX_LDFLAGS) +gtest_LDADD += $(UCX_LIBS) $(UCX_LIBADD) + +noinst_HEADERS = \ + common/gtest.h \ + common/test.h \ + common/test_ucc.h \ + core/test_context.h \ + core/test_mc_reduce.h \ + coll/test_allreduce_sliding_window.h \ coll_score/test_score.h .PHONY: test test gdb valgrind fix_rpath ucc diff --git a/test/gtest/coll/test_allreduce.cc b/test/gtest/coll/test_allreduce.cc index 7e718cefaa..71b7df4f38 100644 --- a/test/gtest/coll/test_allreduce.cc +++ b/test/gtest/coll/test_allreduce.cc @@ -7,6 +7,9 @@ #include "common/test_ucc.h" #include "utils/ucc_math.h" +// For sliding window allreduce +#include "test_allreduce_sliding_window.h" + #include template @@ -23,8 +26,9 @@ class test_allreduce : public UccCollArgs, public testing::Test { ctxs[r] = (gtest_ucc_coll_ctx_t*)calloc(1, sizeof(gtest_ucc_coll_ctx_t)); ctxs[r]->args = coll; - coll->coll_type = UCC_COLL_TYPE_ALLREDUCE; - coll->op = T::redop; + coll->coll_type = UCC_COLL_TYPE_ALLREDUCE; + coll->op = T::redop; + coll->global_work_buffer = NULL; ctxs[r]->init_buf = ucc_malloc(ucc_dt_size(dt) * count, "init buf"); EXPECT_NE(ctxs[r]->init_buf, nullptr); @@ -433,6 +437,45 @@ TYPED_TEST(test_allreduce_alg, rab_pipelined) { } } +TYPED_TEST(test_allreduce_alg, sliding_window) +{ + int n_procs = 8; + ucc_job_env_t env = {{"UCC_TL_UCP_TUNE", "allreduce:@2"}, + {"UCC_CLS", "all"}}; + UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL_ONESIDED, env); + UccTeam_h team = job.create_team(n_procs); + int repeat = 3; + test_ucp_info_t *ucp_infos = NULL; + UccCollCtxVec ctxs; + std::vector mt = {UCC_MEMORY_TYPE_HOST}; + + if (UCC_OK == ucc_mc_available( + UCC_MEMORY_TYPE_CUDA)) { //add cuda_managed for cl hier? + mt.push_back(UCC_MEMORY_TYPE_CUDA); + } + + for (auto count : {65536, 123567}) { + for (auto inplace : {TEST_NO_INPLACE, TEST_INPLACE}) { + for (auto m : mt) { + SET_MEM_TYPE(m); + this->set_inplace(inplace); + this->data_init(n_procs, TypeParam::dt, count, ctxs, true); + + // set args->global_work_buffer on each ctx + setup_gwbi(n_procs, ctxs, &ucp_infos, inplace == TEST_INPLACE); + + for (auto i = 0; i < repeat; i++) { + this->reset(ctxs); + } + + free_gwbi(n_procs, ctxs, ucp_infos, inplace == TEST_INPLACE); + ucp_infos = NULL; + this->data_fini(ctxs); + } + } + } +} + template class test_allreduce_avg_order : public test_allreduce { }; diff --git a/test/gtest/coll/test_allreduce_sliding_window.cc b/test/gtest/coll/test_allreduce_sliding_window.cc new file mode 100644 index 0000000000..79c8d9e105 --- /dev/null +++ b/test/gtest/coll/test_allreduce_sliding_window.cc @@ -0,0 +1,193 @@ +/** + * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * See file LICENSE for terms. + */ + +/* + This file is for setting up the global work buffer for sliding window + allreduce. This entails allocating ucp workers, registering memory, + exchanging rkeys, and allocating the pipeline datastructure the + algorithm uses. +*/ + +#include "core/test_mc_reduce.h" +#include "common/test_ucc.h" +#include "utils/ucc_math.h" + +#include + +#include "test_allreduce_sliding_window.h" + +int test_init_ucp(ucp_context_h *ucp_ctx) +{ + ucs_status_t ucs_status; + ucp_config_t *ucp_config; + ucp_params_t ucp_params; + ucp_context_h ucp_context; + + ucs_status = ucp_config_read(NULL, NULL, &ucp_config); + EXPECT_EQ(UCS_OK, ucs_status) << "ucp_config_read() returned error: " + << ucs_status_string(ucs_status); + + ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES; + ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_RMA | + UCP_FEATURE_AMO64 | UCP_FEATURE_EXPORTED_MEMH; + + ucs_status = ucp_init(&ucp_params, ucp_config, &ucp_context); + EXPECT_EQ(UCS_OK, ucs_status) << "ucp_init() returned error: " + << ucs_status_string(ucs_status); + + *ucp_ctx = ucp_context; + + return 0; +} + +void ep_err_cb(void *arg, ucp_ep_h ep, ucs_status_t ucs_status) +{ + ADD_FAILURE() << "Endpoint error detected, status: " + << ucs_status_string(ucs_status); +} + +ucs_status_t buffer_export_ucc(ucp_context_h ucp_context, void *buf, size_t len, + struct export_buf *ebuf) +{ + ucs_status_t ucs_status = UCS_OK; + ucp_mem_map_params_t params; + ucp_memh_pack_params_t pack_params; + + ebuf->ucp_context = ucp_context; + + params.field_mask = + UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH; + params.address = buf; + params.length = len; + + ucs_status = ucp_mem_map(ucp_context, ¶ms, &ebuf->memh); + EXPECT_EQ(UCS_OK, ucs_status) << "ucp_mem_map() returned error: " + << ucs_status_string(ucs_status); + + pack_params.field_mask = UCP_MEMH_PACK_PARAM_FIELD_FLAGS; + pack_params.flags = UCP_MEMH_PACK_FLAG_EXPORT; + + ucs_status = ucp_memh_pack(ebuf->memh, &pack_params, &ebuf->packed_memh, + &ebuf->packed_memh_len); + EXPECT_EQ(UCS_OK, ucs_status) << "ucp_memh_pack() returned error: " + << ucs_status_string(ucs_status); + + return ucs_status; +} + +void setup_gwbi(int n_procs, UccCollCtxVec &ctxs, + test_ucp_info_t **ucp_infos_p /* out */, bool inplace) +{ + int i; + ucs_status_t ucs_status = UCS_OK; + + test_ucp_info_t *ucp_infos = + (test_ucp_info_t *)ucc_malloc(sizeof(test_ucp_info_t) * n_procs); + EXPECT_NE(ucp_infos, nullptr); + *ucp_infos_p = ucp_infos; + + // allocate gwbi + for (auto ctx : ctxs) { + global_work_buf_info *gwbi = + (global_work_buf_info *)ucc_malloc( + sizeof(global_work_buf_info), + "global work buf info"); + + EXPECT_NE(gwbi, nullptr); + + ctx->args->global_work_buffer = gwbi; + } + + // setup ucp contexts and workers + for (i = 0; i < n_procs; i++) { + test_ucp_info_t ucp_info; + if (test_init_ucp(&ucp_info.ucp_ctx) != 0) { + ADD_FAILURE() << "test_init_ucp failed"; + } + memcpy(&ucp_infos[i], &ucp_info, sizeof(test_ucp_info_t)); + } + + // set up packed src/dst memh + for (i = 0; i < n_procs; i++) { + // my proc's gwbi + global_work_buf_info *gwbi = + (global_work_buf_info *)ctxs[i] + ->args->global_work_buffer; + // my proc's ucp_info + test_ucp_info_t * ucp_info = &ucp_infos[i]; + struct export_buf *dst_ebuf = &ucp_info->dst_ebuf; + size_t dst_len = ctxs[i]->args->dst.info.count * + ucc_dt_size(ctxs[i]->args->dst.info.datatype); + + ucs_status = buffer_export_ucc( + ucp_info->ucp_ctx, ctxs[i]->args->dst.info.buffer, + dst_len, dst_ebuf); + ASSERT_EQ(UCS_OK, ucs_status) << "buffer_export_ucc() returned error: " + << ucs_status_string(ucs_status); + + gwbi->packed_dst_memh = dst_ebuf->packed_memh; + + if (!inplace) { + size_t src_len = ctxs[i]->args->src.info.count * + ucc_dt_size(ctxs[i]->args->src.info.datatype); + struct export_buf *src_ebuf = &ucp_info->src_ebuf; + ucs_status = buffer_export_ucc( + ucp_info->ucp_ctx, ctxs[i]->args->src.info.buffer, + src_len, src_ebuf); + ASSERT_EQ(UCS_OK, ucs_status) + << "buffer_export_ucc() returned error: " + << ucs_status_string(ucs_status); + + gwbi->packed_src_memh = src_ebuf->packed_memh; + } + } + + // set the flag that indicates the global work buffer was passed + for (auto ctx : ctxs) { + ctx->args->mask |= + UCC_COLL_ARGS_FIELD_FLAGS | UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER; + ctx->args->flags |= UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS; + } +} + +void free_gwbi(int n_procs, UccCollCtxVec &ctxs, test_ucp_info_t *ucp_infos, + bool inplace) +{ + int i, k; + ucs_status_t ucs_status; + + // free sbufs, rbufs, src_rkeys, and dst_rkeys + for (i = 0; i < n_procs; i++) { + // my proc's ucp_info + test_ucp_info_t *ucp_info = &ucp_infos[i]; + + if (!inplace) { + struct export_buf *src_ebuf = &ucp_info->src_ebuf; + ucs_status = ucp_mem_unmap(ucp_info->ucp_ctx, src_ebuf->memh); + ASSERT_EQ(UCS_OK, ucs_status) << "ucp_mem_unmap() returned error: " + << ucs_status_string(ucs_status); + } + + struct export_buf *dst_ebuf = &ucp_info->dst_ebuf; + ucs_status = ucp_mem_unmap(ucp_info->ucp_ctx, dst_ebuf->memh); + ASSERT_EQ(UCS_OK, ucs_status) << "ucp_mem_unmap() returned error: " + << ucs_status_string(ucs_status); + } + + // free ucp contexts + for (i = 0; i < n_procs; i++) { + ucp_cleanup(ucp_infos[i].ucp_ctx); + } + + // free gwbi and each gwbi's set of pipes + for (k = 0; k < n_procs; k++) { + global_work_buf_info *gwbi = + (global_work_buf_info *) ctxs[k]->args->global_work_buffer; + + ucc_free(gwbi); + } + + ucc_free(ucp_infos); +} diff --git a/test/gtest/coll/test_allreduce_sliding_window.h b/test/gtest/coll/test_allreduce_sliding_window.h new file mode 100644 index 0000000000..b9b1339ad2 --- /dev/null +++ b/test/gtest/coll/test_allreduce_sliding_window.h @@ -0,0 +1,30 @@ +#ifndef TEST_ALLREDUCE_SW_H +#define TEST_ALLREDUCE_SW_H + +#include + +typedef struct global_work_buf_info { + void *packed_src_memh; + void *packed_dst_memh; +} global_work_buf_info; + +struct export_buf { + ucp_context_h ucp_context; + ucp_mem_h memh; + void * packed_memh; + size_t packed_memh_len; + uint64_t memh_id; +}; + +typedef struct test_ucp_info_t { + ucp_context_h ucp_ctx; + struct export_buf src_ebuf; + struct export_buf dst_ebuf; +} test_ucp_info_t; + +void free_gwbi(int n_procs, UccCollCtxVec &ctxs, test_ucp_info_t *ucp_infos, + bool inplace); +void setup_gwbi(int n_procs, UccCollCtxVec &ctxs, + test_ucp_info_t **ucp_infos_p /* out */, bool inplace); + +#endif