prov/efa: Make efa_rdm_cq use efa_cq
The structs efa_rdm_cq and efa_cq share the same members, util_cq
and ibv_cq. This patch embeds a struct efa_cq inside efa_rdm_cq so
the two types can be converted to each other via container_of,
mirroring the relationship between efa_rdm_ep and efa_base_ep.

Signed-off-by: Shi Jin <sjina@amazon.com>
shijin-aws committed Jan 14, 2025
1 parent 628c65a commit 815a166
Showing 7 changed files with 144 additions and 127 deletions.
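
For context, here is a minimal, self-contained C sketch of the container_of pattern this commit adopts. The struct bodies are simplified stand-ins, not the real libfabric definitions; only the nesting order matters.

/* container_of_demo.c: sketch of the conversion this commit enables.
 * Simplified stand-in types; only the nesting order matters. */
#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
	((type *) ((char *) (ptr) - offsetof(type, member)))

struct util_cq    { int placeholder; };
struct efa_ibv_cq { int placeholder; };

struct efa_cq {
	struct util_cq util_cq;
	struct efa_ibv_cq ibv_cq;
};

struct efa_rdm_cq {
	struct efa_cq efa_cq;    /* embedded first, as in this patch */
	int rdm_only_state;      /* stand-in for shm_cq and friends */
};

int main(void)
{
	struct efa_rdm_cq rdm_cq = {0};

	/* efa_rdm_cq -> efa_cq: just take the embedded member. */
	struct efa_cq *base = &rdm_cq.efa_cq;

	/* efa_cq -> efa_rdm_cq: container_of recovers the outer struct,
	 * including from members nested inside the embedded efa_cq. */
	struct efa_rdm_cq *a = container_of(base, struct efa_rdm_cq, efa_cq);
	struct efa_rdm_cq *b = container_of(&base->util_cq,
					    struct efa_rdm_cq, efa_cq.util_cq);

	printf("%d %d\n", a == &rdm_cq, b == &rdm_cq); /* prints: 1 1 */
	return 0;
}

The same one-step recovery through a nested member path is what the dispatch code in the diffs below relies on, e.g. container_of(fid, struct efa_rdm_cq, efa_cq.util_cq.cq_fid.fid).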
11 changes: 11 additions & 0 deletions prov/efa/src/efa_base_ep.c
@@ -4,6 +4,7 @@
 #include <sys/time.h>
 #include "efa.h"
 #include "efa_av.h"
+#include "efa_cq.h"
 #include "rdm/efa_rdm_protocol.h"

 int efa_base_ep_bind_av(struct efa_base_ep *base_ep, struct efa_av *av)
@@ -520,3 +521,13 @@ const char *efa_base_ep_get_peer_raw_addr_str(struct efa_base_ep *base_ep, fi_ad
 {
	return ofi_straddr(buf, buflen, FI_ADDR_EFA, efa_base_ep_get_peer_raw_addr(base_ep, addr));
 }
+
+struct efa_cq *efa_base_ep_get_tx_cq(struct efa_base_ep *ep)
+{
+	return ep->util_ep.tx_cq ? container_of(ep->util_ep.tx_cq, struct efa_cq, util_cq) : NULL;
+}
+
+struct efa_cq *efa_base_ep_get_rx_cq(struct efa_base_ep *ep)
+{
+	return ep->util_ep.rx_cq ? container_of(ep->util_ep.rx_cq, struct efa_cq, util_cq) : NULL;
+}
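
A hedged sketch of how the new helpers are meant to be consumed (the caller name is hypothetical; it assumes this tree's EFA headers). It mirrors the selection logic efa_rdm_ep_create_base_ep_ibv_qp adopts later in this diff: fail with -FI_ENOCQ when neither CQ is bound, otherwise fall back to the other direction's ibv CQ.

/* Hypothetical caller; assumes this tree's efa.h / efa_base_ep.h / efa_cq.h. */
static int example_select_ibv_cqs(struct efa_base_ep *ep,
				  struct ibv_cq_ex **tx_ibv_cq,
				  struct ibv_cq_ex **rx_ibv_cq)
{
	/* Both helpers may return NULL when no CQ of that direction is
	 * bound, so check before dereferencing. */
	struct efa_cq *tx_cq = efa_base_ep_get_tx_cq(ep);
	struct efa_cq *rx_cq = efa_base_ep_get_rx_cq(ep);

	if (!tx_cq && !rx_cq)
		return -FI_ENOCQ;

	*tx_ibv_cq = tx_cq ? tx_cq->ibv_cq.ibv_cq_ex : rx_cq->ibv_cq.ibv_cq_ex;
	*rx_ibv_cq = rx_cq ? rx_cq->ibv_cq.ibv_cq_ex : tx_cq->ibv_cq.ibv_cq_ex;
	return 0;
}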
5 changes: 5 additions & 0 deletions prov/efa/src/efa_base_ep.h
@@ -109,4 +109,9 @@ struct efa_ep_addr *efa_base_ep_get_peer_raw_addr(struct efa_base_ep *base_ep,
 const char *efa_base_ep_get_peer_raw_addr_str(struct efa_base_ep *base_ep,
					      fi_addr_t addr, char *buf,
					      size_t *buflen);
+
+struct efa_cq *efa_base_ep_get_tx_cq(struct efa_base_ep *ep);
+
+struct efa_cq *efa_base_ep_get_rx_cq(struct efa_base_ep *ep);
+
 #endif
42 changes: 21 additions & 21 deletions prov/efa/src/rdm/efa_rdm_cq.c
@@ -36,16 +36,16 @@ int efa_rdm_cq_close(struct fid *fid)

	retv = 0;

-	cq = container_of(fid, struct efa_rdm_cq, util_cq.cq_fid.fid);
+	cq = container_of(fid, struct efa_rdm_cq, efa_cq.util_cq.cq_fid.fid);

-	if (cq->ibv_cq.ibv_cq_ex) {
-		ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq.ibv_cq_ex));
+	if (cq->efa_cq.ibv_cq.ibv_cq_ex) {
+		ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->efa_cq.ibv_cq.ibv_cq_ex));
		if (ret) {
			EFA_WARN(FI_LOG_CQ, "Unable to close ibv cq: %s\n",
				 fi_strerror(-ret));
			return ret;
		}
-		cq->ibv_cq.ibv_cq_ex = NULL;
+		cq->efa_cq.ibv_cq.ibv_cq_ex = NULL;
	}

	if (cq->shm_cq) {
@@ -56,7 +56,7 @@ int efa_rdm_cq_close(struct fid *fid)
		}
	}

-	ret = ofi_cq_cleanup(&cq->util_cq);
+	ret = ofi_cq_cleanup(&cq->efa_cq.util_cq);
	if (ret)
		return ret;
	free(cq);
@@ -435,13 +435,13 @@ void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
	int prov_errno;
	struct efa_rdm_ep *ep = NULL;
	struct fi_cq_err_entry err_entry;
-	struct efa_rdm_cq *efa_rdm_cq;
+	struct efa_cq *efa_cq;
	struct efa_domain *efa_domain;
	struct efa_qp *qp;
	struct dlist_entry rx_progressed_ep_list, *tmp;

-	efa_rdm_cq = container_of(ibv_cq, struct efa_rdm_cq, ibv_cq);
-	efa_domain = container_of(efa_rdm_cq->util_cq.domain, struct efa_domain, util_domain);
+	efa_cq = container_of(ibv_cq, struct efa_cq, ibv_cq);
+	efa_domain = container_of(efa_cq->util_cq.domain, struct efa_domain, util_domain);
	dlist_init(&rx_progressed_ep_list);

	/* Call ibv_start_poll only once */
@@ -538,7 +538,7 @@ void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
			.prov_errno = prov_errno,
			.op_context = NULL
		};
-		ofi_cq_write_error(&efa_rdm_cq->util_cq, &err_entry);
+		ofi_cq_write_error(&efa_cq->util_cq, &err_entry);
	}

	if (should_end_poll)
@@ -559,9 +559,9 @@ static ssize_t efa_rdm_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t coun
	ssize_t ret;
	struct efa_domain *domain;

-	cq = container_of(cq_fid, struct efa_rdm_cq, util_cq.cq_fid.fid);
+	cq = container_of(cq_fid, struct efa_rdm_cq, efa_cq.util_cq.cq_fid.fid);

-	domain = container_of(cq->util_cq.domain, struct efa_domain, util_domain);
+	domain = container_of(cq->efa_cq.util_cq.domain, struct efa_domain, util_domain);

	ofi_genlock_lock(&domain->srx_lock);

@@ -573,13 +573,13 @@ static ssize_t efa_rdm_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t coun
		 * completion to efa. Use ofi_cq_read_entries to get the number of
		 * shm completions without progressing efa ep again.
		 */
-		ret = ofi_cq_read_entries(&cq->util_cq, buf, count, src_addr);
+		ret = ofi_cq_read_entries(&cq->efa_cq.util_cq, buf, count, src_addr);

		if (ret > 0)
			goto out;
	}

-	ret = ofi_cq_readfrom(&cq->util_cq.cq_fid, buf, count, src_addr);
+	ret = ofi_cq_readfrom(&cq->efa_cq.util_cq.cq_fid, buf, count, src_addr);

 out:
	ofi_genlock_unlock(&domain->srx_lock);
@@ -608,8 +608,8 @@ static void efa_rdm_cq_progress(struct util_cq *cq)
	struct fid_list_entry *fid_entry;

	ofi_genlock_lock(&cq->ep_list_lock);
-	efa_rdm_cq = container_of(cq, struct efa_rdm_cq, util_cq);
-	efa_domain = container_of(efa_rdm_cq->util_cq.domain, struct efa_domain, util_domain);
+	efa_rdm_cq = container_of(cq, struct efa_rdm_cq, efa_cq.util_cq);
+	efa_domain = container_of(efa_rdm_cq->efa_cq.util_cq.domain, struct efa_domain, util_domain);

	/**
	 * TODO: It's better to just post the initial batch of internal rx pkts during ep enable
@@ -671,19 +671,19 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,

	dlist_init(&cq->ibv_cq_poll_list);
	cq->need_to_scan_ep_list = false;
-	ret = ofi_cq_init(&efa_prov, domain, attr, &cq->util_cq,
+	ret = ofi_cq_init(&efa_prov, domain, attr, &cq->efa_cq.util_cq,
			  &efa_rdm_cq_progress, context);

	if (ret)
		goto free;

-	ret = efa_cq_ibv_cq_ex_open(attr, efa_domain->device->ibv_ctx, &cq->ibv_cq.ibv_cq_ex, &cq->ibv_cq.ibv_cq_ex_type);
+	ret = efa_cq_ibv_cq_ex_open(attr, efa_domain->device->ibv_ctx, &cq->efa_cq.ibv_cq.ibv_cq_ex, &cq->efa_cq.ibv_cq.ibv_cq_ex_type);
	if (ret) {
		EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ: %s\n", fi_strerror(ret));
		goto close_util_cq;
	}

-	*cq_fid = &cq->util_cq.cq_fid;
+	*cq_fid = &cq->efa_cq.util_cq.cq_fid;
	(*cq_fid)->fid.ops = &efa_rdm_cq_fi_ops;
	(*cq_fid)->ops = &efa_rdm_cq_ops;

@@ -693,7 +693,7 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
		/* Bind ep with shm provider's cq */
		shm_cq_attr.flags |= FI_PEER;
		peer_cq_context.size = sizeof(peer_cq_context);
-		peer_cq_context.cq = cq->util_cq.peer_cq;
+		peer_cq_context.cq = cq->efa_cq.util_cq.peer_cq;
		ret = fi_cq_open(efa_domain->shm_domain, &shm_cq_attr,
				 &cq->shm_cq, &peer_cq_context);
		if (ret) {
@@ -704,12 +704,12 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,

	return 0;
 destroy_ibv_cq:
-	retv = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq.ibv_cq_ex));
+	retv = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->efa_cq.ibv_cq.ibv_cq_ex));
	if (retv)
		EFA_WARN(FI_LOG_CQ, "Unable to close ibv cq: %s\n",
			 fi_strerror(-retv));
 close_util_cq:
-	retv = ofi_cq_cleanup(&cq->util_cq);
+	retv = ofi_cq_cleanup(&cq->efa_cq.util_cq);
	if (retv)
		EFA_WARN(FI_LOG_CQ, "Unable to close util cq: %s\n",
			 fi_strerror(-retv));
3 changes: 1 addition & 2 deletions prov/efa/src/rdm/efa_rdm_cq.h
@@ -8,8 +8,7 @@
 #include <ofi_util.h>

 struct efa_rdm_cq {
-	struct util_cq util_cq;
-	struct efa_ibv_cq ibv_cq;
+	struct efa_cq efa_cq;
	struct fid_cq *shm_cq;
	struct dlist_entry ibv_cq_poll_list;
	bool need_to_scan_ep_list;
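
With efa_cq now the first member, fid-based dispatch resolves in a single container_of step whose member path simply grows by one level, and the same fid can also be viewed through the shared efa_cq type. A sketch under the layout just shown (assumes this tree's headers; the helper name is hypothetical):

/* Hypothetical helper showing both views of a CQ fid under the new layout. */
static struct efa_cq *example_cq_fid_to_efa_cq(struct fid *fid)
{
	struct efa_rdm_cq *rdm_cq;

	/* Full RDM view, as efa_rdm_cq_close()/efa_rdm_cq_readfrom() do it: */
	rdm_cq = container_of(fid, struct efa_rdm_cq, efa_cq.util_cq.cq_fid.fid);
	(void) rdm_cq->shm_cq; /* RDM-only members remain reachable */

	/* Shared view, usable by code that only needs the common members: */
	return container_of(fid, struct efa_cq, util_cq.cq_fid.fid);
}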
74 changes: 37 additions & 37 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
@@ -37,13 +37,13 @@ void efa_rdm_ep_construct_ibv_qp_init_attr_ex(struct efa_rdm_ep *ep,
 static inline
 struct efa_rdm_cq *efa_rdm_ep_get_tx_rdm_cq(struct efa_rdm_ep *ep)
 {
-	return ep->base_ep.util_ep.tx_cq ? container_of(ep->base_ep.util_ep.tx_cq, struct efa_rdm_cq, util_cq) : NULL;
+	return ep->base_ep.util_ep.tx_cq ? container_of(ep->base_ep.util_ep.tx_cq, struct efa_rdm_cq, efa_cq.util_cq) : NULL;
 }

 static inline
 struct efa_rdm_cq *efa_rdm_ep_get_rx_rdm_cq(struct efa_rdm_ep *ep)
 {
-	return ep->base_ep.util_ep.rx_cq ? container_of(ep->base_ep.util_ep.rx_cq, struct efa_rdm_cq, util_cq) : NULL;
+	return ep->base_ep.util_ep.rx_cq ? container_of(ep->base_ep.util_ep.rx_cq, struct efa_rdm_cq, efa_cq.util_cq) : NULL;
 }

 /**
@@ -58,33 +58,33 @@ static
 int efa_rdm_ep_create_base_ep_ibv_qp(struct efa_rdm_ep *ep)
 {
	struct ibv_qp_init_attr_ex attr_ex = { 0 };
-	struct efa_rdm_cq *tx_rdm_cq, *rx_rdm_cq;
+	struct efa_cq *tx_cq, *rx_cq;
	struct ibv_cq_ex *tx_ibv_cq, *rx_ibv_cq;
	int ret;

-	tx_rdm_cq = efa_rdm_ep_get_tx_rdm_cq(ep);
-	rx_rdm_cq = efa_rdm_ep_get_rx_rdm_cq(ep);
+	tx_cq = efa_base_ep_get_tx_cq(&ep->base_ep);
+	rx_cq = efa_base_ep_get_rx_cq(&ep->base_ep);

-	if (!tx_rdm_cq && !rx_rdm_cq) {
+	if (!tx_cq && !rx_cq) {
		EFA_WARN(FI_LOG_EP_CTRL,
			 "Endpoint is not bound to a send or receive completion queue\n");
		return -FI_ENOCQ;
	}

-	if (!tx_rdm_cq && ofi_needs_tx(ep->base_ep.info->caps)) {
+	if (!tx_cq && ofi_needs_tx(ep->base_ep.info->caps)) {
		EFA_WARN(FI_LOG_EP_CTRL,
			 "Endpoint is not bound to a send completion queue when it has transmit capabilities enabled (FI_SEND).\n");
		return -FI_ENOCQ;
	}

-	if (!rx_rdm_cq && ofi_needs_rx(ep->base_ep.info->caps)) {
+	if (!rx_cq && ofi_needs_rx(ep->base_ep.info->caps)) {
		EFA_WARN(FI_LOG_EP_CTRL,
			 "Endpoint is not bound to a receive completion queue when it has receive capabilities enabled (FI_RECV).\n");
		return -FI_ENOCQ;
	}

-	tx_ibv_cq = tx_rdm_cq ? tx_rdm_cq->ibv_cq.ibv_cq_ex : rx_rdm_cq->ibv_cq.ibv_cq_ex;
-	rx_ibv_cq = rx_rdm_cq ? rx_rdm_cq->ibv_cq.ibv_cq_ex : tx_rdm_cq->ibv_cq.ibv_cq_ex;
+	tx_ibv_cq = tx_cq ? tx_cq->ibv_cq.ibv_cq_ex : rx_cq->ibv_cq.ibv_cq_ex;
+	rx_ibv_cq = rx_cq ? rx_cq->ibv_cq.ibv_cq_ex : tx_cq->ibv_cq.ibv_cq_ex;

	efa_rdm_ep_construct_ibv_qp_init_attr_ex(ep, &attr_ex, tx_ibv_cq, rx_ibv_cq);

@@ -699,9 +699,9 @@ static int efa_rdm_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags)
		}
		break;
	case FI_CLASS_CQ:
-		cq = container_of(bfid, struct efa_rdm_cq, util_cq.cq_fid.fid);
+		cq = container_of(bfid, struct efa_rdm_cq, efa_cq.util_cq.cq_fid.fid);

-		ret = ofi_ep_bind_cq(&efa_rdm_ep->base_ep.util_ep, &cq->util_cq, flags);
+		ret = ofi_ep_bind_cq(&efa_rdm_ep->base_ep.util_ep, &cq->efa_cq.util_cq, flags);
		if (ret)
			return ret;

@@ -873,12 +873,12 @@ bool efa_rdm_ep_has_unfinished_send(struct efa_rdm_ep *efa_rdm_ep)
 static inline
 void efa_rdm_ep_wait_send(struct efa_rdm_ep *efa_rdm_ep)
 {
-	struct efa_rdm_cq *tx_cq, *rx_cq;
+	struct efa_cq *tx_cq, *rx_cq;

	ofi_genlock_lock(&efa_rdm_ep_domain(efa_rdm_ep)->srx_lock);

-	tx_cq = efa_rdm_ep_get_tx_rdm_cq(efa_rdm_ep);
-	rx_cq = efa_rdm_ep_get_rx_rdm_cq(efa_rdm_ep);
+	tx_cq = efa_base_ep_get_tx_cq(&efa_rdm_ep->base_ep);
+	rx_cq = efa_base_ep_get_rx_cq(&efa_rdm_ep->base_ep);

	while (efa_rdm_ep_has_unfinished_send(efa_rdm_ep)) {
		/* poll cq until empty */
@@ -898,10 +898,10 @@ void efa_rdm_ep_remove_cntr_ibv_cq_poll_list(struct efa_rdm_ep *ep)
	int i;
	struct efa_cntr *efa_cntr;
	struct util_cntr *util_cntr;
-	struct efa_rdm_cq *tx_cq, *rx_cq;
+	struct efa_cq *tx_cq, *rx_cq;

-	tx_cq = efa_rdm_ep_get_tx_rdm_cq(ep);
-	rx_cq = efa_rdm_ep_get_rx_rdm_cq(ep);
+	tx_cq = efa_base_ep_get_tx_cq(&ep->base_ep);
+	rx_cq = efa_base_ep_get_rx_cq(&ep->base_ep);

	for (i = 0; i< CNTR_CNT; i++) {
		util_cntr = ep->base_ep.util_ep.cntrs[i];
@@ -928,16 +928,16 @@ void efa_rdm_ep_remove_cq_ibv_cq_poll_list(struct efa_rdm_ep *ep)
	 * It must happen after ofi_endpoint_close
	 * so we have cq's reference counters updated.
	 */
-	if (tx_cq && !ofi_atomic_get32(&tx_cq->util_cq.ref)) {
-		efa_ibv_cq_poll_list_remove(&tx_cq->ibv_cq_poll_list, &tx_cq->util_cq.ep_list_lock, &tx_cq->ibv_cq);
+	if (tx_cq && !ofi_atomic_get32(&tx_cq->efa_cq.util_cq.ref)) {
+		efa_ibv_cq_poll_list_remove(&tx_cq->ibv_cq_poll_list, &tx_cq->efa_cq.util_cq.ep_list_lock, &tx_cq->efa_cq.ibv_cq);
		if (rx_cq)
-			efa_ibv_cq_poll_list_remove(&rx_cq->ibv_cq_poll_list, &rx_cq->util_cq.ep_list_lock, &tx_cq->ibv_cq);
+			efa_ibv_cq_poll_list_remove(&rx_cq->ibv_cq_poll_list, &rx_cq->efa_cq.util_cq.ep_list_lock, &tx_cq->efa_cq.ibv_cq);
	}

-	if (rx_cq && !ofi_atomic_get32(&rx_cq->util_cq.ref)) {
-		efa_ibv_cq_poll_list_remove(&rx_cq->ibv_cq_poll_list, &rx_cq->util_cq.ep_list_lock, &rx_cq->ibv_cq);
+	if (rx_cq && !ofi_atomic_get32(&rx_cq->efa_cq.util_cq.ref)) {
+		efa_ibv_cq_poll_list_remove(&rx_cq->ibv_cq_poll_list, &rx_cq->efa_cq.util_cq.ep_list_lock, &rx_cq->efa_cq.ibv_cq);
		if (tx_cq)
-			efa_ibv_cq_poll_list_remove(&tx_cq->ibv_cq_poll_list, &tx_cq->util_cq.ep_list_lock, &rx_cq->ibv_cq);
+			efa_ibv_cq_poll_list_remove(&tx_cq->ibv_cq_poll_list, &tx_cq->efa_cq.util_cq.ep_list_lock, &rx_cq->efa_cq.ibv_cq);
	}
 }

@@ -1099,15 +1099,15 @@ static void efa_rdm_ep_close_shm_resources(struct efa_rdm_ep *efa_rdm_ep)
		efa_av->shm_rdm_av = NULL;
	}

-	efa_rdm_cq = container_of(efa_rdm_ep->base_ep.util_ep.tx_cq, struct efa_rdm_cq, util_cq);
+	efa_rdm_cq = container_of(efa_rdm_ep->base_ep.util_ep.tx_cq, struct efa_rdm_cq, efa_cq.util_cq);
	if (efa_rdm_cq->shm_cq) {
		ret = fi_close(&efa_rdm_cq->shm_cq->fid);
		if (ret)
			EFA_WARN(FI_LOG_EP_CTRL, "Unable to close shm cq\n");
		efa_rdm_cq->shm_cq = NULL;
	}

-	efa_rdm_cq = container_of(efa_rdm_ep->base_ep.util_ep.rx_cq, struct efa_rdm_cq, util_cq);
+	efa_rdm_cq = container_of(efa_rdm_ep->base_ep.util_ep.rx_cq, struct efa_rdm_cq, efa_cq.util_cq);
	if (efa_rdm_cq->shm_cq) {
		ret = fi_close(&efa_rdm_cq->shm_cq->fid);
		if (ret)
@@ -1187,9 +1187,9 @@ int efa_rdm_ep_insert_cntr_ibv_cq_poll_list(struct efa_rdm_ep *ep)
	int i, ret;
	struct efa_cntr *efa_cntr;
	struct util_cntr *util_cntr;
-	struct efa_rdm_cq *tx_cq, *rx_cq;
-	tx_cq = efa_rdm_ep_get_tx_rdm_cq(ep);
-	rx_cq = efa_rdm_ep_get_rx_rdm_cq(ep);
+	struct efa_cq *tx_cq, *rx_cq;
+	tx_cq = efa_base_ep_get_tx_cq(&ep->base_ep);
+	rx_cq = efa_base_ep_get_rx_cq(&ep->base_ep);

	for (i = 0; i < CNTR_CNT; i++) {
		util_cntr = ep->base_ep.util_ep.cntrs[i];
@@ -1224,33 +1224,33 @@ int efa_rdm_ep_insert_cq_ibv_cq_poll_list(struct efa_rdm_ep *ep)
	rx_cq = efa_rdm_ep_get_rx_rdm_cq(ep);

	if (tx_cq) {
-		ret = efa_ibv_cq_poll_list_insert(&tx_cq->ibv_cq_poll_list, &tx_cq->util_cq.ep_list_lock, &tx_cq->ibv_cq);
+		ret = efa_ibv_cq_poll_list_insert(&tx_cq->ibv_cq_poll_list, &tx_cq->efa_cq.util_cq.ep_list_lock, &tx_cq->efa_cq.ibv_cq);
		if (ret)
			return ret;

		if (rx_cq) {
-			ret = efa_ibv_cq_poll_list_insert(&tx_cq->ibv_cq_poll_list, &tx_cq->util_cq.ep_list_lock, &rx_cq->ibv_cq);
+			ret = efa_ibv_cq_poll_list_insert(&tx_cq->ibv_cq_poll_list, &tx_cq->efa_cq.util_cq.ep_list_lock, &rx_cq->efa_cq.ibv_cq);
			if (ret)
				return ret;
		}
-		ofi_genlock_lock(&tx_cq->util_cq.ep_list_lock);
+		ofi_genlock_lock(&tx_cq->efa_cq.util_cq.ep_list_lock);
		tx_cq->need_to_scan_ep_list = true;
-		ofi_genlock_unlock(&tx_cq->util_cq.ep_list_lock);
+		ofi_genlock_unlock(&tx_cq->efa_cq.util_cq.ep_list_lock);
	}

	if (rx_cq) {
-		ret = efa_ibv_cq_poll_list_insert(&rx_cq->ibv_cq_poll_list, &rx_cq->util_cq.ep_list_lock, &rx_cq->ibv_cq);
+		ret = efa_ibv_cq_poll_list_insert(&rx_cq->ibv_cq_poll_list, &rx_cq->efa_cq.util_cq.ep_list_lock, &rx_cq->efa_cq.ibv_cq);
		if (ret)
			return ret;

		if (tx_cq) {
-			ret = efa_ibv_cq_poll_list_insert(&rx_cq->ibv_cq_poll_list, &rx_cq->util_cq.ep_list_lock, &tx_cq->ibv_cq);
+			ret = efa_ibv_cq_poll_list_insert(&rx_cq->ibv_cq_poll_list, &rx_cq->efa_cq.util_cq.ep_list_lock, &tx_cq->efa_cq.ibv_cq);
			if (ret)
				return ret;
		}
-		ofi_genlock_lock(&rx_cq->util_cq.ep_list_lock);
+		ofi_genlock_lock(&rx_cq->efa_cq.util_cq.ep_list_lock);
		rx_cq->need_to_scan_ep_list = true;
-		ofi_genlock_unlock(&rx_cq->util_cq.ep_list_lock);
+		ofi_genlock_unlock(&rx_cq->efa_cq.util_cq.ep_list_lock);
	}

	return FI_SUCCESS;
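
One detail of efa_rdm_ep_insert_cq_ibv_cq_poll_list worth spelling out: when an endpoint is bound to two distinct CQs, each CQ's poll list is cross-populated with both ibv CQs, so progressing either one drains both. A sketch of the resulting state under the new member paths:

/* Assuming tx_cq != rx_cq and both bound:
 *
 *   tx_cq->ibv_cq_poll_list : { &tx_cq->efa_cq.ibv_cq, &rx_cq->efa_cq.ibv_cq }
 *   rx_cq->ibv_cq_poll_list : { &rx_cq->efa_cq.ibv_cq, &tx_cq->efa_cq.ibv_cq }
 *
 * efa_rdm_ep_remove_cq_ibv_cq_poll_list (earlier in this file) removes
 * exactly these entries once a CQ's reference count drops to zero. */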