Skip to content

Commit

Permalink
prov/efa: Decouple AV entry from endpoint
Browse files Browse the repository at this point in the history
Before this change, the EFA AV entry contained a reference to
efa_rdm_peer which is specific to a given endpoint. This member also
prevented binding a single AV to multiple endpoints.

This change removes efa_rdm_peer from AV entry  by adding a hashmap
to the endpoint that maps fi_addr to efa_rdm_peer. And it also
enables multiple EFA endpoints to bind to the same AV.

Co-authored-by: Shi Jin <sjina@amazon.com>
Signed-off-by: Sai Sunku <sunkusa@amazon.com>
  • Loading branch information
sunkuamzn and shijin-aws committed Dec 29, 2024
1 parent f893f5f commit c3f9e21
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 39 deletions.
9 changes: 8 additions & 1 deletion fabtests/pytest/efa/test_multi_ep.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@

@pytest.mark.functional
@pytest.mark.parametrize("shared_cq", [True, False])
def test_multi_ep(cmdline_args, shared_cq):
def test_multi_ep_cq(cmdline_args, shared_cq):
from common import ClientServerTest
cmd = "fi_multi_ep -e rdm"
if shared_cq:
cmd += " -Q"
test = ClientServerTest(cmdline_args, cmd)
test.run()

@pytest.mark.functional
def test_multi_ep_av(cmdline_args):
from common import ClientServerTest
cmd = "fi_multi_ep -e rdm -A"
test = ClientServerTest(cmdline_args, cmd)
test.run()
42 changes: 17 additions & 25 deletions prov/efa/src/efa_av.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,6 @@ void efa_ah_release(struct efa_av *av, struct efa_ah *ah)
}
}

static
void efa_conn_release(struct efa_av *av, struct efa_conn *conn);

/**
* @brief initialize the rdm related resources of an efa_conn object
*
Expand All @@ -266,18 +263,11 @@ int efa_conn_rdm_init(struct efa_av *av, struct efa_conn *conn, bool insert_shm_
int err, ret;
char smr_name[EFA_SHM_NAME_MAX];
size_t smr_name_len;
struct efa_rdm_ep *efa_rdm_ep;
struct efa_rdm_peer *peer;

assert(av->ep_type == FI_EP_RDM);
assert(conn->ep_addr);

/* currently multiple EP bind to same av is not supported */
assert(!dlist_empty(&av->util_av.ep_list));
efa_rdm_ep = container_of(av->util_av.ep_list.next, struct efa_rdm_ep, base_ep.util_ep.av_entry);

peer = &conn->rdm_peer;
efa_rdm_peer_construct(peer, efa_rdm_ep, conn);
conn->shm_fi_addr = FI_ADDR_NOTAVAIL;

/*
* The efa_conn_rdm_init() call can be made in two situations:
Expand Down Expand Up @@ -315,8 +305,8 @@ int efa_conn_rdm_init(struct efa_av *av, struct efa_conn *conn, bool insert_shm_
* av. The efa provider should still use peer->shm_fiaddr for transmissions
* through shm ep.
*/
peer->shm_fiaddr = conn->fi_addr;
ret = fi_av_insert(av->shm_rdm_av, smr_name, 1, &peer->shm_fiaddr, FI_AV_USER_ID, NULL);
conn->shm_fi_addr = conn->fi_addr;
ret = fi_av_insert(av->shm_rdm_av, smr_name, 1, &conn->shm_fi_addr, FI_AV_USER_ID, NULL);
if (OFI_UNLIKELY(ret != 1)) {
EFA_WARN(FI_LOG_AV,
"Failed to insert address to shm provider's av: %s\n",
Expand All @@ -326,11 +316,10 @@ int efa_conn_rdm_init(struct efa_av *av, struct efa_conn *conn, bool insert_shm_

EFA_INFO(FI_LOG_AV,
"Successfully inserted %s to shm provider's av. efa_fiaddr: %ld shm_fiaddr = %ld\n",
smr_name, conn->fi_addr, peer->shm_fiaddr);
smr_name, conn->fi_addr, conn->shm_fi_addr);

assert(peer->shm_fiaddr < efa_env.shm_av_size);
assert(conn->shm_fi_addr < efa_env.shm_av_size);
av->shm_used++;
peer->is_local = 1;
}

return 0;
Expand All @@ -350,26 +339,29 @@ void efa_conn_rdm_deinit(struct efa_av *av, struct efa_conn *conn)
int err;
struct efa_rdm_peer *peer;
struct efa_rdm_ep *ep;
struct dlist_entry *entry, *tmp;

assert(av->ep_type == FI_EP_RDM);

peer = &conn->rdm_peer;
if (peer->is_local && av->shm_rdm_av) {
err = fi_av_remove(av->shm_rdm_av, &peer->shm_fiaddr, 1, 0);
if (conn->shm_fi_addr != FI_ADDR_NOTAVAIL && av->shm_rdm_av) {
err = fi_av_remove(av->shm_rdm_av, &conn->shm_fi_addr, 1, 0);
if (err) {
EFA_WARN(FI_LOG_AV, "remove address from shm av failed! err=%d\n", err);
} else {
av->shm_used--;
assert(peer->shm_fiaddr < efa_env.shm_av_size);
assert(conn->shm_fi_addr < efa_env.shm_av_size);
}
}

/*
* We need peer->shm_fiaddr to remove shm address from shm av table,
* so efa_rdm_peer_clear must be after removing shm av table.
*/
ep = dlist_empty(&av->util_av.ep_list) ? NULL : container_of(av->util_av.ep_list.next, struct efa_rdm_ep, base_ep.util_ep.av_entry);
efa_rdm_peer_destruct(peer, ep);
dlist_foreach_safe(&av->util_av.ep_list, entry, tmp) {
ep = container_of(entry, struct efa_rdm_ep, base_ep.util_ep.av_entry);
peer = efa_rdm_peer_map_lookup(&ep->fi_addr_to_peer_map, conn->fi_addr);
if (peer) {
efa_rdm_peer_destruct(peer, ep);
efa_rdm_peer_map_remove(&ep->fi_addr_to_peer_map, conn->fi_addr, peer);
}
}
}

/*
Expand Down
2 changes: 1 addition & 1 deletion prov/efa/src/efa_av.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct efa_conn {
fi_addr_t fi_addr;
fi_addr_t util_av_fi_addr;
struct efa_rdm_peer rdm_peer;
fi_addr_t shm_fi_addr;
};

struct efa_av_entry {
Expand Down Expand Up @@ -60,7 +61,6 @@ struct efa_prv_reverse_av {
struct efa_av {
struct fid_av *shm_rdm_av;
struct efa_domain *domain;
struct efa_base_ep *base_ep;
size_t used;
size_t shm_used;
enum fi_av_type type;
Expand Down
10 changes: 0 additions & 10 deletions prov/efa/src/efa_base_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@

int efa_base_ep_bind_av(struct efa_base_ep *base_ep, struct efa_av *av)
{
/*
* Binding multiple endpoints to a single AV is currently not
* supported.
*/
if (av->base_ep) {
EFA_WARN(FI_LOG_EP_CTRL,
"Address vector already has endpoint bound to it.\n");
return -FI_ENOSYS;
}
if (base_ep->domain != av->domain) {
EFA_WARN(FI_LOG_EP_CTRL,
"Address vector doesn't belong to same domain as EP.\n");
Expand All @@ -29,7 +20,6 @@ int efa_base_ep_bind_av(struct efa_base_ep *base_ep, struct efa_av *av)
}

base_ep->av = av;
base_ep->av->base_ep = base_ep;

return 0;
}
Expand Down
3 changes: 2 additions & 1 deletion prov/efa/src/efa_errno.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@
_(4123, WRITE_SHM_CQ_ENTRY, Failure to write CQ entry for SHM operation) \
_(4124, ESTABLISHED_RECV_UNRESP, Unresponsive receiver (connection previously established)) \
_(4125, INVALID_PKT_TYPE_ZCPY_RX, Invalid packet type received when zero copy recv mode is ON) \
_(4126, UNESTABLISHED_RECV_UNRESP, Unresponsive receiver (reachable by EFA device but handshake failed))
_(4126, UNESTABLISHED_RECV_UNRESP, Unresponsive receiver (reachable by EFA device but handshake failed)) \
_(4127, PEER_MAP_ENTRY_POOL_EXHAUSTED, Peer map entry pool exhausted)

/** @} */

Expand Down
7 changes: 7 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ struct efa_rdm_ep_queued_copy {
#define EFA_RDM_EP_MAX_WR_PER_IBV_POST_SEND (4096)
#define EFA_RDM_EP_MAX_WR_PER_IBV_POST_RECV (8192)

struct efa_rdm_peer_map {
struct efa_rdm_peer_map_entry *head;
};

struct efa_rdm_ep {
struct efa_base_ep base_ep;

Expand Down Expand Up @@ -187,6 +191,9 @@ struct efa_rdm_ep {
struct dlist_entry entry;
/* the count of opes queued before handshake is made with their peers */
size_t ope_queued_before_handshake_cnt;

struct ofi_bufpool *peer_map_entry_pool; /* bufpool to hold fi_addr->efa_rdm_peer key-value pairs */
struct efa_rdm_peer_map fi_addr_to_peer_map; /* Hashmap to find efa_rdm_peer given fi_addr */
};

int efa_rdm_ep_flush_queued_blocking_copy_to_hmem(struct efa_rdm_ep *ep);
Expand Down
17 changes: 17 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,18 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep)
if (ret)
goto err_free;

ret = ofi_bufpool_create(&ep->peer_map_entry_pool,
sizeof(struct efa_rdm_peer_map_entry),
EFA_RDM_BUFPOOL_ALIGNMENT,
0, /* no limit to max cnt */
/* Don't track usage, because endpoint can be closed without removing entries from AV */
EFA_MIN_AV_SIZE, OFI_BUFPOOL_NO_TRACK);
if (ret)
goto err_free;

efa_rdm_rxe_map_construct(&ep->rxe_map);
efa_rdm_peer_map_construct(&ep->fi_addr_to_peer_map);

return 0;

err_free:
Expand Down Expand Up @@ -341,6 +352,9 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep)
if (ep->efa_tx_pkt_pool)
ofi_bufpool_destroy(ep->efa_tx_pkt_pool);

if (ep->peer_map_entry_pool)
ofi_bufpool_destroy(ep->peer_map_entry_pool);

return ret;
}

Expand Down Expand Up @@ -828,6 +842,9 @@ static void efa_rdm_ep_destroy_buffer_pools(struct efa_rdm_ep *efa_rdm_ep)

if (efa_rdm_ep->rx_atomrsp_pool)
ofi_bufpool_destroy(efa_rdm_ep->rx_atomrsp_pool);

if (efa_rdm_ep->peer_map_entry_pool)
ofi_bufpool_destroy(efa_rdm_ep->peer_map_entry_pool);
}

/*
Expand Down
14 changes: 13 additions & 1 deletion prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,26 @@ struct efa_rdm_peer *efa_rdm_ep_get_peer(struct efa_rdm_ep *ep, fi_addr_t addr)
{
struct util_av_entry *util_av_entry;
struct efa_av_entry *av_entry;
struct efa_rdm_peer *peer;

if (OFI_UNLIKELY(addr == FI_ADDR_NOTAVAIL))
return NULL;

peer = efa_rdm_peer_map_lookup(&ep->fi_addr_to_peer_map, addr);
if (peer)
return peer;

util_av_entry = ofi_bufpool_get_ibuf(ep->base_ep.util_ep.av->av_entry_pool,
addr);
av_entry = (struct efa_av_entry *)util_av_entry->data;
return av_entry->conn.ep_addr ? &av_entry->conn.rdm_peer : NULL;

if (av_entry->conn.ep_addr) {
peer = efa_rdm_peer_map_insert(&ep->fi_addr_to_peer_map, addr, ep);
efa_rdm_peer_construct(peer, ep, &av_entry->conn);
return peer;
}

return NULL;
}

/**
Expand Down
40 changes: 40 additions & 0 deletions prov/efa/src/rdm/efa_rdm_peer.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ void efa_rdm_peer_construct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, st
dlist_init(&peer->txe_list);
dlist_init(&peer->rxe_list);
dlist_init(&peer->overflow_pke_list);

if (conn->shm_fi_addr != FI_ADDR_NOTAVAIL) {
peer->shm_fiaddr = conn->shm_fi_addr;
peer->is_local = 1;
}
}

/**
Expand Down Expand Up @@ -111,6 +116,41 @@ void efa_rdm_peer_destruct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep)
#endif
}

struct efa_rdm_peer *efa_rdm_peer_map_insert(struct efa_rdm_peer_map *peer_map, fi_addr_t addr, struct efa_rdm_ep *ep) {
struct efa_rdm_peer_map_entry *map_entry;
struct efa_rdm_peer *peer;

map_entry = ofi_buf_alloc(ep->peer_map_entry_pool);
if (OFI_UNLIKELY(!map_entry)) {
EFA_WARN(FI_LOG_CQ,
"Map entries for EFA AV to peer mapping exhausted.\n");
efa_base_ep_write_eq_error(&ep->base_ep, FI_ENOBUFS, FI_EFA_ERR_PEER_MAP_ENTRY_POOL_EXHAUSTED);
return NULL;
}

map_entry->key = addr;
peer = &map_entry->efa_rdm_peer;

HASH_ADD(hh, peer_map->head, key, sizeof(addr), map_entry);

return peer;
}

struct efa_rdm_peer *efa_rdm_peer_map_lookup(struct efa_rdm_peer_map *peer_map, fi_addr_t addr) {
struct efa_rdm_peer_map_entry *map_entry;

HASH_FIND(hh, peer_map->head, &addr, sizeof(addr), map_entry);
return map_entry ? &map_entry->efa_rdm_peer : NULL;
}

void efa_rdm_peer_map_remove(struct efa_rdm_peer_map *peer_map, fi_addr_t addr, struct efa_rdm_peer *peer) {
struct efa_rdm_peer_map_entry *map_entry;

HASH_FIND(hh, peer_map->head, &addr, sizeof(addr), map_entry);
HASH_DEL(peer_map->head, map_entry);
ofi_buf_free(map_entry);
}

/**
* @brief run incoming packet_entry through reorder buffer
* queue the packet entry if msg_id is larger than expected.
Expand Down
18 changes: 18 additions & 0 deletions prov/efa/src/rdm/efa_rdm_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ struct efa_rdm_peer {
struct efa_rdm_peer_user_recv_qp user_recv_qp;
};

struct efa_rdm_peer_map_entry {
uint64_t key;
struct efa_rdm_peer efa_rdm_peer;
UT_hash_handle hh;
};

/**
* @brief check for peer's RDMA_READ support, assuming HANDSHAKE has already occurred
*
Expand Down Expand Up @@ -269,6 +275,12 @@ bool efa_both_support_zero_hdr_data_transfer(struct efa_rdm_ep *ep, struct efa_r
(peer->extra_info[0] & EFA_RDM_EXTRA_FEATURE_REQUEST_USER_RECV_QP));
}

static inline
void efa_rdm_peer_map_construct(struct efa_rdm_peer_map *peer_map)
{
peer_map->head = NULL;
}

struct efa_conn;

void efa_rdm_peer_construct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, struct efa_conn *conn);
Expand All @@ -287,4 +299,10 @@ size_t efa_rdm_peer_get_runt_size(struct efa_rdm_peer *peer, struct efa_rdm_ep *

int efa_rdm_peer_select_readbase_rtm(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, struct efa_rdm_ope *ope);

struct efa_rdm_peer *efa_rdm_peer_map_insert(struct efa_rdm_peer_map *peer_map, fi_addr_t addr, struct efa_rdm_ep *ep);

struct efa_rdm_peer *efa_rdm_peer_map_lookup(struct efa_rdm_peer_map *peer_map, fi_addr_t addr);

void efa_rdm_peer_map_remove(struct efa_rdm_peer_map *peer_map, fi_addr_t addr, struct efa_rdm_peer *peer);

#endif /* EFA_RDM_PEER_H */
36 changes: 36 additions & 0 deletions prov/efa/test/efa_unit_test_av.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,39 @@ void test_av_insert_duplicate_gid(struct efa_resource **state)
assert_int_equal(num_addr, 1);
assert_int_not_equal(addr1, addr2);
}

/**
* @brief This test verifies that multiple endpoints can bind to the same AV
*
* @param[in] state struct efa_resource that is managed by the framework
*/
void test_av_multiple_ep(struct efa_resource **state)
{
struct efa_resource *resource = *state;
struct fid_ep *ep2, *ep3;
int ret;

/* Resource construct function creates and binds 1 EP to the AV */
efa_unit_test_resource_construct(resource, FI_EP_RDM);

/* Create and bind two new endpoints to the same AV */
fi_endpoint(resource->domain, resource->info, &ep2, NULL);
ret = fi_ep_bind(ep2, &resource->av->fid, 0);
assert_int_equal(ret, 0);

fi_endpoint(resource->domain, resource->info, &ep3, NULL);
ret = fi_ep_bind(ep3, &resource->av->fid, 0);
assert_int_equal(ret, 0);

/* Bind the two new endpoints to the same CQ and enable them */
fi_ep_bind(ep2, &resource->cq->fid, FI_SEND | FI_RECV);
ret = fi_enable(ep2);
assert_int_equal(ret, 0);

fi_ep_bind(ep3, &resource->cq->fid, FI_SEND | FI_RECV);
ret = fi_enable(ep3);
assert_int_equal(ret, 0);

fi_close(&ep2->fid);
fi_close(&ep3->fid);
}
1 change: 1 addition & 0 deletions prov/efa/test/efa_unit_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ int main(void)
const struct CMUnitTest efa_unit_tests[] = {
cmocka_unit_test_setup_teardown(test_av_insert_duplicate_raw_addr, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_av_insert_duplicate_gid, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_av_multiple_ep, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_device_construct_error_handling, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_ignore_missing_host_id_file, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_has_valid_host_id, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
Expand Down
Loading

0 comments on commit c3f9e21

Please sign in to comment.