Skip to content

Commit

Permalink
shared_ctx: Exchange addresses 1 at a time
Browse files Browse the repository at this point in the history
When the address size x the number of endpoints exceeds the
receive buffer size, we can drop address data.  To avoid this,
exchange only 1 address at a time.  This works provided that
each address is at least the size of the control message
(64 bytes).

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
  • Loading branch information
shefty committed Jul 10, 2018
1 parent 67c5f06 commit cede60e
Showing 1 changed file with 43 additions and 79 deletions.
122 changes: 43 additions & 79 deletions functional/shared_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ static int rx_shared_ctx = 1;
static int ep_cnt = 4;
static struct fid_ep **ep_array, *srx_ctx;
static struct fid_stx *stx_ctx;
static char *local_addr, *remote_addr;
static size_t addrlen = 0;
static fi_addr_t *addr_array;

Expand Down Expand Up @@ -255,106 +254,71 @@ static int init_av(void)
int ret;
int i;

/* Get local address blob. Find the addrlen first. We set addrlen
* as 0 and fi_getname will return the actual addrlen. */
addrlen = 0;
ret = fi_getname(&ep_array[0]->fid, local_addr, &addrlen);
if (ret != -FI_ETOOSMALL) {
FT_PRINTERR("fi_getname", ret);
return ret;
if (opts.dst_addr) {
ret = ft_av_insert(av, fi->dest_addr, 1, &addr_array[0], 0, NULL);
if (ret)
return ret;
}

local_addr = malloc(addrlen * ep_cnt);
remote_addr = malloc(addrlen * ep_cnt);

/* Get local addresses for all EPs */
for (i = 0; i < ep_cnt; i++) {
ret = fi_getname(&ep_array[i]->fid, local_addr + addrlen * i, &addrlen);
addrlen = tx_size;
ret = fi_getname(&ep_array[i]->fid, tx_buf + ft_tx_prefix_size(),
&addrlen);
if (ret) {
FT_PRINTERR("fi_getname", ret);
return ret;
}
}

if (opts.dst_addr) {
memcpy(remote_addr, fi->dest_addr, addrlen);
if (opts.dst_addr) {
ret = ft_tx(ep_array[0], addr_array[0], addrlen, &tx_ctx);
if (ret)
return ret;

ret = ft_av_insert(av, remote_addr, 1, &addr_array[0], 0, NULL);
if (ret)
return ret;
if (rx_shared_ctx)
ret = ft_rx(srx_ctx, rx_size);
else
ret = ft_rx(ep_array[0], rx_size);
if (ret)
return ret;

/* Send local EP addresses to one of the remote endpoints */
memcpy(tx_buf + ft_tx_prefix_size(), &addrlen, sizeof(size_t));
memcpy(tx_buf + ft_tx_prefix_size() + sizeof(size_t),
local_addr, addrlen * ep_cnt);
ret = ft_tx(ep_array[0], addr_array[0],
sizeof(size_t) + addrlen * ep_cnt, &tx_ctx);
if (ret)
return ret;
/* Skip the first address since we already have it in AV */
if (i) {
ret = ft_av_insert(av, rx_buf + ft_rx_prefix_size(), 1,
&addr_array[i], 0, NULL);
if (ret)
return ret;
}
} else {
if (rx_shared_ctx)
ret = ft_rx(srx_ctx, rx_size);
else
ret = ft_rx(ep_array[0], rx_size);
if (ret)
return ret;

/* Get remote EP addresses */
if (rx_shared_ctx)
ret = ft_rx(srx_ctx, rx_size);
else
ret = ft_rx(ep_array[0], rx_size);
if (ret)
return ret;
ret = ft_av_insert(av, rx_buf + ft_rx_prefix_size(), 1,
&addr_array[i], 0, NULL);
if (ret)
return ret;

memcpy(&addrlen, rx_buf + ft_rx_prefix_size(), sizeof(size_t));
memcpy(remote_addr, rx_buf + ft_rx_prefix_size() + sizeof(size_t),
addrlen * ep_cnt);
ret = ft_tx(ep_array[0], addr_array[0], addrlen, &tx_ctx);
if (ret)
return ret;

/* Insert remote addresses into AV
* Skip the first address since we already have it in AV */
ret = ft_av_insert(av, remote_addr + addrlen, ep_cnt - 1,
addr_array + 1, 0, NULL);
if (ret)
return ret;
}
}

/* Send ACK */
/* ACK */
if (opts.dst_addr) {
ret = ft_tx(ep_array[0], addr_array[0], 1, &tx_ctx);
if (ret)
return ret;

} else {
/* Get remote EP addresses */
if (rx_shared_ctx)
ret = ft_rx(srx_ctx, rx_size);
else
ret = ft_rx(ep_array[0], rx_size);
if (ret)
return ret;

memcpy(&addrlen, rx_buf + ft_rx_prefix_size(), sizeof(size_t));
memcpy(remote_addr, rx_buf + ft_rx_prefix_size() + sizeof(size_t),
addrlen * ep_cnt);

/* Insert remote addresses into AV */
ret = ft_av_insert(av, remote_addr, ep_cnt, addr_array, 0, NULL);
if (ret)
return ret;

/* Send local EP addresses to one of the remote endpoints */
memcpy(tx_buf + ft_tx_prefix_size(), &addrlen, sizeof(size_t));
memcpy(tx_buf + ft_tx_prefix_size() + sizeof(size_t),
local_addr, addrlen * ep_cnt);
ret = ft_tx(ep_array[0], addr_array[0],
sizeof(size_t) + addrlen * ep_cnt, &tx_ctx);
if (ret)
return ret;

/* Receive ACK from client */
if (rx_shared_ctx)
ret = ft_rx(srx_ctx, rx_size);
else
ret = ft_rx(ep_array[0], rx_size);
if (ret)
return ret;
}

free(local_addr);
free(remote_addr);
return 0;
return ret;
}

static int init_fabric(void)
Expand Down

0 comments on commit cede60e

Please sign in to comment.