From 9b91d9ded75e71b910d0183493858fb21fe87579 Mon Sep 17 00:00:00 2001 From: Shachar Hasson Date: Thu, 28 Nov 2024 12:55:40 +0000 Subject: [PATCH] UCP/WIREUP: Support EP reconfiguration for non wired-up scenarios --- src/ucp/core/ucp_ep.c | 13 ++- src/ucp/core/ucp_ep.h | 8 ++ src/ucp/wireup/wireup.c | 109 +++++++++++++++------ test/gtest/ucp/test_ucp_ep_reconfig.cc | 127 ++++++++++++++----------- 4 files changed, 168 insertions(+), 89 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index e9f0a266055..0760b27565f 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1893,10 +1893,9 @@ int ucp_ep_config_lane_is_peer_match(const ucp_ep_config_key_t *key1, config_lane2->dst_md_index); } -static ucp_lane_index_t -ucp_ep_config_find_match_lane(const ucp_ep_config_key_t *key1, - ucp_lane_index_t lane1, - const ucp_ep_config_key_t *key2) +ucp_lane_index_t ucp_ep_config_find_match_lane(const ucp_ep_config_key_t *key1, + ucp_lane_index_t lane1, + const ucp_ep_config_key_t *key2) { ucp_lane_index_t lane_idx; @@ -1970,9 +1969,9 @@ void ucp_ep_config_lanes_intersect(const ucp_ep_config_key_t *key1, } } -static int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, - const ucp_ep_config_key_t *key2, - ucp_lane_index_t lane) +int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, + const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane) { const ucp_ep_config_key_lane_t *config_lane1 = &key1->lanes[lane]; const ucp_ep_config_key_lane_t *config_lane2 = &key2->lanes[lane]; diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index d21e2153ad9..203eccde881 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -749,6 +749,10 @@ int ucp_ep_config_lane_is_peer_match(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2, ucp_lane_index_t lane2); +ucp_lane_index_t ucp_ep_config_find_match_lane(const ucp_ep_config_key_t *key1, + ucp_lane_index_t lane1, + const ucp_ep_config_key_t *key2); + void 
ucp_ep_config_lanes_intersect(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2, const ucp_ep_h ep, @@ -762,6 +766,10 @@ ucp_ep_find_non_reused_lane(ucp_ep_h ep, const ucp_ep_config_key_t *key, ucp_lane_index_t ucp_ep_find_wireup_ep_lane(ucp_ep_h ep); +int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, + const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane); + int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2); diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index c642cb4aefd..ebf5fd20241 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -1355,26 +1355,55 @@ static void ucp_wireup_discard_uct_eps(ucp_ep_h ep, uct_ep_h *uct_eps, static int ucp_wireup_check_is_reconfigurable(ucp_ep_h ep, - const ucp_ep_config_key_t *new_key, - const ucp_unpacked_address_t *remote_address, - const unsigned *addr_indices) + const ucp_ep_config_key_t *new_key) { ucp_lane_index_t lane; + const ucp_ep_config_key_t *old_key; if ((ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) || ucp_ep_has_cm_lane(ep)) { return 1; } - /* TODO: Support reconfiguration when lanes are created without a wireup_ep - * wrapper */ - for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) { - if (!ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane))) { + old_key = &ucp_ep_config(ep)->key; + + /* TODO: 1) Support lanes which are connected to the same remote MD, but + * different remote sys_dev (e.g. TCP). */ + for (lane = 0; lane < old_key->num_lanes; ++lane) { + if ((ucp_ep_config_find_match_lane(old_key, lane, new_key) != + UCP_NULL_LANE) && + !ucp_ep_config_lane_is_equal(old_key, new_key, lane)) { return 0; } } - return 1; + /* TODO: 2) Support reconfiguration for separated wireup and AM lanes + * during wireup process (request sent). 
*/ + return !(ep->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED) || + (ep->am_lane == UCP_NULL_LANE) || + (old_key->wireup_msg_lane == UCP_NULL_LANE) || + (ep->am_lane == old_key->wireup_msg_lane) || + (ucp_wireup_ep_test(ucp_ep_get_lane(ep, ep->am_lane))); +} + +static int ucp_wireup_should_reconfigure(ucp_ep_h ep, + const ucp_lane_index_t *reuse_lane_map, + ucp_lane_index_t num_lanes) +{ + ucp_lane_index_t lane; + + if (ucp_ep_has_cm_lane(ep)) { + return 1; + } + + /* Check whether all lanes are reused */ + for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) { + if (reuse_lane_map[lane] == UCP_NULL_LANE) { + return 1; + } + } + + return ucp_ep_num_lanes(ep) != num_lanes; } static ucs_status_t @@ -1384,15 +1413,16 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key, { uct_ep_h uct_ep = NULL; ucp_lane_index_t old_lane, new_wireup_lane; - ucp_wireup_ep_t *old_wireup_ep, *new_wireup_ep; + ucp_wireup_ep_t *new_wireup_ep; + uct_ep_h old_wireup_ep, msg_ep; ucp_rsc_index_t aux_rsc_index; - int is_p2p; + int is_p2p, is_wireup_ep, is_next; ucs_status_t status; /* Get old wireup lane */ old_lane = ucp_wireup_get_msg_lane(ep, UCP_WIREUP_MSG_REQUEST); - old_wireup_ep = ucp_wireup_ep(ucp_ep_get_lane(ep, old_lane)); - ucs_assert_always(old_wireup_ep != NULL); + old_wireup_ep = ucp_ep_get_lane(ep, old_lane); + is_wireup_ep = ucp_wireup_ep_test(old_wireup_ep); /* Select CM/non-reused lane as new wireup lane */ new_wireup_lane = ucp_ep_find_non_reused_lane(ep, key, reuse_lane_map); @@ -1417,32 +1447,31 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key, } ucs_assert(new_wireup_ep != NULL); + is_next = ucp_wireup_ep_is_next_ep_active(ucp_wireup_ep(old_wireup_ep)); /* Get correct aux_rsc_index either from next_ep or aux_ep */ - aux_rsc_index = ucp_wireup_ep_is_next_ep_active(old_wireup_ep) ? + aux_rsc_index = (!is_wireup_ep || is_next) ? 
ucp_ep_get_rsc_index(ep, old_lane) : - ucp_wireup_ep_get_aux_rsc_index( - &old_wireup_ep->super.super); + ucp_wireup_ep_get_aux_rsc_index(old_wireup_ep); ucs_assert(aux_rsc_index != UCP_NULL_RESOURCE); is_p2p = ucp_ep_config_connect_p2p(ep->worker, &ucp_ep_config(ep)->key, aux_rsc_index); /* Move aux EP to new wireup lane */ - ucp_wireup_ep_set_aux(new_wireup_ep, - ucp_wireup_ep_extract_msg_ep(old_wireup_ep), + msg_ep = ucp_wireup_ep_extract_msg_ep(ucp_wireup_ep(old_wireup_ep)); + ucp_wireup_ep_set_aux(new_wireup_ep, is_wireup_ep ? msg_ep : old_wireup_ep, aux_rsc_index, is_p2p); - /* Remove old wireup_ep as it's not needed anymore. - * NOTICE: Next two lines are intentionally not merged with the lane - * removal loop in ucp_wireup_check_config_intersect, because of future - * support for non-wireup EPs reconfiguration (which will modify this - * code). */ - uct_ep_destroy(&old_wireup_ep->super.super); - ucp_ep_set_lane(ep, old_lane, NULL); + if (is_wireup_ep) { + /* Remove old wireup_ep as it's not needed anymore. 
*/ + uct_ep_destroy(old_wireup_ep); + } + ucp_ep_set_lane(ep, old_lane, NULL); new_uct_eps[new_wireup_lane] = &new_wireup_ep->super.super; key->wireup_msg_lane = new_wireup_lane; + return UCS_OK; } @@ -1463,8 +1492,7 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key, *connect_lane_bitmap = UCS_MASK(new_key->num_lanes); if ((ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) || - !ucp_wireup_check_is_reconfigurable(ep, new_key, remote_address, - addr_indices)) { + !ucp_wireup_check_is_reconfigurable(ep, new_key)) { /* nothing to intersect with */ return ucp_ep_realloc_lanes(ep, new_key->num_lanes); } @@ -1479,6 +1507,11 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key, ucp_ep_config_lanes_intersect(old_key, new_key, ep, remote_address, addr_indices, reuse_lane_map); + if (!ucp_wireup_should_reconfigure(ep, reuse_lane_map, + new_key->num_lanes)) { + return UCS_OK; + } + if (ucp_ep_has_cm_lane(ep)) { /* CM lane has to be reused by the new EP configuration */ ucs_assert(reuse_lane_map[ucp_ep_get_cm_lane(ep)] != UCP_NULL_LANE); @@ -1585,9 +1618,18 @@ ucp_wireup_gather_pending_reqs(ucp_ep_h ep, ucs_queue_head_t *replay_pending_queue) { ucp_request_t *req; + ucp_lane_index_t lane; + + ucs_queue_head_init(replay_pending_queue); + + if (ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) { + return; + } + /* Handle wireup EPs */ ucp_wireup_eps_pending_extract(ep, replay_pending_queue); + /* rkey ptr requests */ ucs_queue_for_each(req, &ep->worker->rkey_ptr_reqs, send.rndv.rkey_ptr.queue_elem) { if (req->send.ep == ep) { @@ -1595,6 +1637,17 @@ ucp_wireup_gather_pending_reqs(ucp_ep_h ep, (ucs_queue_elem_t*)&req->send.uct.priv); } } + + /* Fully connected lanes */ + for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) { + if (ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane))) { + continue; + } + + uct_ep_pending_purge(ucp_ep_get_lane(ep, lane), + ucp_request_purge_enqueue_cb, + replay_pending_queue); + } } ucs_status_t 
ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, @@ -1640,9 +1693,7 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, /* This function must be used before uct_eps are discarded. Store return * value for later use. */ - is_reconfigurable = ucp_wireup_check_is_reconfigurable(ep, &key, - remote_address, - addr_indices); + is_reconfigurable = ucp_wireup_check_is_reconfigurable(ep, &key); if (!is_reconfigurable && !ucp_ep_config_is_equal(&ucp_ep_config(ep)->key, &key)) { diff --git a/test/gtest/ucp/test_ucp_ep_reconfig.cc b/test/gtest/ucp/test_ucp_ep_reconfig.cc index dc356930fe8..d51d7ca0f6a 100644 --- a/test/gtest/ucp/test_ucp_ep_reconfig.cc +++ b/test/gtest/ucp/test_ucp_ep_reconfig.cc @@ -55,7 +55,9 @@ class test_ucp_ep_reconfig : public ucp_test { bool is_lane_connected(ucp_ep_h ep, ucp_lane_index_t lane_idx, const entity &other) const; unsigned num_paths() const; + unsigned num_shm_lanes() const; ucp_tl_bitmap_t reduced_tl_bitmap() const; + ucp_tl_bitmap_t initial_tl_bitmap() const; ucp_worker_cfg_index_t m_cfg_index = UCP_WORKER_CFG_INDEX_NULL; unsigned m_num_reused_lanes = 0; @@ -63,17 +65,11 @@ class test_ucp_ep_reconfig : public ucp_test { bool m_exclude_ifaces; }; - bool is_single_transport() + bool is_single_transport() const { return GetParam().transports.size() == 1; } - bool has_p2p_transport() - { - return has_resource(sender(), "rc_verbs") || - has_resource(sender(), "rc_mlx5"); - } - void create_entity(bool push_front, bool exclude_ifaces) { auto e = new entity(GetParam(), m_ucp_config, get_worker_params(), this, @@ -104,6 +100,12 @@ class test_ucp_ep_reconfig : public ucp_test { UCS_TEST_SKIP_R("test requires at least 2 ifaces to work"); } + if (has_transport("tcp")) { + UCS_TEST_SKIP_R("TODO: fix lane matching functionality in case " + "there's matching remote MDs and different " + "sys_devs"); + } + ensure_reused_lanes_reconfigurable(); } @@ -114,7 +116,6 @@ class test_ucp_ep_reconfig : public ucp_test { } void 
run(bool bidirectional = false); - void skip_non_p2p(); bool has_bond_iface(); void ensure_reused_lanes_reconfigurable(); @@ -209,9 +210,29 @@ void test_ucp_ep_reconfig::entity::store_config() m_cfg_index = ep()->cfg_index; /* Calculate number of reused lanes */ - auto num_reused = (ucp_ep_num_lanes(ep()) / 2) / num_paths(); - auto test = static_cast(m_test); - m_num_reused_lanes = test->reuse_lanes() ? num_reused : 0; + auto test = static_cast(m_test); + + if (test->reuse_lanes()) { + m_num_reused_lanes = (ucp_ep_num_lanes(ep()) / 2) / num_paths(); + } else if (!m_exclude_ifaces) { + /* In case of asymmetric config, SHM lanes are reused */ + m_num_reused_lanes = num_shm_lanes(); + } else { + m_num_reused_lanes = 0; + } +} + +unsigned test_ucp_ep_reconfig::entity::num_shm_lanes() const +{ + unsigned num_shm = 0; + + for (auto lane = 0; lane < ucp_ep_num_lanes(ep()); ++lane) { + auto rsc_idx = ucp_ep_get_rsc_index(ep(), lane); + num_shm += (ucph()->tl_rscs[rsc_idx].tl_rsc.dev_type == + UCT_DEVICE_TYPE_SHM); + } + + return num_shm; } ucp_tl_bitmap_t @@ -235,12 +256,40 @@ test_ucp_ep_reconfig::entity::ep_tl_bitmap(unsigned max_num_devs) const return tl_bitmap; } +ucp_tl_bitmap_t test_ucp_ep_reconfig::entity::initial_tl_bitmap() const +{ + auto test = static_cast(m_test); + + if (!test->is_single_transport()) { + return ucp_tl_bitmap_max; + } + + /* For single transport, half of the resources should be reserved for + * receiver side to use */ + ucp_tl_bitmap_t tl_bitmap = UCS_STATIC_BITMAP_ZERO_INITIALIZER; + size_t num_tls = 0; + ucp_rsc_index_t rsc_idx; + + UCS_STATIC_BITMAP_FOR_EACH_BIT(rsc_idx, &ucph()->tl_bitmap) { + if (++num_tls > (ucph()->num_tls / 2)) { + UCS_STATIC_BITMAP_SET(&tl_bitmap, rsc_idx); + } + } + + return tl_bitmap; +} + ucp_tl_bitmap_t test_ucp_ep_reconfig::entity::reduced_tl_bitmap() const { - if ((ep() == NULL) || !m_exclude_ifaces) { + if (!m_exclude_ifaces) { return ucp_tl_bitmap_max; } + if (ep() == NULL) { + /* Handle first entity's bitmap 
*/ + return initial_tl_bitmap(); + } + auto reused_bitmap = ep_tl_bitmap(num_reused_lanes()); const auto negative_bitmap = UCS_STATIC_BITMAP_NOT(reused_bitmap); @@ -362,13 +411,6 @@ void test_ucp_ep_reconfig::run(bool bidirectional) r_receiver->verify_configuration(*r_sender, r_sender->num_reused_lanes()); } -void test_ucp_ep_reconfig::skip_non_p2p() -{ - if (!has_p2p_transport()) { - UCS_TEST_SKIP_R("No p2p TLs available, config will be non-wireup"); - } -} - bool test_ucp_ep_reconfig::has_bond_iface() { auto context = sender().ucph(); @@ -391,52 +433,37 @@ void test_ucp_ep_reconfig::ensure_reused_lanes_reconfigurable() return; } - if (has_transport("ud_v") || has_transport("ud_x")) { - UCS_TEST_SKIP_R("the test requires at least 2 lanes, while UD has only " - "1"); - } - - if (has_transport("tcp") || has_transport("dc_x") || has_transport("shm")) { - UCS_TEST_SKIP_R("non wired-up lanes are not supported yet"); - } - if (has_bond_iface()) { modify_config("IB_NUM_PATHS", "1", SETENV_IF_NOT_EXIST); } } -/* TODO: Remove skip condition after next PRs are merged. 
*/ -UCS_TEST_SKIP_COND_P(test_ucp_ep_reconfig, basic, !has_transport("rc")) +UCS_TEST_P(test_ucp_ep_reconfig, basic) { + if (has_transport("shm")) { + UCS_TEST_SKIP_R("TODO: add support for reconfiguration of separate " + "wireup and AM lanes"); + } + run(); } UCS_TEST_P(test_ucp_ep_reconfig, request_reset, "PROTO_REQUEST_RESET=y") { - if (is_single_transport()) { - /* One side will consume all ifaces and the other side will have no ifaces left to use */ - UCS_TEST_SKIP_R("exclude_iface requires at least 2 transports to work " - "(for example DC + SHM)"); - } - - skip_non_p2p(); run(); } -UCS_TEST_SKIP_COND_P(test_ucp_ep_reconfig, resolve_remote_id, is_self(), - "MAX_RNDV_LANES=0") +UCS_TEST_P(test_ucp_ep_reconfig, resolve_remote_id) { - if (has_transport("tcp")) { - UCS_TEST_SKIP_R("asymmetric setup is not supported for this transport " - "due to a reachability issue - only matching " - "interfaces can connect"); + if (has_transport("dc_x")) { + /* Avoid creating odd number of lanes due to AM lane separation */ + modify_config("MAX_RNDV_LANES", "0"); } run(true); } UCP_INSTANTIATE_TEST_CASE(test_ucp_ep_reconfig); -UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_ep_reconfig, rc_x_v, "rc"); class test_reconfig_asymmetric : public test_ucp_ep_reconfig { protected: @@ -452,23 +479,17 @@ class test_reconfig_asymmetric : public test_ucp_ep_reconfig { } }; -/* Will be relevant when reuse + non-wireup is supported */ -UCS_TEST_SKIP_COND_P(test_reconfig_asymmetric, basic, has_transport("shm")) +UCS_TEST_P(test_reconfig_asymmetric, basic) { run(); } -/* Will be relevant when reuse + non-wireup is supported */ -UCS_TEST_SKIP_COND_P(test_reconfig_asymmetric, request_reset, - has_transport("shm"), "PROTO_REQUEST_RESET=y") +UCS_TEST_P(test_reconfig_asymmetric, request_reset, "PROTO_REQUEST_RESET=y") { - skip_non_p2p(); run(); } -/* SHM + single lane won't trigger reconfig. 
*/ -UCS_TEST_SKIP_COND_P(test_reconfig_asymmetric, resolve_remote_id, - has_transport("shm") || is_self(), "MAX_RNDV_LANES=0") +UCS_TEST_P(test_reconfig_asymmetric, resolve_remote_id) { run(true); }