From b8280d113bdb2db7f643b67340f774fdabecd9f1 Mon Sep 17 00:00:00 2001 From: Mamzi Bayatpour Date: Thu, 6 Jun 2024 12:36:33 -0700 Subject: [PATCH] TL/MLX5: address Sam's comments from June 6 --- src/components/tl/mlx5/mcast/tl_mlx5_mcast.h | 4 - .../tl/mlx5/mcast/tl_mlx5_mcast_helper.c | 23 +--- .../tl_mlx5_mcast_one_sided_reliability.c | 103 ++++++++++-------- 3 files changed, 61 insertions(+), 69 deletions(-) diff --git a/src/components/tl/mlx5/mcast/tl_mlx5_mcast.h b/src/components/tl/mlx5/mcast/tl_mlx5_mcast.h index e220c08075..80b5962f50 100644 --- a/src/components/tl/mlx5/mcast/tl_mlx5_mcast.h +++ b/src/components/tl/mlx5/mcast/tl_mlx5_mcast.h @@ -99,9 +99,6 @@ typedef struct mcast_coll_comm_init_spec { int wsize; int max_eager; void *oob; - int one_sided_reliability_enabled; - int one_sided_reliability_scheme_msg_threshold; - int enable_truly_zero_copy_pipelined_allgather; } ucc_tl_mlx5_mcast_coll_comm_init_spec_t; typedef struct ucc_tl_mlx5_mcast_context_config { @@ -225,7 +222,6 @@ typedef struct ucc_tl_mlx5_mcast_one_sided_reliability_comm { * if remote temp slot is ready for RDMA READ in async design */ uint32_t *remote_slot_info; struct ibv_mr *remote_slot_info_mr; - int reliability_enabled; int reliability_scheme_msg_threshold; /* mem address and mem keys of the temp slots in async design */ char *slots_buffer; diff --git a/src/components/tl/mlx5/mcast/tl_mlx5_mcast_helper.c b/src/components/tl/mlx5/mcast/tl_mlx5_mcast_helper.c index b57598e5d8..1f0700bb04 100644 --- a/src/components/tl/mlx5/mcast/tl_mlx5_mcast_helper.c +++ b/src/components/tl/mlx5/mcast/tl_mlx5_mcast_helper.c @@ -480,7 +480,7 @@ ucc_status_t ucc_tl_mlx5_mcast_modify_rc_qps(ucc_tl_mlx5_mcast_coll_context_t *c IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS)) { tl_error(ctx->lib, "Failed to move rc qp to INIT, errno %d", errno); - goto failed; + return UCC_ERR_NO_RESOURCE; } memset(&attr, 0, sizeof(attr)); @@ -505,7 +505,7 @@ ucc_status_t ucc_tl_mlx5_mcast_modify_rc_qps(ucc_tl_mlx5_mcast_coll_context_t *c IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER)) { tl_error(ctx->lib, "Failed to modify rc QP index %d to RTR, errno %d", i, errno); - goto failed; + return UCC_ERR_NO_RESOURCE; } memset(&attr, 0, sizeof(attr)); @@ -520,28 +520,11 @@ ucc_status_t ucc_tl_mlx5_mcast_modify_rc_qps(ucc_tl_mlx5_mcast_coll_context_t *c IBV_QP_STATE | IBV_QP_SQ_PSN | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_MAX_QP_RD_ATOMIC)) { tl_error(ctx->lib, "Failed to modify rc QP index %i to RTS, errno %d", i, errno); - goto failed; + return UCC_ERR_NO_RESOURCE; } } return UCC_OK; - -failed: - for (i=0; icommsize; i++) { - if (ibv_destroy_qp(comm->mcast.rc_qp[i])) { - tl_error(comm->lib, "ibv_destroy_qp failed"); - return UCC_ERR_NO_RESOURCE; - } - } - - if (ibv_destroy_srq(comm->srq)) { - tl_error(comm->lib, "ibv_destroy_srq failed"); - return UCC_ERR_NO_RESOURCE; - } - - ucc_free(comm->mcast.rc_qp); - - return UCC_ERR_NO_RESOURCE; } ucc_status_t ucc_tl_mlx5_fini_mcast_group(ucc_tl_mlx5_mcast_coll_context_t *ctx, diff --git a/src/components/tl/mlx5/mcast/tl_mlx5_mcast_one_sided_reliability.c b/src/components/tl/mlx5/mcast/tl_mlx5_mcast_one_sided_reliability.c index a56791cf71..85d63a82d0 100644 --- a/src/components/tl/mlx5/mcast/tl_mlx5_mcast_one_sided_reliability.c +++ b/src/components/tl/mlx5/mcast/tl_mlx5_mcast_one_sided_reliability.c @@ -102,56 +102,93 @@ static ucc_status_t ucc_tl_mlx5_mcast_one_sided_setup_reliability_buffers(ucc_ba return status; } -ucc_status_t ucc_tl_mlx5_mcast_one_sided_reliability_init(ucc_base_team_t *team) +static inline ucc_status_t ucc_tl_mlx5_mcast_one_sided_cleanup(ucc_tl_mlx5_mcast_coll_comm_t *comm) { - ucc_tl_mlx5_team_t *tl_team = ucc_derived_of(team, ucc_tl_mlx5_team_t); - ucc_tl_mlx5_mcast_coll_comm_t *comm = tl_team->mcast->mcast_comm; - ucc_status_t status = UCC_OK; + int j; + + if (comm->mcast.rc_qp != NULL) { + for (j=0; jcommsize; j++) { + if (comm->mcast.rc_qp[j] != NULL && ibv_destroy_qp(comm->mcast.rc_qp[j])) { + tl_error(comm->lib, "ibv_destroy_qp failed"); + return UCC_ERR_NO_RESOURCE; + } + comm->mcast.rc_qp[j] = NULL; + } - status = ucc_tl_mlx5_mcast_one_sided_setup_reliability_buffers(team); - if (status != UCC_OK) { - tl_error(comm->lib, "setup reliablity resources failed"); - goto failed; + ucc_free(comm->mcast.rc_qp); + comm->mcast.rc_qp = NULL; } - /* TODO double check if ucc inplace allgather is working properly */ - status = comm->service_coll.allgather_post(comm->p2p_ctx, NULL /*inplace*/, comm->one_sided.info, - sizeof(ucc_tl_mlx5_one_sided_reliable_team_info_t), - &comm->one_sided.reliability_req); - if (UCC_OK != status) { - tl_error(comm->lib, "oob allgather failed during one-sided reliability init"); - goto failed; + if (comm->srq != NULL && ibv_destroy_srq(comm->srq)) { + tl_error(comm->lib, "ibv_destroy_srq failed"); + return UCC_ERR_NO_RESOURCE; } + comm->srq = NULL; - return status; - -failed: if (comm->one_sided.slots_mr) { ibv_dereg_mr(comm->one_sided.slots_mr); + comm->one_sided.slots_mr = 0; } if (comm->one_sided.remote_slot_info_mr) { ibv_dereg_mr(comm->one_sided.remote_slot_info_mr); + comm->one_sided.remote_slot_info_mr = 0; } if (comm->one_sided.slots_buffer) { ucc_free(comm->one_sided.slots_buffer); + comm->one_sided.slots_buffer = NULL; } if (comm->one_sided.recvd_pkts_tracker) { ucc_free(comm->one_sided.recvd_pkts_tracker); + comm->one_sided.recvd_pkts_tracker = NULL; } if (comm->one_sided.sendbuf_memkey_list) { ucc_free(comm->one_sided.sendbuf_memkey_list); + comm->one_sided.sendbuf_memkey_list = NULL; } if (comm->one_sided.remote_slot_info) { ucc_free(comm->one_sided.remote_slot_info); + comm->one_sided.remote_slot_info = NULL; } if (comm->one_sided.info) { ucc_free(comm->one_sided.info); + comm->one_sided.info = NULL; + } + + return UCC_OK; +} + +ucc_status_t ucc_tl_mlx5_mcast_one_sided_reliability_init(ucc_base_team_t *team) +{ + ucc_tl_mlx5_team_t *tl_team = ucc_derived_of(team, ucc_tl_mlx5_team_t); + ucc_tl_mlx5_mcast_coll_comm_t *comm = tl_team->mcast->mcast_comm; + ucc_status_t status = UCC_OK; + + status = ucc_tl_mlx5_mcast_one_sided_setup_reliability_buffers(team); + if (status != UCC_OK) { + tl_error(comm->lib, "setup reliablity resources failed"); + goto failed; + } + + /* TODO double check if ucc inplace allgather is working properly */ + status = comm->service_coll.allgather_post(comm->p2p_ctx, NULL /*inplace*/, comm->one_sided.info, + sizeof(ucc_tl_mlx5_one_sided_reliable_team_info_t), + &comm->one_sided.reliability_req); + if (UCC_OK != status) { + tl_error(comm->lib, "oob allgather failed during one-sided reliability init"); + goto failed; + } + + return status; + +failed: + if (UCC_OK != ucc_tl_mlx5_mcast_one_sided_cleanup(comm)) { + tl_error(comm->lib, "mcast one-sided reliablity resource cleanup failed"); } return status; @@ -182,34 +219,10 @@ ucc_status_t ucc_tl_mlx5_mcast_one_sided_reliability_test(ucc_base_team_t *team) } failed: - if (comm->one_sided.slots_mr) { - ibv_dereg_mr(comm->one_sided.slots_mr); - } - - if (comm->one_sided.remote_slot_info_mr) { - ibv_dereg_mr(comm->one_sided.remote_slot_info_mr); - } - - if (comm->one_sided.slots_buffer) { - ucc_free(comm->one_sided.slots_buffer); - } - - if (comm->one_sided.recvd_pkts_tracker) { - ucc_free(comm->one_sided.recvd_pkts_tracker); + if (UCC_OK != ucc_tl_mlx5_mcast_one_sided_cleanup(comm)) { + tl_error(comm->lib, "mcast one-sided reliablity resource cleanup failed"); } - - if (comm->one_sided.sendbuf_memkey_list) { - ucc_free(comm->one_sided.sendbuf_memkey_list); - } - - if (comm->one_sided.remote_slot_info) { - ucc_free(comm->one_sided.remote_slot_info); - } - - if (comm->one_sided.info) { - ucc_free(comm->one_sided.info); - } - + return status; }