Skip to content

Commit

Permalink
UCP/WIREUP: Support EP reconfiguration for non wired-up scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
shasson5 committed Jan 2, 2025
1 parent fa01cca commit 3eb6604
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 113 deletions.
13 changes: 6 additions & 7 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1859,10 +1859,9 @@ int ucp_ep_config_lane_is_peer_match(const ucp_ep_config_key_t *key1,
config_lane2->dst_md_index);
}

static ucp_lane_index_t
ucp_ep_config_find_match_lane(const ucp_ep_config_key_t *key1,
ucp_lane_index_t lane1,
const ucp_ep_config_key_t *key2)
ucp_lane_index_t ucp_ep_config_find_match_lane(const ucp_ep_config_key_t *key1,
ucp_lane_index_t lane1,
const ucp_ep_config_key_t *key2)
{
ucp_lane_index_t lane_idx;

Expand Down Expand Up @@ -1936,9 +1935,9 @@ void ucp_ep_config_lanes_intersect(const ucp_ep_config_key_t *key1,
}
}

static int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane)
int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane)
{
const ucp_ep_config_key_lane_t *config_lane1 = &key1->lanes[lane];
const ucp_ep_config_key_lane_t *config_lane2 = &key2->lanes[lane];
Expand Down
8 changes: 8 additions & 0 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -749,13 +749,21 @@ int ucp_ep_config_lane_is_peer_match(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane2);

/**
 * Look up a lane in @a key2 that corresponds to lane @a lane1 of @a key1.
 *
 * Callers compare the returned index against UCP_NULL_LANE to detect that no
 * corresponding lane was found.
 *
 * NOTE(review): the exact matching criteria are in the definition in
 * ucp_ep.c, which is not fully visible from here - confirm before relying
 * on them.
 */
ucp_lane_index_t ucp_ep_config_find_match_lane(const ucp_ep_config_key_t *key1,
ucp_lane_index_t lane1,
const ucp_ep_config_key_t *key2);

void ucp_ep_config_lanes_intersect(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
const ucp_ep_h ep,
const ucp_unpacked_address_t *remote_address,
const unsigned *addr_indices,
ucp_lane_index_t *lane_map);

/**
 * Compare the configuration of lane index @a lane in @a key1 with the lane at
 * the same index in @a key2.
 *
 * NOTE(review): presumably returns non-zero when the two lane configurations
 * are equal and 0 otherwise - confirm against the definition in ucp_ep.c.
 */
int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane);

int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2);

Expand Down
186 changes: 135 additions & 51 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,15 @@ ucp_wireup_process_pre_request(ucp_worker_h worker, ucp_ep_h ep,
ucp_ep_set_failed_schedule(ep, UCP_NULL_LANE, status);
}

/*
 * Tell whether the lane that carries wireup messages of type msg_type holds a
 * wireup_ep marked with UCP_WIREUP_EP_FLAG_FLUSH_REQUIRED.
 *
 * Returns 1 when such a flagged wireup_ep is found on that lane, and 0 when no
 * wireup_ep is obtained for the lane or the flag is not set.
 */
static int ucp_wireup_full_handshake_required(ucp_ep_h ep, uint8_t msg_type)
{
    ucp_wireup_ep_t *msg_wireup_ep;

    msg_wireup_ep = ucp_wireup_ep(
            ucp_ep_get_lane(ep, ucp_wireup_get_msg_lane(ep, msg_type)));
    if (msg_wireup_ep == NULL) {
        /* No wireup_ep was obtained for that lane, so there is no flag */
        return 0;
    }

    return (msg_wireup_ep->flags & UCP_WIREUP_EP_FLAG_FLUSH_REQUIRED) != 0;
}

static UCS_F_NOINLINE void
ucp_wireup_process_request(ucp_worker_h worker, ucp_ep_h ep,
const ucp_wireup_msg_t *msg,
Expand Down Expand Up @@ -697,7 +706,8 @@ ucp_wireup_process_request(ucp_worker_h worker, ucp_ep_h ep,
send_reply = /* Always send the reply in case of CM, the client's EP has to
* be marked as REMOTE_CONNECTED */
has_cm_lane || (msg->dst_ep_id == UCS_PTR_MAP_KEY_INVALID) ||
ucp_ep_config(ep)->p2p_lanes;
ucp_ep_config(ep)->p2p_lanes ||
ucp_wireup_full_handshake_required(ep, msg->type);

/* Connect p2p addresses to remote endpoint, if at least one is true: */
if (/* - EP has not been connected locally yet */
Expand All @@ -720,7 +730,8 @@ ucp_wireup_process_request(ucp_worker_h worker, ucp_ep_h ep,
/* don't mark as connected to remote now in case of CM, since it destroys
* CM wireup EP (if it is hidden in the CM lane) that is used for sending
* WIREUP MSGs */
if (!ucp_ep_config(ep)->p2p_lanes && !has_cm_lane) {
if (!ucp_ep_config(ep)->p2p_lanes && !has_cm_lane &&
!ucp_wireup_full_handshake_required(ep, msg->type)) {
/* mark the endpoint as connected to remote */
ucp_wireup_remote_connected(ep);
}
Expand Down Expand Up @@ -760,7 +771,6 @@ ucp_wireup_process_reply(ucp_worker_h worker, ucp_ep_h ep,
const ucp_unpacked_address_t *remote_address)
{
ucs_status_t status;
int ack;

UCP_WIREUP_MSG_CHECK(msg, ep, UCP_WIREUP_MSG_REPLY);
ucs_trace("ep %p: got wireup reply src_ep_id 0x%"PRIx64
Expand All @@ -786,19 +796,14 @@ ucp_wireup_process_reply(ucp_worker_h worker, ucp_ep_h ep,
}

ucp_ep_update_flags(ep, UCP_EP_FLAG_LOCAL_CONNECTED, 0);
ack = 1;
} else {
ack = 0;
}

ucp_wireup_remote_connected(ep);

if (ack) {
/* Send `UCP_WIREUP_MSG_ACK` from progress function
* to avoid calling UCT routines from an async thread */
ucs_callbackq_add_oneshot(&worker->uct->progress_q, ep,
ucp_wireup_send_msg_ack, ep);
}
/* Send `UCP_WIREUP_MSG_ACK` from progress function
* to avoid calling UCT routines from an async thread */
ucs_callbackq_add_oneshot(&worker->uct->progress_q, ep,
ucp_wireup_send_msg_ack, ep);
}

static void ucp_ep_removed_flush_completion(ucp_request_t *req)
Expand Down Expand Up @@ -1351,28 +1356,70 @@ static void ucp_wireup_discard_uct_eps(ucp_ep_h ep, uct_ep_h *uct_eps,
}
}

/*
 * Check whether the endpoint's current AM lane is about to be replaced.
 *
 * The result is 1 only when all of the following hold, tested in this order:
 *  - the endpoint has no CM lane,
 *  - the endpoint has an AM lane (ep->am_lane is not UCP_NULL_LANE),
 *  - reuse_lane_map has UCP_NULL_LANE at the AM lane index (the lane is not
 *    reused),
 *  - the UCT endpoint on the AM lane does not pass ucp_wireup_ep_test().
 * Otherwise the result is 0.
 */
static int
ucp_wireup_is_am_lane_replaced(ucp_ep_h ep,
                               const ucp_lane_index_t *reuse_lane_map)
{
    if (ucp_ep_has_cm_lane(ep)) {
        return 0;
    }

    /* Must be checked before am_lane is used as an index below */
    if (ep->am_lane == UCP_NULL_LANE) {
        return 0;
    }

    if (reuse_lane_map[ep->am_lane] != UCP_NULL_LANE) {
        /* AM lane is reused */
        return 0;
    }

    return !ucp_wireup_ep_test(ucp_ep_get_lane(ep, ep->am_lane));
}

static int
ucp_wireup_check_is_reconfigurable(ucp_ep_h ep,
const ucp_ep_config_key_t *new_key,
const ucp_unpacked_address_t *remote_address,
const unsigned *addr_indices)
{
ucp_lane_index_t lane;
ucp_lane_index_t lane, wireup_lane, reuse_lane_map[UCP_MAX_LANES];
const ucp_ep_config_key_t *old_key;

if ((ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) ||
ucp_ep_has_cm_lane(ep)) {
return 1;
}

/* TODO: Support reconfiguration when lanes are created without a wireup_ep
* wrapper */
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
if (!ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane))) {
old_key = &ucp_ep_config(ep)->key;

/* TODO: 1) Support lanes which are connected to the same remote MD, but
* different remote sys_dev (eg. TCP). */
for (lane = 0; lane < old_key->num_lanes; ++lane) {
if ((ucp_ep_config_find_match_lane(old_key, lane, new_key) !=
UCP_NULL_LANE) &&
!ucp_ep_config_lane_is_equal(old_key, new_key, lane)) {
return 0;
}
}

return 1;
ucp_ep_config_lanes_intersect(old_key, new_key, ep, remote_address,
addr_indices, reuse_lane_map);
wireup_lane = ucp_wireup_get_msg_lane(ep, UCP_WIREUP_MSG_REQUEST);

/* TODO: 2) Support reconfiguration for separated wireup and AM lanes
* during wireup process (request sent). */
return !(ep->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED) ||
(ep->am_lane == wireup_lane) ||
!ucp_wireup_is_am_lane_replaced(ep, reuse_lane_map);
}

/*
 * Decide whether the endpoint has to go through reconfiguration.
 *
 * ep             - endpoint whose current lanes are inspected.
 * reuse_lane_map - indexed by current lane; an entry equal to UCP_NULL_LANE
 *                  means that lane is not reused.
 * num_lanes      - lane count to compare against the endpoint's current one.
 *
 * Returns 1 if the endpoint has a CM lane, if any current lane is not reused,
 * or if the current lane count differs from num_lanes; 0 otherwise.
 */
static int ucp_wireup_should_reconfigure(ucp_ep_h ep,
                                         const ucp_lane_index_t *reuse_lane_map,
                                         ucp_lane_index_t num_lanes)
{
    ucp_lane_index_t old_lane = 0;

    if (ucp_ep_has_cm_lane(ep)) {
        return 1;
    }

    /* A single current lane that is not reused is enough to reconfigure */
    while (old_lane < ucp_ep_num_lanes(ep)) {
        if (reuse_lane_map[old_lane] == UCP_NULL_LANE) {
            return 1;
        }

        ++old_lane;
    }

    /* Every current lane is reused: only a lane count change remains */
    return num_lanes != ucp_ep_num_lanes(ep);
}

static ucp_lane_index_t
Expand Down Expand Up @@ -1414,72 +1461,80 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key,
const ucp_lane_index_t *reuse_lane_map)
{
uct_ep_h uct_ep = NULL;
ucp_lane_index_t old_lane, new_wireup_lane;
ucp_wireup_ep_t *old_wireup_ep, *new_wireup_ep;
int am_replaced = ucp_wireup_is_am_lane_replaced(ep, reuse_lane_map);
ucp_lane_index_t old_lane, new_wireup_lane, old_wireup_lane;
ucp_lane_index_t non_reused_lane;
ucp_wireup_ep_t *new_wireup_ep;
uct_ep_h old_wireup_ep, msg_ep;
ucp_rsc_index_t aux_rsc_index;
int is_p2p;
int is_p2p, is_wireup_ep, is_next_active;
ucs_status_t status;

/* Get old wireup lane */
old_lane = ucp_wireup_get_msg_lane(ep, UCP_WIREUP_MSG_REQUEST);
old_wireup_ep = ucp_wireup_ep(ucp_ep_get_lane(ep, old_lane));
ucs_assert_always(old_wireup_ep != NULL);
old_wireup_lane = ucp_wireup_get_msg_lane(ep, UCP_WIREUP_MSG_REQUEST);
old_lane = am_replaced ? ep->am_lane : old_wireup_lane;
old_wireup_ep = ucp_ep_get_lane(ep, old_lane);
is_wireup_ep = ucp_wireup_ep_test(old_wireup_ep);

/* If wireup lane is required for new configuration, select it according
* to the following priority:
* 1) If CM lane exists, use it.
* 2) If any reused wireup_ep lane exists in old configuration, use it.
* 3) Otherwise select a non-reused lane (which must exist), and wrap
* it with a new wireup_ep wrapper. */

* 2) If old AM lane is replaced, use new AM lane.
* 3) If any non-reused lane exists, select it and wrap with a new
* wireup_ep wrapper.
* 4) Otherwise select a reused wireup_ep lane (which must exist). */
if (ucp_ep_has_cm_lane(ep)) {
new_wireup_ep = ucp_ep_get_cm_wireup_ep(ep);
new_wireup_lane = key->cm_lane;
} else {
new_wireup_lane = ucp_wireup_find_reused_wireup_ep_lane(ep,
reuse_lane_map);
if (new_wireup_lane != UCP_NULL_LANE) {
uct_ep = ucp_ep_get_lane(ep, new_wireup_lane);
} else {
new_wireup_lane = ucp_wireup_find_non_reused_lane(ep, key,
reuse_lane_map);
ucs_assert(new_wireup_lane != UCP_NULL_LANE);
non_reused_lane = ucp_wireup_find_non_reused_lane(ep, key,
reuse_lane_map);
new_wireup_lane = am_replaced ? key->am_lane : non_reused_lane;

if (new_wireup_lane != UCP_NULL_LANE) {
status = ucp_wireup_ep_create(ep, &uct_ep);
if (status != UCS_OK) {
return status;
}
} else {
new_wireup_lane =
ucp_wireup_find_reused_wireup_ep_lane(ep, reuse_lane_map);
ucs_assert(new_wireup_lane != UCP_NULL_LANE);
uct_ep = ucp_ep_get_lane(ep, new_wireup_lane);
}
new_wireup_ep = ucp_wireup_ep(uct_ep);
}

ucs_assert(new_wireup_ep != NULL);

/* Get correct aux_rsc_index either from next_ep or aux_ep */
aux_rsc_index = ucp_wireup_ep_is_next_ep_active(old_wireup_ep) ?
ucp_ep_get_rsc_index(ep, old_lane) :
ucp_wireup_ep_get_aux_rsc_index(
&old_wireup_ep->super.super);
is_next_active = ucp_wireup_ep_is_next_ep_active(
ucp_wireup_ep(old_wireup_ep));
aux_rsc_index = (!is_wireup_ep || is_next_active) ?
ucp_ep_get_rsc_index(ep, old_lane) :
ucp_wireup_ep_get_aux_rsc_index(old_wireup_ep);

ucs_assert(aux_rsc_index != UCP_NULL_RESOURCE);
is_p2p = ucp_ep_config_connect_p2p(ep->worker, &ucp_ep_config(ep)->key,
aux_rsc_index);

/* Move aux EP to new wireup lane */
ucp_wireup_ep_set_aux(new_wireup_ep,
ucp_wireup_ep_extract_msg_ep(old_wireup_ep),
msg_ep = ucp_wireup_ep_extract_msg_ep(ucp_wireup_ep(old_wireup_ep));
ucp_wireup_ep_set_aux(new_wireup_ep, is_wireup_ep ? msg_ep : old_wireup_ep,
aux_rsc_index, is_p2p);

/* Remove old wireup_ep as it's not needed anymore.
* NOTICE: Next two lines are intentionally not merged with the lane
* removal loop in ucp_wireup_check_config_intersect, because of future
* support for non-wireup EPs reconfiguration (which will modify this
* code). */
uct_ep_destroy(&old_wireup_ep->super.super);
ucp_ep_set_lane(ep, old_lane, NULL);
if (is_wireup_ep) {
/* Remove old wireup_ep as it's not needed anymore. */
uct_ep_destroy(old_wireup_ep);
}

ucp_ep_set_lane(ep, old_lane, NULL);
new_uct_eps[new_wireup_lane] = &new_wireup_ep->super.super;
key->wireup_msg_lane = new_wireup_lane;

if (am_replaced) {
new_wireup_ep->flags |= UCP_WIREUP_EP_FLAG_FLUSH_REQUIRED;
}

return UCS_OK;
}

Expand All @@ -1496,6 +1551,7 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key,
ucp_lane_index_t lane, reuse_lane, wireup_lane;
uct_ep_h uct_ep;
ucs_status_t status;
int am_replaced;

*connect_lane_bitmap = UCS_MASK(new_key->num_lanes);

Expand All @@ -1516,6 +1572,11 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key,
ucp_ep_config_lanes_intersect(old_key, new_key, ep, remote_address,
addr_indices, reuse_lane_map);

if (!ucp_wireup_should_reconfigure(ep, reuse_lane_map,
new_key->num_lanes)) {
return UCS_OK;
}

if (ucp_ep_has_cm_lane(ep)) {
/* CM lane has to be reused by the new EP configuration */
ucs_assert(reuse_lane_map[ucp_ep_get_cm_lane(ep)] != UCP_NULL_LANE);
Expand All @@ -1525,7 +1586,10 @@ ucp_wireup_check_config_intersect(ucp_ep_h ep, ucp_ep_config_key_t *new_key,
"new_key->wireup_msg_lane=%u", new_key->wireup_msg_lane);
}

wireup_lane = ucp_wireup_get_msg_lane(ep, UCP_WIREUP_MSG_REQUEST);
am_replaced = ucp_wireup_is_am_lane_replaced(ep, reuse_lane_map);
wireup_lane = am_replaced ?
ep->am_lane :
ucp_wireup_get_msg_lane(ep, UCP_WIREUP_MSG_REQUEST);

/* wireup lane has to be selected for the old configuration */
ucs_assert(wireup_lane != UCP_NULL_LANE);
Expand Down Expand Up @@ -1622,16 +1686,36 @@ ucp_wireup_gather_pending_reqs(ucp_ep_h ep,
ucs_queue_head_t *replay_pending_queue)
{
ucp_request_t *req;
ucp_lane_index_t lane;

ucs_queue_head_init(replay_pending_queue);

if (ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) {
return;
}

/* Handle wireup EPs */
ucp_wireup_eps_pending_extract(ep, replay_pending_queue);

/* rkey ptr requests */
ucs_queue_for_each(req, &ep->worker->rkey_ptr_reqs,
send.rndv.rkey_ptr.queue_elem) {
if (req->send.ep == ep) {
ucs_queue_push(replay_pending_queue,
(ucs_queue_elem_t*)&req->send.uct.priv);
}
}

/* Fully connected lanes */
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
if (ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane))) {
continue;
}

uct_ep_pending_purge(ucp_ep_get_lane(ep, lane),
ucp_request_purge_enqueue_cb,
replay_pending_queue);
}
}

ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
Expand Down
5 changes: 4 additions & 1 deletion src/ucp/wireup/wireup_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ enum {
UCP_WIREUP_EP_FLAG_SEND_CLIENT_ID = UCS_BIT(3),

/* Indicates that aux_ep is CONNECT_TO_EP */
UCP_WIREUP_EP_FLAG_AUX_P2P = UCS_BIT(4)
UCP_WIREUP_EP_FLAG_AUX_P2P = UCS_BIT(4),

/* Flush outstanding messages */
UCP_WIREUP_EP_FLAG_FLUSH_REQUIRED = UCS_BIT(5)
};


Expand Down
Loading

0 comments on commit 3eb6604

Please sign in to comment.