Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Window Allreduce Stubs #902

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/m4/ucx.m4
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ AS_IF([test "x$ucx_checked" != "xyes"],[
[],
[#include <ucs/memory/rcache.h>])

AC_DEFINE([HAVE_UCX], 1, [Enable UCX support])

AC_COMPILE_IFELSE([AC_LANG_SOURCE([[#include <ucs/config/parser.h>
int main(int argc, char** argv) {
Expand Down
11 changes: 7 additions & 4 deletions src/components/tl/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ alltoallv = \
alltoallv/alltoallv_onesided.c

allreduce = \
allreduce/allreduce.h \
allreduce/allreduce.c \
allreduce/allreduce_knomial.c \
allreduce/allreduce_sra_knomial.c \
allreduce/allreduce.h \
allreduce/allreduce.c \
allreduce/allreduce_knomial.c \
allreduce/allreduce_sra_knomial.c \
allreduce/allreduce_sliding_window.h \
allreduce/allreduce_sliding_window.c \
allreduce/allreduce_sliding_window_setup.c \
allreduce/allreduce_dbt.c

barrier = \
Expand Down
14 changes: 14 additions & 0 deletions src/components/tl/ucp/allreduce/allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ ucc_base_coll_alg_info_t
.name = "dbt",
.desc = "alreduce over double binary tree where a leaf in one tree "
"will be intermediate in other (optimized for BW)"},
[UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW] =
{.id = UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
.name = "sliding_window",
.desc = "sliding window allreduce (optimized for running on DPU)"},
[UCC_TL_UCP_ALLREDUCE_ALG_LAST] = {
.id = 0, .name = NULL, .desc = NULL}};

Expand All @@ -51,3 +55,13 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
out:
return status;
}

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t __attribute__((unused)) *coll_args, //NOLINT
ucc_base_team_t __attribute__((unused)) *team, //NOLINT
ucc_coll_task_t __attribute__((unused)) **task_h) //NOLINT
{
ucc_coll_task_t *coll_task = NULL;
ucc_tl_ucp_allreduce_sliding_window_progress(coll_task);
return UCC_OK;
}
26 changes: 26 additions & 0 deletions src/components/tl/ucp/allreduce/allreduce.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
enum {
UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL,
UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL,
UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
UCC_TL_UCP_ALLREDUCE_ALG_DBT,
UCC_TL_UCP_ALLREDUCE_ALG_LAST
};
Expand All @@ -36,16 +37,41 @@ ucc_status_t ucc_tl_ucp_allreduce_init(ucc_tl_ucp_task_t *task);
#define ALLREDUCE_TASK_CHECK(_args, _team) \
CHECK_SAME_MEMTYPE((_args), (_team));


ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);

ucc_status_t ucc_tl_ucp_allreduce_knomial_init_common(ucc_tl_ucp_task_t *task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_tl_ucp_task_t * task);

ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(
ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task);
nsarka marked this conversation as resolved.
Show resolved Hide resolved

ucc_status_t ucc_tl_ucp_allreduce_knomial_start(ucc_coll_task_t *task);

void ucc_tl_ucp_allreduce_knomial_progress(ucc_coll_task_t *task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task);

void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *task);

ucc_status_t ucc_tl_ucp_allreduce_knomial_finalize(ucc_coll_task_t *task);

ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args,
Expand Down
90 changes: 90 additions & 0 deletions src/components/tl/ucp/allreduce/allreduce_sliding_window.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright(c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#include "allreduce.h"
#include "allreduce_sliding_window.h"
#include "../allgather/allgather.h"
#include "../barrier/barrier.h"
#include "utils/ucc_dt_reduce.h"
#include "tl_ucp_ep.h"


static inline void //NOLINT
ucc_tl_ucp_allreduce_sliding_window_reset_buf(ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused)) *buf) //NOLINT
{
}

static inline void ucc_tl_ucp_allreduce_sliding_window_reset_pipeline( //NOLINT
ucc_tl_ucp_allreduce_sw_pipeline_t __attribute__((unused)) *pipe, ucc_rank_t __attribute__((unused)) rank, //NOLINT
size_t __attribute__((unused)) put_window_size) //NOLINT
{
}

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t __attribute__((unused)) *coll_task) //NOLINT
{
return UCC_OK;
}

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t __attribute__((unused)) *coll_task) //NOLINT
{
return UCC_OK;
}

static inline void ucc_tl_ucp_allreduce_sliding_window_reduction(
ucc_coll_task_t __attribute__((unused)) *coll_task, ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused)) *accbuf,//NOLINT
ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused)) *getbuf)//NOLINT
{
}

static inline void
ucc_tl_ucp_allreduce_sliding_window_test_reduction(ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
{
}

static inline ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_req_test(ucs_status_ptr_t __attribute__((unused)) request,//NOLINT
ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
{
return UCC_OK;
}

static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(//NOLINT
ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
{
}

static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(//NOLINT
ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
{
}

static inline void
ucc_tl_ucp_allreduce_sliding_window_barrier(ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
{
}

void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task)//NOLINT
{
ucs_status_ptr_t request = 0;
ucc_tl_ucp_task_t *task = NULL;
ucc_tl_ucp_allreduce_sw_buf_t *accbuf = NULL;
ucc_tl_ucp_allreduce_sw_buf_t *getbuf = NULL;
ucc_tl_ucp_allreduce_sw_pipeline_t *pipe = NULL;

// suppress "function unused" Werrors
ucc_tl_ucp_allreduce_sliding_window_barrier(coll_task);
ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(coll_task);
ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(coll_task);
ucc_tl_ucp_allreduce_sliding_window_req_test(request, task);
ucc_tl_ucp_allreduce_sliding_window_test_reduction(task);
ucc_tl_ucp_allreduce_sliding_window_reduction(coll_task, accbuf, getbuf);
ucc_tl_ucp_allreduce_sliding_window_finalize(coll_task);
ucc_tl_ucp_allreduce_sliding_window_start(coll_task);
ucc_tl_ucp_allreduce_sliding_window_reset_pipeline(pipe, 0, 0);
ucc_tl_ucp_allreduce_sliding_window_reset_buf(accbuf);
}
76 changes: 76 additions & 0 deletions src/components/tl/ucp/allreduce/allreduce_sliding_window.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#ifndef ALLREDUCE_SW_H_
#define ALLREDUCE_SW_H_

#include "tl_ucp_coll.h"

#define ALLREDUCE_PACKED_KEY_MAX_LEN 1024

typedef struct ucc_tl_ucp_allreduce_sw_global_work_buf_info {
void *packed_src_memh;
void *packed_dst_memh;
} ucc_tl_ucp_allreduce_sw_global_work_buf_info_t;

typedef enum ucc_tl_ucp_allreduce_sw_buf_state {
FREE,
RECVING,
REDUCING,
REDUCED,
SENDING,
IDLE,
} ucc_tl_ucp_allreduce_sw_buf_state_t;

typedef struct ucc_tl_ucp_allreduce_sw_buf {
void *buf;
ucc_tl_ucp_allreduce_sw_buf_state_t state;
ucs_status_ptr_t ucp_req;
size_t count;
size_t bytes;
} ucc_tl_ucp_allreduce_sw_buf_t;

typedef struct ucc_tl_ucp_allreduce_sw_pipeline {
ucc_tl_ucp_allreduce_sw_buf_t accbuf;
ucc_tl_ucp_allreduce_sw_buf_t *getbuf;
ucs_status_ptr_t *put_requests;
size_t buffer_size;
size_t num_buffers;
size_t avail_buffs;
size_t my_count;
size_t my_offset;
size_t count_issued;
size_t count_received;
size_t count_reduced;
size_t count_serviced;
size_t get_idx;
size_t red_idx;
ucc_rank_t src_rank;
ucc_rank_t dst_rank;
int done_get;
int done_red;
int done_put;
int posted_put;
} ucc_tl_ucp_allreduce_sw_pipeline_t;

struct ucc_tl_ucp_allreduce_sw_export_buf {
ucp_context_h ucp_context;
ucp_mem_h memh;
void *packed_memh;
size_t packed_memh_len;
void *packed_key;
size_t packed_key_len;
uint64_t memh_id;
};

typedef struct ucc_tl_ucp_allreduce_sw_host_allgather {
void *src_buf;
void *dst_buf;
char packed_src_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
char packed_dst_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
} ucc_tl_ucp_allreduce_sw_host_allgather_t;

#endif
43 changes: 43 additions & 0 deletions src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright(c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#include "allreduce.h"
#include "allreduce_sliding_window.h"
#include "../allgather/allgather.h"
#include "utils/ucc_dt_reduce.h"
#include "tl_ucp_ep.h"

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(
ucc_base_coll_args_t __attribute__((unused)) *coll_args,//NOLINT
ucc_base_team_t __attribute__((unused)) *team,//NOLINT
ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
{
return UCC_OK;
}

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_task_init(
ucc_base_coll_args_t __attribute__((unused)) *coll_args,//NOLINT
ucc_base_team_t __attribute__((unused)) *team,//NOLINT
ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
{
return UCC_OK;
}

ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(//NOLINT
ucc_service_coll_req_t __attribute__((unused)) *scoll_req, //NOLINT
ucc_tl_ucp_task_t __attribute__((unused)) *sw_task)//NOLINT
{
return UCC_OK;
}

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_free_gwbi(
ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
{
return UCC_OK;
}
17 changes: 17 additions & 0 deletions src/components/tl/ucp/tl_ucp.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = {
ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_kn_radix),
UCC_CONFIG_TYPE_UINT_RANGED},

{"ALLREDUCE_SLIDING_WIN_BUF_SIZE", "65536",
"Buffer size of the sliding window allreduce algorithm",
ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sliding_window_buf_size),
UCC_CONFIG_TYPE_MEMUNITS},

{"ALLREDUCE_SLIDING_WIN_PUT_WINDOW_SIZE", "0",
"Max concurrent puts in SW Allreduce. 0 means set to team size",
ucc_offsetof(ucc_tl_ucp_lib_config_t,
allreduce_sliding_window_put_window_size),
UCC_CONFIG_TYPE_UINT},

{"ALLREDUCE_SLIDING_WIN_NUM_GET_BUFS", "0",
"Number of get buffers for sliding window AR. 0 means set to team size",
ucc_offsetof(ucc_tl_ucp_lib_config_t,
allreduce_sliding_window_num_get_bufs),
UCC_CONFIG_TYPE_UINT},

{"ALLREDUCE_SRA_KN_RADIX", "auto",
"Radix of the scatter-reduce-allgather (SRA) knomial allreduce algorithm",
ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sra_kn_radix),
Expand Down
3 changes: 3 additions & 0 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ typedef struct ucc_tl_ucp_lib_config {
uint32_t fanin_kn_radix;
uint32_t fanout_kn_radix;
uint32_t barrier_kn_radix;
size_t allreduce_sliding_window_buf_size;
uint32_t allreduce_sliding_window_put_window_size;
uint32_t allreduce_sliding_window_num_get_bufs;
ucc_mrange_uint_t allreduce_kn_radix;
ucc_mrange_uint_t allreduce_sra_kn_radix;
uint32_t reduce_scatter_kn_radix;
Expand Down
3 changes: 3 additions & 0 deletions src/components/tl/ucp/tl_ucp_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str,
case UCC_TL_UCP_ALLREDUCE_ALG_DBT:
*init = ucc_tl_ucp_allreduce_dbt_init;
break;
case UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW:
*init = ucc_tl_ucp_allreduce_sliding_window_init;
break;
default:
status = UCC_ERR_INVALID_PARAM;
break;
Expand Down
20 changes: 20 additions & 0 deletions src/components/tl/ucp/tl_ucp_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ enum ucc_tl_ucp_task_flags {
UCC_TL_UCP_TASK_FLAG_SUBSET = UCC_BIT(0),
};

typedef struct ucc_tl_ucp_allreduce_sw_pipeline
ucc_tl_ucp_allreduce_sw_pipeline;
typedef struct ucc_tl_ucp_allreduce_sw_host_allgather
ucc_tl_ucp_allreduce_sw_host_allgather;

typedef struct ucc_tl_ucp_task {
ucc_coll_task_t super;
uint32_t flags;
Expand Down Expand Up @@ -121,6 +126,21 @@ typedef struct ucc_tl_ucp_task {
ucc_ee_executor_task_t *etask;
ucc_ee_executor_t *executor;
} allreduce_kn;
struct {
int reduce_in_progress;
ucp_rkey_h *src_rkeys; //unpacked
ucp_rkey_h *dst_rkeys; //unpacked
void **sbufs;
void **rbufs;
ucc_tl_ucp_allreduce_sw_pipeline *pipe;
ucc_ee_executor_task_t *etask;
ucc_ee_executor_t *executor;
ucs_status_ptr_t *put_requests;
ucc_tl_ucp_allreduce_sw_host_allgather *allgather_data;
ucc_schedule_t *sw_sched;
struct ucc_tl_ucp_allreduce_sw_export_buf *src_ebuf;
struct ucc_tl_ucp_allreduce_sw_export_buf *dst_ebuf;
} allreduce_sliding_window;
struct {
int phase;
ucc_knomial_pattern_t p;
Expand Down
Loading
Loading