Skip to content

Commit

Permalink
Add support for setting recvbuf buffer size on a socket.
Browse files Browse the repository at this point in the history
Signed-off-by: Fred Dushin <fred@dushin.net>
  • Loading branch information
fadushin committed Oct 31, 2023
1 parent b44cc3b commit a2e1493
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- New gpio driver for STM32 with nif and port support for read and write functions.
- Added support for interrupts to STM32 GPIO port driver.
- Added suppoprt for PicoW extra gpio pins (led) to the gpio driver.
- Added support for setting the receive buffer size for sockets

## [0.6.0-alpha.1] - 2023-10-09

Expand Down
2 changes: 2 additions & 0 deletions doc/src/programmers-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1804,12 +1804,14 @@ Currently, the following options are supported:
|------------|--------------|-------------|
| `{socket, reuseaddr}` | `boolean()` | Sets `SO_REUSEADDR` on the socket. |
| `{socket, linger}` | `#{onoff => boolean(), linger => non_neg_integer()}` | Sets `SO_LINGER` on the socekt. |
| `{otp, recvbuf}` | `non_neg_integer()` | Sets the buffer size (in bytes) on receive calls (Default value: 512). |

For example:

%% erlang
ok = socket:setopt(Socket, {socket, reuseaddr}, true),
ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}),
ok = socket:setopt(Socket, {otp, recvbuf}, 1024),

### UDP Socket Programming

Expand Down
164 changes: 109 additions & 55 deletions src/libAtomVM/otp_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ struct SocketResource
uint64_t ref_ticks;
int32_t selecting_process_id;
ErlNifMonitor selecting_process_monitor;
size_t buf_size;
};
#elif OTP_SOCKET_LWIP
struct SocketResource
Expand Down Expand Up @@ -172,14 +173,14 @@ static const char *const linger_atom = ATOM_STR("\x6", "linger");
static const char *const loopback_atom = ATOM_STR("\x8", "loopback");
static const char *const onoff_atom = ATOM_STR("\x5", "onoff");
static const char *const port_atom = ATOM_STR("\x4", "port");
static const char *const rcvbuf_atom = ATOM_STR("\x6", "rcvbuf");
static const char *const reuseaddr_atom = ATOM_STR("\x9", "reuseaddr");

#define CLOSED_FD 0

#define ADDR_ATOM globalcontext_make_atom(global, addr_atom)
#define CLOSE_INTERNAL_ATOM globalcontext_make_atom(global, close_internal_atom)
#define ACCEPT_ATOM globalcontext_make_atom(global, accept_atom)
#define RECV_ATOM globalcontext_make_atom(global, recv_atom)
#define INVALID_VALUE_ATOM globalcontext_make_atom(global, ATOM_STR("\xD", "invalid_value"))
#define INVALID_OPTION_ATOM globalcontext_make_atom(global, ATOM_STR("\xE", "invalid_option"))

enum otp_socket_domain
{
Expand Down Expand Up @@ -235,6 +236,19 @@ static const AtomStringIntPair otp_socket_shutdown_direction_table[] = {
SELECT_INT_DEFAULT(OtpSocketInvalidShutdownDirection)
};

enum otp_socket_setopt_level
{
OtpSocketInvalidSetoptLevel = 0,
OtpSocketSetoptLevelSocket,
OtpSocketSetoptLevelOTP
};

static const AtomStringIntPair otp_socket_setopt_level_table[] = {
{ ATOM_STR("\x6", "socket"), OtpSocketSetoptLevelSocket },
{ ATOM_STR("\x3", "otp"), OtpSocketSetoptLevelOTP },
SELECT_INT_DEFAULT(OtpSocketInvalidSetoptLevel)
};

#define DEFAULT_BUFFER_SIZE 512

#ifndef MIN
Expand Down Expand Up @@ -571,6 +585,7 @@ static term nif_socket_open(Context *ctx, int argc, term argv[])
LWIP_END();
}
#endif
rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;

if (UNLIKELY(memory_ensure_free(ctx, TERM_BOXED_RESOURCE_SIZE) != MEMORY_GC_OK)) {
AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__);
Expand Down Expand Up @@ -738,6 +753,7 @@ static term nif_socket_close(Context *ctx, int argc, term argv[])
}

#endif
rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;

return OK_ATOM;
}
Expand Down Expand Up @@ -1050,62 +1066,100 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[])
term level_tuple = argv[1];
term value = argv[2];

term opt = term_get_tuple_element(level_tuple, 1);
if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) {
int option_value = (value == TRUE_ATOM);
term level = term_get_tuple_element(level_tuple, 0);
int level_val = interop_atom_term_select_int(otp_socket_setopt_level_table, level, global);
switch (level_val) {

case OtpSocketSetoptLevelSocket: {

term opt = term_get_tuple_element(level_tuple, 1);
if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) {
int option_value = (value == TRUE_ATOM);
#if OTP_SOCKET_BSD
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
#elif OTP_SOCKET_LWIP
LWIP_BEGIN();
if (option_value) {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
} else {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
}
LWIP_END();
return OK_ATOM;
LWIP_BEGIN();
if (option_value) {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
} else {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
}
LWIP_END();
return OK_ATOM;
#endif
} else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) {
term onoff = interop_kv_get_value(value, onoff_atom, ctx->global);
term linger = interop_kv_get_value(value, linger_atom, ctx->global);
VALIDATE_VALUE(linger, term_is_integer);
} else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) {
term onoff = interop_kv_get_value(value, onoff_atom, ctx->global);
term linger = interop_kv_get_value(value, linger_atom, ctx->global);
VALIDATE_VALUE(linger, term_is_integer);

#if OTP_SOCKET_BSD
struct linger sl;
sl.l_onoff = (onoff == TRUE_ATOM);
sl.l_linger = term_to_int(linger);
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
struct linger sl;
sl.l_onoff = (onoff == TRUE_ATOM);
sl.l_linger = term_to_int(linger);
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
#elif OTP_SOCKET_LWIP
rsrc_obj->linger_on = (onoff == TRUE_ATOM);
rsrc_obj->linger_sec = term_to_int(linger);
return OK_ATOM;
rsrc_obj->linger_on = (onoff == TRUE_ATOM);
rsrc_obj->linger_sec = term_to_int(linger);
return OK_ATOM;
#endif
// TODO add more as needed
// int flag = 1;
// int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
// if (UNLIKELY(res != 0)) {
// AVM_LOGW(TAG, "Failed to set TCP_NODELAY.");
// }
} else {
RAISE_ERROR(BADARG_ATOM);
// TODO add more as needed
// int flag = 1;
// int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
// if (UNLIKELY(res != 0)) {
// AVM_LOGW(TAG, "Failed to set TCP_NODELAY.");
// }
} else {
RAISE_ERROR(BADARG_ATOM);
}

case OtpSocketSetoptLevelOTP: {
term opt = term_get_tuple_element(level_tuple, 1);
if (globalcontext_is_term_equal_to_atom_string(global, opt, rcvbuf_atom)) {
// socket:setopt(Socket, {otp, recvbuf}, BufSize :: non_neg_integer())

if (UNLIKELY(!term_is_integer(value))) {
AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value must be an integer");
return make_error_tuple(INVALID_VALUE_ATOM, ctx);
}

avm_int_t buf_size = term_to_int(value);
if (UNLIKELY(buf_size < 0)) {
AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value may not be negative");
return make_error_tuple(INVALID_VALUE_ATOM, ctx);
}

rsrc_obj->buf_size = (size_t) buf_size;

return OK_ATOM;
} else {
AVM_LOGE(TAG, "socket:setopt: Unsupported otp option");
return make_error_tuple(INVALID_OPTION_ATOM, ctx);
}
}

default: {
AVM_LOGE(TAG, "socket:setopt: Unsupported level");
RAISE_ERROR(BADARG_ATOM);
}
}
}
}

Expand Down Expand Up @@ -1484,6 +1538,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[])
struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource));
conn_rsrc_obj->fd = fd;
conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;
TRACE("nif_socket_accept: Created socket on accept fd=%i\n", rsrc_obj->fd);

term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj);
Expand Down Expand Up @@ -1551,8 +1606,7 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_
GlobalContext *global = ctx->global;

int flags = MSG_WAITALL;
// TODO parameterize buffer size
ssize_t res = recvfrom(rsrc_obj->fd, NULL, DEFAULT_BUFFER_SIZE, MSG_PEEK | flags, NULL, NULL);
ssize_t res = recvfrom(rsrc_obj->fd, NULL, rsrc_obj->buf_size, MSG_PEEK | flags, NULL, NULL);
TRACE("%li bytes available.\n", (long int) res);
if (res < 0) {
AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", rsrc_obj->fd, errno);
Expand Down Expand Up @@ -1609,7 +1663,7 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs
GlobalContext *global = ctx->global;

// TODO plumb through buffer size
size_t buffer_size = len == 0 ? DEFAULT_BUFFER_SIZE : len;
size_t buffer_size = len == 0 ? rsrc_obj->buf_size : len;
char *buffer = malloc(buffer_size);
term payload = term_invalid_term();

Expand Down
39 changes: 39 additions & 0 deletions tests/libs/estdlib/test_tcp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ test() ->
ok = test_echo_server(),
ok = test_shutdown(),
ok = test_close_by_another_process(),
ok = test_buf_size(),
case get_otp_version() of
atomvm ->
ok = test_abandon_select();
Expand Down Expand Up @@ -97,6 +98,44 @@ test_shutdown_of_side(Port, Side) ->
ok = close_client_socket(Socket),
ok.

test_buf_size() ->
etest:flush_msg_queue(),

Port = 44404,
ListenSocket = start_echo_server(Port),

{ok, Socket} = socket:open(inet, stream, tcp),
ok = try_connect(Socket, Port, 10),

%% try a few failures first
{error, _} = socket:setopt(Socket, {otp, badopt}, any_value),
{error, _} = socket:setopt(Socket, {otp, rcvbuf}, not_an_int),
{error, _} = socket:setopt(Socket, {otp, rcvbuf}, -1),

%% limit the recv buffer size to 10 bytes
ok = socket:setopt(Socket, {otp, rcvbuf}, 10),

Packet = "012345678901234567890123456789",

case socket:send(Socket, Packet) of
ok ->
ok =
case socket:recv(Socket) of
{ok, <<"0123456789">>} ->
ok;
Error ->
io:format("Unexpected value on recv: ~p~n", [Error]),
Error
end;
{error, Reason} = Error ->
io:format("Error on send: ~p~n", [Reason]),
Error
end,

ok = close_client_socket(Socket),

close_listen_socket(ListenSocket).

%%
%% echo_server
%%
Expand Down

0 comments on commit a2e1493

Please sign in to comment.