Skip to content

Commit

Permalink
UCP/WIREUP: Support EP reconfiguration for non wired-up scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
shasson5 committed Dec 10, 2024
1 parent c8ef020 commit b681b04
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 82 deletions.
13 changes: 6 additions & 7 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1893,10 +1893,9 @@ int ucp_ep_config_lane_is_peer_match(const ucp_ep_config_key_t *key1,
config_lane2->dst_md_index);
}

static ucp_lane_index_t
ucp_ep_config_find_match_lane(const ucp_ep_config_key_t *key1,
ucp_lane_index_t lane1,
const ucp_ep_config_key_t *key2)
ucp_lane_index_t ucp_ep_config_find_match_lane(const ucp_ep_config_key_t *key1,
ucp_lane_index_t lane1,
const ucp_ep_config_key_t *key2)
{
ucp_lane_index_t lane_idx;

Expand Down Expand Up @@ -1970,9 +1969,9 @@ void ucp_ep_config_lanes_intersect(const ucp_ep_config_key_t *key1,
}
}

static int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane)
int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane)
{
const ucp_ep_config_key_lane_t *config_lane1 = &key1->lanes[lane];
const ucp_ep_config_key_lane_t *config_lane2 = &key2->lanes[lane];
Expand Down
8 changes: 8 additions & 0 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,10 @@ int ucp_ep_config_lane_is_peer_match(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane2);

ucp_lane_index_t ucp_ep_config_find_match_lane(const ucp_ep_config_key_t *key1,
ucp_lane_index_t lane1,
const ucp_ep_config_key_t *key2);

void ucp_ep_config_lanes_intersect(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
const ucp_ep_h ep,
Expand All @@ -762,6 +766,10 @@ ucp_ep_find_non_reused_lane(ucp_ep_h ep, const ucp_ep_config_key_t *key,

ucp_lane_index_t ucp_ep_find_wireup_ep_lane(ucp_ep_h ep);

int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane);

int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2);

Expand Down
110 changes: 88 additions & 22 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -1353,28 +1353,69 @@ static void ucp_wireup_discard_uct_eps(ucp_ep_h ep, uct_ep_h *uct_eps,
}
}

static int
ucp_wireup_is_am_lane_replaced(ucp_ep_h ep,
const ucp_lane_index_t *reuse_lane_map)
{
return !ucp_ep_has_cm_lane(ep) && (ep->am_lane != UCP_NULL_LANE) &&
(reuse_lane_map[ep->am_lane] == UCP_NULL_LANE);
}

static int
ucp_wireup_check_is_reconfigurable(ucp_ep_h ep,
const ucp_ep_config_key_t *new_key,
const ucp_unpacked_address_t *remote_address,
const unsigned *addr_indices)
{
ucp_lane_index_t lane;
ucp_lane_index_t lane, wireup_lane, reuse_lane_map[UCP_MAX_LANES];
const ucp_ep_config_key_t *old_key;

if ((ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) ||
ucp_ep_has_cm_lane(ep)) {
return 1;
}

/* TODO: Support reconfiguration when lanes are created without a wireup_ep
* wrapper */
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
if (!ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane))) {
old_key = &ucp_ep_config(ep)->key;

/* TODO: 1) Support lanes which are connected to the same remote MD, but
* different remote sys_dev (eg. TCP). */
for (lane = 0; lane < old_key->num_lanes; ++lane) {
if ((ucp_ep_config_find_match_lane(old_key, lane, new_key) !=
UCP_NULL_LANE) &&
!ucp_ep_config_lane_is_equal(old_key, new_key, lane)) {
return 0;
}
}

return 1;
ucp_ep_config_lanes_intersect(old_key, new_key, ep, remote_address,
addr_indices, reuse_lane_map);
wireup_lane = ucp_wireup_get_msg_lane(ep, UCP_WIREUP_MSG_REQUEST);

/* TODO: 2) Support reconfiguration for separated wireup and AM lanes
* during wireup process (request sent). */
return !(ep->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED) ||
(ep->am_lane == wireup_lane) ||
!ucp_wireup_is_am_lane_replaced(ep, reuse_lane_map);
}

static int ucp_wireup_should_reconfigure(ucp_ep_h ep,
const ucp_lane_index_t *reuse_lane_map,
ucp_lane_index_t num_lanes)
{
ucp_lane_index_t lane;

if (ucp_ep_has_cm_lane(ep)) {
return 1;
}

/* Check whether all lanes are reused */
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
if (reuse_lane_map[lane] == UCP_NULL_LANE) {
return 1;
}
}

return ucp_ep_num_lanes(ep) != num_lanes;
}

static ucs_status_t
Expand All @@ -1384,15 +1425,16 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key,
{
uct_ep_h uct_ep = NULL;
ucp_lane_index_t old_lane, new_wireup_lane;
ucp_wireup_ep_t *old_wireup_ep, *new_wireup_ep;
ucp_wireup_ep_t *new_wireup_ep;
uct_ep_h old_wireup_ep, msg_ep;
ucp_rsc_index_t aux_rsc_index;
int is_p2p;
int is_p2p, is_wireup_ep, is_next;
ucs_status_t status;

/* Get old wireup lane */
old_lane = ucp_wireup_get_msg_lane(ep, UCP_WIREUP_MSG_REQUEST);
old_wireup_ep = ucp_wireup_ep(ucp_ep_get_lane(ep, old_lane));
ucs_assert_always(old_wireup_ep != NULL);
old_wireup_ep = ucp_ep_get_lane(ep, old_lane);
is_wireup_ep = ucp_wireup_ep_test(old_wireup_ep);

/* Select CM/non-reused lane as new wireup lane */
new_wireup_lane = ucp_ep_find_non_reused_lane(ep, key, reuse_lane_map);
Expand All @@ -1417,32 +1459,31 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key,
}

ucs_assert(new_wireup_ep != NULL);
is_next = ucp_wireup_ep_is_next_ep_active(ucp_wireup_ep(old_wireup_ep));

/* Get correct aux_rsc_index either from next_ep or aux_ep */
aux_rsc_index = ucp_wireup_ep_is_next_ep_active(old_wireup_ep) ?
aux_rsc_index = (!is_wireup_ep || is_next) ?
ucp_ep_get_rsc_index(ep, old_lane) :
ucp_wireup_ep_get_aux_rsc_index(
&old_wireup_ep->super.super);
ucp_wireup_ep_get_aux_rsc_index(old_wireup_ep);

ucs_assert(aux_rsc_index != UCP_NULL_RESOURCE);
is_p2p = ucp_ep_config_connect_p2p(ep->worker, &ucp_ep_config(ep)->key,
aux_rsc_index);

/* Move aux EP to new wireup lane */
ucp_wireup_ep_set_aux(new_wireup_ep,
ucp_wireup_ep_extract_msg_ep(old_wireup_ep),
msg_ep = ucp_wireup_ep_extract_msg_ep(ucp_wireup_ep(old_wireup_ep));
ucp_wireup_ep_set_aux(new_wireup_ep, is_wireup_ep ? msg_ep : old_wireup_ep,
aux_rsc_index, is_p2p);

/* Remove old wireup_ep as it's not needed anymore.
* NOTICE: Next two lines are intentionally not merged with the lane
* removal loop in ucp_wireup_check_config_intersect, because of future
* support for non-wireup EPs reconfiguration (which will modify this
* code). */
uct_ep_destroy(&old_wireup_ep->super.super);
ucp_ep_set_lane(ep, old_lane, NULL);
if (is_wireup_ep) {
/* Remove old wireup_ep as it's not needed anymore. */
uct_ep_destroy(old_wireup_ep);
}

ucp_ep_set_lane(ep, old_lane, NULL);
new_uct_eps[new_wireup_lane] = &new_wireup_ep->super.super;
key->wireup_msg_lane = new_wireup_lane;

return UCS_OK;
}

Expand Down Expand Up @@ -1479,6 +1520,11 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key,
ucp_ep_config_lanes_intersect(old_key, new_key, ep, remote_address,
addr_indices, reuse_lane_map);

if (!ucp_wireup_should_reconfigure(ep, reuse_lane_map,
new_key->num_lanes)) {
return UCS_OK;
}

if (ucp_ep_has_cm_lane(ep)) {
/* CM lane has to be reused by the new EP configuration */
ucs_assert(reuse_lane_map[ucp_ep_get_cm_lane(ep)] != UCP_NULL_LANE);
Expand Down Expand Up @@ -1585,16 +1631,36 @@ ucp_wireup_gather_pending_reqs(ucp_ep_h ep,
ucs_queue_head_t *replay_pending_queue)
{
ucp_request_t *req;
ucp_lane_index_t lane;

ucs_queue_head_init(replay_pending_queue);

if (ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) {
return;
}

/* Handle wireup EPs */
ucp_wireup_eps_pending_extract(ep, replay_pending_queue);

/* rkey ptr requests */
ucs_queue_for_each(req, &ep->worker->rkey_ptr_reqs,
send.rndv.rkey_ptr.queue_elem) {
if (req->send.ep == ep) {
ucs_queue_push(replay_pending_queue,
(ucs_queue_elem_t*)&req->send.uct.priv);
}
}

/* Fully connected lanes */
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
if (ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane))) {
continue;
}

uct_ep_pending_purge(ucp_ep_get_lane(ep, lane),
ucp_request_purge_enqueue_cb,
replay_pending_queue);
}
}

ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
Expand Down
Loading

0 comments on commit b681b04

Please sign in to comment.