diff --git a/config/m4/ucx.m4 b/config/m4/ucx.m4 index ba57dae303..35bb55f9bd 100644 --- a/config/m4/ucx.m4 +++ b/config/m4/ucx.m4 @@ -119,6 +119,7 @@ AS_IF([test "x$ucx_checked" != "xyes"],[ [], [#include ]) + AC_DEFINE([HAVE_UCX], 1, [Enable UCX support]) AC_COMPILE_IFELSE([AC_LANG_SOURCE([[#include int main(int argc, char** argv) { diff --git a/src/components/tl/ucp/Makefile.am b/src/components/tl/ucp/Makefile.am index 6074ed65c8..013653bfe6 100644 --- a/src/components/tl/ucp/Makefile.am +++ b/src/components/tl/ucp/Makefile.am @@ -37,10 +37,13 @@ alltoallv = \ alltoallv/alltoallv_onesided.c allreduce = \ - allreduce/allreduce.h \ - allreduce/allreduce.c \ - allreduce/allreduce_knomial.c \ - allreduce/allreduce_sra_knomial.c \ + allreduce/allreduce.h \ + allreduce/allreduce.c \ + allreduce/allreduce_knomial.c \ + allreduce/allreduce_sra_knomial.c \ + allreduce/allreduce_sliding_window.h \ + allreduce/allreduce_sliding_window.c \ + allreduce/allreduce_sliding_window_setup.c \ allreduce/allreduce_dbt.c barrier = \ diff --git a/src/components/tl/ucp/allreduce/allreduce.c b/src/components/tl/ucp/allreduce/allreduce.c index ef3f6f54ad..f9d41a1190 100644 --- a/src/components/tl/ucp/allreduce/allreduce.c +++ b/src/components/tl/ucp/allreduce/allreduce.c @@ -25,6 +25,10 @@ ucc_base_coll_alg_info_t .name = "dbt", .desc = "alreduce over double binary tree where a leaf in one tree " "will be intermediate in other (optimized for BW)"}, + [UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW] = + {.id = UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW, + .name = "sliding_window", + .desc = "sliding window allreduce (optimized for running on DPU)"}, [UCC_TL_UCP_ALLREDUCE_ALG_LAST] = { .id = 0, .name = NULL, .desc = NULL}}; @@ -51,3 +55,13 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args, out: return status; } + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t __attribute__((unused)) *coll_args, //NOLINT + ucc_base_team_t __attribute__((unused)) *team, //NOLINT + ucc_coll_task_t __attribute__((unused)) **task_h) //NOLINT +{ + ucc_coll_task_t *coll_task = NULL; + ucc_tl_ucp_allreduce_sliding_window_progress(coll_task); + return UCC_OK; +} diff --git a/src/components/tl/ucp/allreduce/allreduce.h b/src/components/tl/ucp/allreduce/allreduce.h index 8eb75fb999..3f4c4e2834 100644 --- a/src/components/tl/ucp/allreduce/allreduce.h +++ b/src/components/tl/ucp/allreduce/allreduce.h @@ -11,6 +11,7 @@ enum { UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL, UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL, + UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW, UCC_TL_UCP_ALLREDUCE_ALG_DBT, UCC_TL_UCP_ALLREDUCE_ALG_LAST }; @@ -36,16 +37,41 @@ ucc_status_t ucc_tl_ucp_allreduce_init(ucc_tl_ucp_task_t *task); #define ALLREDUCE_TASK_CHECK(_args, _team) \ CHECK_SAME_MEMTYPE((_args), (_team)); + ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team, ucc_coll_task_t **task_h); +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_coll_task_t ** task_h); + ucc_status_t ucc_tl_ucp_allreduce_knomial_init_common(ucc_tl_ucp_task_t *task); +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t * team, + ucc_tl_ucp_task_t * task); + +ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize( + ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task); + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task); + ucc_status_t ucc_tl_ucp_allreduce_knomial_start(ucc_coll_task_t *task); void ucc_tl_ucp_allreduce_knomial_progress(ucc_coll_task_t *task); +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task); + +void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *task); + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *task); + ucc_status_t ucc_tl_ucp_allreduce_knomial_finalize(ucc_coll_task_t *task); ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args, diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c new file mode 100644 index 0000000000..59512bcd6d --- /dev/null +++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c @@ -0,0 +1,90 @@ +/** + * Copyright(c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "allreduce.h" +#include "allreduce_sliding_window.h" +#include "../allgather/allgather.h" +#include "../barrier/barrier.h" +#include "utils/ucc_dt_reduce.h" +#include "tl_ucp_ep.h" + + +static inline void //NOLINT +ucc_tl_ucp_allreduce_sliding_window_reset_buf(ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused)) *buf) //NOLINT +{ +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_reset_pipeline( //NOLINT + ucc_tl_ucp_allreduce_sw_pipeline_t __attribute__((unused)) *pipe, ucc_rank_t __attribute__((unused)) rank, //NOLINT + size_t __attribute__((unused)) put_window_size) //NOLINT +{ +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t __attribute__((unused)) *coll_task) //NOLINT +{ + return UCC_OK; +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t __attribute__((unused)) *coll_task) //NOLINT +{ + return UCC_OK; +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_reduction( + ucc_coll_task_t __attribute__((unused)) *coll_task, ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused)) *accbuf,//NOLINT + ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused)) *getbuf)//NOLINT +{ +} + +static inline void +ucc_tl_ucp_allreduce_sliding_window_test_reduction(ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT +{ +} + +static inline ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_req_test(ucs_status_ptr_t __attribute__((unused)) request,//NOLINT + ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT +{ + return UCC_OK; +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(//NOLINT + ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT +{ +} + +static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(//NOLINT + ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT +{ +} + +static inline void +ucc_tl_ucp_allreduce_sliding_window_barrier(ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT +{ +} + +void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task)//NOLINT +{ + ucs_status_ptr_t request = 0; + ucc_tl_ucp_task_t *task = NULL; + ucc_tl_ucp_allreduce_sw_buf_t *accbuf = NULL; + ucc_tl_ucp_allreduce_sw_buf_t *getbuf = NULL; + ucc_tl_ucp_allreduce_sw_pipeline_t *pipe = NULL; + + // suppress "function unused" Werrors + ucc_tl_ucp_allreduce_sliding_window_barrier(coll_task); + ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(coll_task); + ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(coll_task); + ucc_tl_ucp_allreduce_sliding_window_req_test(request, task); + ucc_tl_ucp_allreduce_sliding_window_test_reduction(task); + ucc_tl_ucp_allreduce_sliding_window_reduction(coll_task, accbuf, getbuf); + ucc_tl_ucp_allreduce_sliding_window_finalize(coll_task); + ucc_tl_ucp_allreduce_sliding_window_start(coll_task); + ucc_tl_ucp_allreduce_sliding_window_reset_pipeline(pipe, 0, 0); + ucc_tl_ucp_allreduce_sliding_window_reset_buf(accbuf); +} diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window.h b/src/components/tl/ucp/allreduce/allreduce_sliding_window.h new file mode 100644 index 0000000000..7dd1d07635 --- /dev/null +++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window.h @@ -0,0 +1,76 @@ +/** + * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#ifndef ALLREDUCE_SW_H_ +#define ALLREDUCE_SW_H_ + +#include "tl_ucp_coll.h" + +#define ALLREDUCE_PACKED_KEY_MAX_LEN 1024 + +typedef struct ucc_tl_ucp_allreduce_sw_global_work_buf_info { + void *packed_src_memh; + void *packed_dst_memh; +} ucc_tl_ucp_allreduce_sw_global_work_buf_info_t; + +typedef enum ucc_tl_ucp_allreduce_sw_buf_state { + FREE, + RECVING, + REDUCING, + REDUCED, + SENDING, + IDLE, +} ucc_tl_ucp_allreduce_sw_buf_state_t; + +typedef struct ucc_tl_ucp_allreduce_sw_buf { + void *buf; + ucc_tl_ucp_allreduce_sw_buf_state_t state; + ucs_status_ptr_t ucp_req; + size_t count; + size_t bytes; +} ucc_tl_ucp_allreduce_sw_buf_t; + +typedef struct ucc_tl_ucp_allreduce_sw_pipeline { + ucc_tl_ucp_allreduce_sw_buf_t accbuf; + ucc_tl_ucp_allreduce_sw_buf_t *getbuf; + ucs_status_ptr_t *put_requests; + size_t buffer_size; + size_t num_buffers; + size_t avail_buffs; + size_t my_count; + size_t my_offset; + size_t count_issued; + size_t count_received; + size_t count_reduced; + size_t count_serviced; + size_t get_idx; + size_t red_idx; + ucc_rank_t src_rank; + ucc_rank_t dst_rank; + int done_get; + int done_red; + int done_put; + int posted_put; +} ucc_tl_ucp_allreduce_sw_pipeline_t; + +struct ucc_tl_ucp_allreduce_sw_export_buf { + ucp_context_h ucp_context; + ucp_mem_h memh; + void *packed_memh; + size_t packed_memh_len; + void *packed_key; + size_t packed_key_len; + uint64_t memh_id; +}; + +typedef struct ucc_tl_ucp_allreduce_sw_host_allgather { + void *src_buf; + void *dst_buf; + char packed_src_key[ALLREDUCE_PACKED_KEY_MAX_LEN]; + char packed_dst_key[ALLREDUCE_PACKED_KEY_MAX_LEN]; +} ucc_tl_ucp_allreduce_sw_host_allgather_t; + +#endif diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c new file mode 100644 index 0000000000..b90f6528ad --- /dev/null +++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c @@ -0,0 +1,43 @@ +/** + * Copyright(c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "allreduce.h" +#include "allreduce_sliding_window.h" +#include "../allgather/allgather.h" +#include "utils/ucc_dt_reduce.h" +#include "tl_ucp_ep.h" + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_alloc_pipe( + ucc_base_coll_args_t __attribute__((unused)) *coll_args,//NOLINT + ucc_base_team_t __attribute__((unused)) *team,//NOLINT + ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT +{ + return UCC_OK; +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_task_init( + ucc_base_coll_args_t __attribute__((unused)) *coll_args,//NOLINT + ucc_base_team_t __attribute__((unused)) *team,//NOLINT + ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT +{ + return UCC_OK; +} + +ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(//NOLINT + ucc_service_coll_req_t __attribute__((unused)) *scoll_req, //NOLINT + ucc_tl_ucp_task_t __attribute__((unused)) *sw_task)//NOLINT +{ + return UCC_OK; +} + +ucc_status_t +ucc_tl_ucp_allreduce_sliding_window_free_gwbi( + ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT +{ + return UCC_OK; +} diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c index 72586dc5de..2ce32ef72d 100644 --- a/src/components/tl/ucp/tl_ucp.c +++ b/src/components/tl/ucp/tl_ucp.c @@ -104,6 +104,23 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = { ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_kn_radix), UCC_CONFIG_TYPE_UINT_RANGED}, + {"ALLREDUCE_SLIDING_WIN_BUF_SIZE", "65536", + "Buffer size of the sliding window allreduce algorithm", + ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sliding_window_buf_size), + UCC_CONFIG_TYPE_MEMUNITS}, + + {"ALLREDUCE_SLIDING_WIN_PUT_WINDOW_SIZE", "0", + "Max concurrent puts in SW Allreduce. 0 means set to team size", + ucc_offsetof(ucc_tl_ucp_lib_config_t, + allreduce_sliding_window_put_window_size), + UCC_CONFIG_TYPE_UINT}, + + {"ALLREDUCE_SLIDING_WIN_NUM_GET_BUFS", "0", + "Number of get buffers for sliding window AR. 0 means set to team size", + ucc_offsetof(ucc_tl_ucp_lib_config_t, + allreduce_sliding_window_num_get_bufs), + UCC_CONFIG_TYPE_UINT}, + {"ALLREDUCE_SRA_KN_RADIX", "auto", "Radix of the scatter-reduce-allgather (SRA) knomial allreduce algorithm", ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sra_kn_radix), diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index eac2303443..894a33bc6f 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -48,6 +48,9 @@ typedef struct ucc_tl_ucp_lib_config { uint32_t fanin_kn_radix; uint32_t fanout_kn_radix; uint32_t barrier_kn_radix; + size_t allreduce_sliding_window_buf_size; + uint32_t allreduce_sliding_window_put_window_size; + uint32_t allreduce_sliding_window_num_get_bufs; ucc_mrange_uint_t allreduce_kn_radix; ucc_mrange_uint_t allreduce_sra_kn_radix; uint32_t reduce_scatter_kn_radix; diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index 46bf6a2ed6..615798f1af 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -281,6 +281,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str, case UCC_TL_UCP_ALLREDUCE_ALG_DBT: *init = ucc_tl_ucp_allreduce_dbt_init; break; + case UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW: + *init = ucc_tl_ucp_allreduce_sliding_window_init; + break; default: status = UCC_ERR_INVALID_PARAM; break; diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index 0a8a340955..23ae1ab337 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -88,6 +88,11 @@ enum ucc_tl_ucp_task_flags { UCC_TL_UCP_TASK_FLAG_SUBSET = UCC_BIT(0), }; +typedef struct ucc_tl_ucp_allreduce_sw_pipeline + ucc_tl_ucp_allreduce_sw_pipeline; +typedef struct ucc_tl_ucp_allreduce_sw_host_allgather + ucc_tl_ucp_allreduce_sw_host_allgather; + typedef struct ucc_tl_ucp_task { ucc_coll_task_t super; uint32_t flags; @@ -121,6 +126,21 @@ typedef struct ucc_tl_ucp_task { ucc_ee_executor_task_t *etask; ucc_ee_executor_t *executor; } allreduce_kn; + struct { + int reduce_in_progress; + ucp_rkey_h *src_rkeys; //unpacked + ucp_rkey_h *dst_rkeys; //unpacked + void **sbufs; + void **rbufs; + ucc_tl_ucp_allreduce_sw_pipeline *pipe; + ucc_ee_executor_task_t *etask; + ucc_ee_executor_t *executor; + ucs_status_ptr_t *put_requests; + ucc_tl_ucp_allreduce_sw_host_allgather *allgather_data; + ucc_schedule_t *sw_sched; + struct ucc_tl_ucp_allreduce_sw_export_buf *src_ebuf; + struct ucc_tl_ucp_allreduce_sw_export_buf *dst_ebuf; + } allreduce_sliding_window; struct { int phase; ucc_knomial_pattern_t p; diff --git a/test/gtest/Makefile.am b/test/gtest/Makefile.am index 591b2cf005..49c60fe16b 100644 --- a/test/gtest/Makefile.am +++ b/test/gtest/Makefile.am @@ -64,47 +64,48 @@ gtest_CXXFLAGS = -std=gnu++11 \ -DGTEST_UCM_HOOK_LIB_DIR="\"${abs_builddir}/ucm/test_dlopen/.libs\"" \ -DGTEST_UCC_TOP_SRCDIR="\"${UCC_TOP_SRCDIR}\"" -gtest_SOURCES = \ - common/gtest-all.cc \ - common/test_obj_size.cc \ - common/main.cc \ - common/test_ucc.cc \ - tl/tl_test.cc \ - core/test_lib_config.cc \ - core/test_lib.cc \ - core/test_context_config.cc \ - core/test_context.cc \ - core/test_mc.cc \ - core/test_mc_reduce.cc \ - core/test_team.cc \ - core/test_schedule.cc \ - core/test_topo.cc \ - core/test_service_coll.cc \ - core/test_timeout.cc \ - core/test_utils.cc \ - coll/test_barrier.cc \ - coll/test_alltoall.cc \ - coll/test_alltoallv.cc \ - coll/test_allgather.cc \ - coll/test_allgatherv.cc \ - coll/test_gather.cc \ - coll/test_gatherv.cc \ - coll/test_bcast.cc \ - coll/test_reduce.cc \ - coll/test_allreduce.cc \ - coll/test_reduce_scatter.cc \ - coll/test_reduce_scatterv.cc \ - coll/test_scatter.cc \ - coll/test_scatterv.cc \ - utils/test_string.cc \ - utils/test_ep_map.cc \ - utils/test_lock_free_queue.cc \ - utils/test_math.cc \ - utils/test_cfg_file.cc \ - utils/test_parser.cc \ - coll_score/test_score.cc \ - coll_score/test_score_str.cc \ - coll_score/test_score_update.cc \ +gtest_SOURCES = \ + common/gtest-all.cc \ + common/test_obj_size.cc \ + common/main.cc \ + common/test_ucc.cc \ + tl/tl_test.cc \ + core/test_lib_config.cc \ + core/test_lib.cc \ + core/test_context_config.cc \ + core/test_context.cc \ + core/test_mc.cc \ + core/test_mc_reduce.cc \ + core/test_team.cc \ + core/test_schedule.cc \ + core/test_topo.cc \ + core/test_service_coll.cc \ + core/test_timeout.cc \ + core/test_utils.cc \ + coll/test_barrier.cc \ + coll/test_alltoall.cc \ + coll/test_alltoallv.cc \ + coll/test_allgather.cc \ + coll/test_allgatherv.cc \ + coll/test_gather.cc \ + coll/test_gatherv.cc \ + coll/test_bcast.cc \ + coll/test_reduce.cc \ + coll/test_allreduce_sliding_window.cc \ + coll/test_allreduce.cc \ + coll/test_reduce_scatter.cc \ + coll/test_reduce_scatterv.cc \ + coll/test_scatter.cc \ + coll/test_scatterv.cc \ + utils/test_string.cc \ + utils/test_ep_map.cc \ + utils/test_lock_free_queue.cc \ + utils/test_math.cc \ + utils/test_cfg_file.cc \ + utils/test_parser.cc \ + coll_score/test_score.cc \ + coll_score/test_score_str.cc \ + coll_score/test_score_update.cc \ active_set/test_active_set.cc if TL_MLX5_ENABLED @@ -134,13 +135,20 @@ gtest_LDFLAGS += $(HIP_LDFLAGS) gtest_LDADD += $(HIP_LIBS) endif +if HAVE_UCX +gtest_CXXFLAGS += $(UCX_CXXFLAGS) +gtest_CPPFLAGS += $(UCX_CPPFLAGS) +gtest_LDFLAGS += $(UCX_LDFLAGS) +gtest_LDADD += $(UCX_LIBS) $(UCX_LIBADD) +endif -noinst_HEADERS = \ - common/gtest.h \ - common/test.h \ - common/test_ucc.h \ - core/test_context.h \ - core/test_mc_reduce.h \ +noinst_HEADERS = \ + common/gtest.h \ + common/test.h \ + common/test_ucc.h \ + core/test_context.h \ + core/test_mc_reduce.h \ + coll/test_allreduce_sliding_window.h \ coll_score/test_score.h .PHONY: test test gdb valgrind fix_rpath ucc diff --git a/test/gtest/coll/test_allreduce.cc b/test/gtest/coll/test_allreduce.cc index 7e718cefaa..1dc5f0fcb2 100644 --- a/test/gtest/coll/test_allreduce.cc +++ b/test/gtest/coll/test_allreduce.cc @@ -7,6 +7,9 @@ #include "common/test_ucc.h" #include "utils/ucc_math.h" +// For sliding window allreduce +#include "test_allreduce_sliding_window.h" + #include template @@ -23,8 +26,9 @@ class test_allreduce : public UccCollArgs, public testing::Test { ctxs[r] = (gtest_ucc_coll_ctx_t*)calloc(1, sizeof(gtest_ucc_coll_ctx_t)); ctxs[r]->args = coll; - coll->coll_type = UCC_COLL_TYPE_ALLREDUCE; - coll->op = T::redop; + coll->coll_type = UCC_COLL_TYPE_ALLREDUCE; + coll->op = T::redop; + coll->global_work_buffer = NULL; ctxs[r]->init_buf = ucc_malloc(ucc_dt_size(dt) * count, "init buf"); EXPECT_NE(ctxs[r]->init_buf, nullptr); @@ -433,6 +437,57 @@ TYPED_TEST(test_allreduce_alg, rab_pipelined) { } } +#ifdef HAVE_UCX +TYPED_TEST(test_allreduce_alg, sliding_window) +{ + int n_procs = 8; + ucc_job_env_t env = {{"UCC_TL_UCP_TUNE", "allreduce:@sliding_window"}, + {"UCC_CLS", "all"}}; + UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL_ONESIDED, env); + UccTeam_h team = job.create_team(n_procs); + int repeat = 3; + test_ucp_info_t *ucp_infos = NULL; + std::vector mt = {UCC_MEMORY_TYPE_HOST}; + ucs_status_t ucs_status = UCS_OK; + UccCollCtxVec ctxs; + + if (UCC_OK == ucc_mc_available( + UCC_MEMORY_TYPE_CUDA)) { //add cuda_managed for cl hier? + mt.push_back(UCC_MEMORY_TYPE_CUDA); + } + + for (auto count : {65536, 123567}) { + for (auto inplace : {TEST_NO_INPLACE, TEST_INPLACE}) { + for (auto m : mt) { + SET_MEM_TYPE(m); + this->set_inplace(inplace); + this->data_init(n_procs, TypeParam::dt, count, ctxs, true); + + // set args->global_work_buffer on each ctx + ucs_status = setup_gwbi(n_procs, ctxs, &ucp_infos, inplace == TEST_INPLACE); + if (ucs_status != UCS_OK) { + free_gwbi(n_procs, ctxs, ucp_infos, inplace == TEST_INPLACE); + this->data_fini(ctxs); + if (ucs_status == UCS_ERR_UNSUPPORTED) { + GTEST_SKIP() << "Exported memory key not supported"; + } else { + GTEST_FAIL() << ucs_status_string(ucs_status); + } + } + + for (auto i = 0; i < repeat; i++) { + this->reset(ctxs); + } + + free_gwbi(n_procs, ctxs, ucp_infos, inplace == TEST_INPLACE); + ucp_infos = NULL; + this->data_fini(ctxs); + } + } + } +} +#endif + template class test_allreduce_avg_order : public test_allreduce { }; diff --git a/test/gtest/coll/test_allreduce_sliding_window.cc b/test/gtest/coll/test_allreduce_sliding_window.cc new file mode 100644 index 0000000000..387c2aa070 --- /dev/null +++ b/test/gtest/coll/test_allreduce_sliding_window.cc @@ -0,0 +1,197 @@ +/** + * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * See file LICENSE for terms. + */ + +/* + This file is for setting up the global work buffer for sliding window + allreduce. This entails allocating ucp workers, registering memory, + exchanging rkeys, and allocating the pipeline datastructure the + algorithm uses. +*/ + +#include "common/test_ucc.h" + +#ifdef HAVE_UCX + +#include "core/test_mc_reduce.h" +#include "utils/ucc_math.h" + +#include + +#include "test_allreduce_sliding_window.h" + +void test_init_ucp(ucp_context_h *ucp_ctx, ucp_config_t **ucp_config_p) +{ + ucs_status_t ucs_status; + ucp_config_t *ucp_config; + ucp_params_t ucp_params; + ucp_context_h ucp_context; + + ucs_status = ucp_config_read(NULL, NULL, &ucp_config); + EXPECT_EQ(UCS_OK, ucs_status) << "ucp_config_read() returned error: " + << ucs_status_string(ucs_status); + + ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES; + ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_RMA | + UCP_FEATURE_AMO64 | UCP_FEATURE_EXPORTED_MEMH; + + ucs_status = ucp_init(&ucp_params, ucp_config, &ucp_context); + EXPECT_EQ(UCS_OK, ucs_status) << "ucp_init() returned error: " + << ucs_status_string(ucs_status); + + *ucp_ctx = ucp_context; + *ucp_config_p = ucp_config; +} + +ucs_status_t buffer_export_ucc(ucp_context_h ucp_context, void *buf, size_t len, + struct export_buf *ebuf) +{ + ucs_status_t ucs_status = UCS_OK; + ucp_mem_map_params_t params; + ucp_memh_pack_params_t pack_params; + + ebuf->ucp_context = ucp_context; + + params.field_mask = + UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH; + params.address = buf; + params.length = len; + + ucs_status = ucp_mem_map(ucp_context, ¶ms, &ebuf->memh); + if (ucs_status != UCS_OK) { + printf("ucp_mem_map() returned error: %s\n", ucs_status_string(ucs_status)); + return ucs_status; + } + + pack_params.field_mask = UCP_MEMH_PACK_PARAM_FIELD_FLAGS; + pack_params.flags = UCP_MEMH_PACK_FLAG_EXPORT; + + ucs_status = ucp_memh_pack(ebuf->memh, &pack_params, &ebuf->packed_memh, + &ebuf->packed_memh_len); + if (ucs_status != UCS_OK) { + printf("ucp_memh_pack() returned error: %s\n", ucs_status_string(ucs_status)); + return ucs_status; + } + + return ucs_status; +} + +ucs_status_t setup_gwbi(int n_procs, UccCollCtxVec &ctxs, + test_ucp_info_t **ucp_infos_p /* out */, bool inplace) +{ + int i; + ucs_status_t ucs_status = UCS_OK; + + test_ucp_info_t *ucp_infos = + (test_ucp_info_t *)ucc_malloc(sizeof(test_ucp_info_t) * n_procs); + EXPECT_NE(ucp_infos, nullptr); + *ucp_infos_p = ucp_infos; + + // allocate gwbi + for (auto ctx : ctxs) { + global_work_buf_info *gwbi = + (global_work_buf_info *)ucc_malloc( + sizeof(global_work_buf_info), + "global work buf info"); + + EXPECT_NE(gwbi, nullptr); + + ctx->args->global_work_buffer = gwbi; + } + + // setup ucp contexts and workers + for (i = 0; i < n_procs; i++) { + test_init_ucp(&ucp_infos[i].ucp_ctx, &ucp_infos[i].ucp_config); + ucp_infos[i].src_ebuf = {0}; + ucp_infos[i].dst_ebuf = {0}; + } + + // set up packed src/dst memh + for (i = 0; i < n_procs; i++) { + // my proc's gwbi + global_work_buf_info *gwbi = + (global_work_buf_info *)ctxs[i] + ->args->global_work_buffer; + // my proc's ucp_info + test_ucp_info_t * ucp_info = &ucp_infos[i]; + struct export_buf *dst_ebuf = &ucp_info->dst_ebuf; + size_t dst_len = ctxs[i]->args->dst.info.count * + ucc_dt_size(ctxs[i]->args->dst.info.datatype); + + ucs_status = buffer_export_ucc( + ucp_info->ucp_ctx, ctxs[i]->args->dst.info.buffer, + dst_len, dst_ebuf); + if (ucs_status != UCS_OK) return ucs_status; + + gwbi->packed_dst_memh = dst_ebuf->packed_memh; + + if (!inplace) { + size_t src_len = ctxs[i]->args->src.info.count * + ucc_dt_size(ctxs[i]->args->src.info.datatype); + struct export_buf *src_ebuf = &ucp_info->src_ebuf; + ucs_status = buffer_export_ucc( + ucp_info->ucp_ctx, ctxs[i]->args->src.info.buffer, + src_len, src_ebuf); + if (ucs_status != UCS_OK) return ucs_status; + + gwbi->packed_src_memh = src_ebuf->packed_memh; + } + } + + // set the flag that indicates the global work buffer was passed + for (auto ctx : ctxs) { + ctx->args->mask |= + UCC_COLL_ARGS_FIELD_FLAGS | UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER; + ctx->args->flags |= UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS; + } + + return ucs_status; +} + +void free_gwbi(int n_procs, UccCollCtxVec &ctxs, test_ucp_info_t *ucp_infos, + bool inplace) +{ + int i, k; + ucs_status_t ucs_status; + + // free sbufs, rbufs, src_rkeys, and dst_rkeys + for (i = 0; i < n_procs; i++) { + // my proc's ucp_info + test_ucp_info_t *ucp_info = &ucp_infos[i]; + + if (!inplace) { + struct export_buf *src_ebuf = &ucp_info->src_ebuf; + if (src_ebuf->memh != 0) { + ucs_status = ucp_mem_unmap(ucp_info->ucp_ctx, src_ebuf->memh); + ASSERT_EQ(UCS_OK, ucs_status) << "ucp_mem_unmap() returned error: " + << ucs_status_string(ucs_status); + } + } + + struct export_buf *dst_ebuf = &ucp_info->dst_ebuf; + if (dst_ebuf->memh != 0) { + ucs_status = ucp_mem_unmap(ucp_info->ucp_ctx, dst_ebuf->memh); + ASSERT_EQ(UCS_OK, ucs_status) << "ucp_mem_unmap() returned error: " + << ucs_status_string(ucs_status); + } + } + + // free ucp contexts + for (i = 0; i < n_procs; i++) { + ucp_config_release(ucp_infos[i].ucp_config); + ucp_cleanup(ucp_infos[i].ucp_ctx); + } + + // free gwbi and each gwbi's set of pipes + for (k = 0; k < n_procs; k++) { + global_work_buf_info *gwbi = + (global_work_buf_info *) ctxs[k]->args->global_work_buffer; + + ucc_free(gwbi); + } + + ucc_free(ucp_infos); +} + +#endif diff --git a/test/gtest/coll/test_allreduce_sliding_window.h b/test/gtest/coll/test_allreduce_sliding_window.h new file mode 100644 index 0000000000..0c14b7275a --- /dev/null +++ b/test/gtest/coll/test_allreduce_sliding_window.h @@ -0,0 +1,36 @@ +#ifndef TEST_ALLREDUCE_SW_H +#define TEST_ALLREDUCE_SW_H + +#include "common/test_ucc.h" + +#ifdef HAVE_UCX + +#include + +typedef struct global_work_buf_info { + void *packed_src_memh; + void *packed_dst_memh; +} global_work_buf_info; + +struct export_buf { + ucp_context_h ucp_context; + ucp_mem_h memh; + void * packed_memh; + size_t packed_memh_len; + uint64_t memh_id; +}; + +typedef struct test_ucp_info_t { + ucp_context_h ucp_ctx; + ucp_config_t *ucp_config; + struct export_buf src_ebuf; + struct export_buf dst_ebuf; +} test_ucp_info_t; + +void free_gwbi(int n_procs, UccCollCtxVec &ctxs, test_ucp_info_t *ucp_infos, + bool inplace); +ucs_status_t setup_gwbi(int n_procs, UccCollCtxVec &ctxs, + test_ucp_info_t **ucp_infos_p /* out */, bool inplace); + +#endif +#endif