From a2e1493d711bc0bc4cb06efbfcea54037e469d39 Mon Sep 17 00:00:00 2001 From: Fred Dushin Date: Sat, 28 Oct 2023 21:48:54 -0400 Subject: [PATCH] Add support for setting recvbuf buffer size on a socket. Signed-off-by: Fred Dushin --- CHANGELOG.md | 1 + doc/src/programmers-guide.md | 2 + src/libAtomVM/otp_socket.c | 164 ++++++++++++++++--------- tests/libs/estdlib/test_tcp_socket.erl | 39 ++++++ 4 files changed, 151 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f94e5eec20..53a6dfbd3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - New gpio driver for STM32 with nif and port support for read and write functions. - Added support for interrupts to STM32 GPIO port driver. - Added suppoprt for PicoW extra gpio pins (led) to the gpio driver. +- Added support for setting the receive buffer size for sockets ## [0.6.0-alpha.1] - 2023-10-09 diff --git a/doc/src/programmers-guide.md b/doc/src/programmers-guide.md index 4f075bd05d..24caa1f2f8 100644 --- a/doc/src/programmers-guide.md +++ b/doc/src/programmers-guide.md @@ -1804,12 +1804,14 @@ Currently, the following options are supported: |------------|--------------|-------------| | `{socket, reuseaddr}` | `boolean()` | Sets `SO_REUSEADDR` on the socket. | | `{socket, linger}` | `#{onoff => boolean(), linger => non_neg_integer()}` | Sets `SO_LINGER` on the socekt. | +| `{otp, recvbuf}` | `non_neg_integer()` | Sets the buffer size (in bytes) on receive calls (Default value: 512). | For example: %% erlang ok = socket:setopt(Socket, {socket, reuseaddr}, true), ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}), + ok = socket:setopt(Socket, {otp, recvbuf}, 1024), ### UDP Socket Programming diff --git a/src/libAtomVM/otp_socket.c b/src/libAtomVM/otp_socket.c index cdc331561e..82605dc4e7 100644 --- a/src/libAtomVM/otp_socket.c +++ b/src/libAtomVM/otp_socket.c @@ -141,6 +141,7 @@ struct SocketResource uint64_t ref_ticks; int32_t selecting_process_id; ErlNifMonitor selecting_process_monitor; + size_t buf_size; }; #elif OTP_SOCKET_LWIP struct SocketResource @@ -172,14 +173,14 @@ static const char *const linger_atom = ATOM_STR("\x6", "linger"); static const char *const loopback_atom = ATOM_STR("\x8", "loopback"); static const char *const onoff_atom = ATOM_STR("\x5", "onoff"); static const char *const port_atom = ATOM_STR("\x4", "port"); +static const char *const rcvbuf_atom = ATOM_STR("\x6", "rcvbuf"); static const char *const reuseaddr_atom = ATOM_STR("\x9", "reuseaddr"); #define CLOSED_FD 0 #define ADDR_ATOM globalcontext_make_atom(global, addr_atom) -#define CLOSE_INTERNAL_ATOM globalcontext_make_atom(global, close_internal_atom) -#define ACCEPT_ATOM globalcontext_make_atom(global, accept_atom) -#define RECV_ATOM globalcontext_make_atom(global, recv_atom) +#define INVALID_VALUE_ATOM globalcontext_make_atom(global, ATOM_STR("\xD", "invalid_value")) +#define INVALID_OPTION_ATOM globalcontext_make_atom(global, ATOM_STR("\xE", "invalid_option")) enum otp_socket_domain { @@ -235,6 +236,19 @@ static const AtomStringIntPair otp_socket_shutdown_direction_table[] = { SELECT_INT_DEFAULT(OtpSocketInvalidShutdownDirection) }; +enum otp_socket_setopt_level +{ + OtpSocketInvalidSetoptLevel = 0, + OtpSocketSetoptLevelSocket, + OtpSocketSetoptLevelOTP +}; + +static const AtomStringIntPair otp_socket_setopt_level_table[] = { + { ATOM_STR("\x6", "socket"), OtpSocketSetoptLevelSocket }, + { ATOM_STR("\x3", "otp"), OtpSocketSetoptLevelOTP }, + SELECT_INT_DEFAULT(OtpSocketInvalidSetoptLevel) +}; + #define DEFAULT_BUFFER_SIZE 512 #ifndef MIN @@ -571,6 +585,7 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) LWIP_END(); } #endif + rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; if (UNLIKELY(memory_ensure_free(ctx, TERM_BOXED_RESOURCE_SIZE) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); @@ -738,6 +753,7 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) } #endif + rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; return OK_ATOM; } @@ -1050,62 +1066,100 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) term level_tuple = argv[1]; term value = argv[2]; - term opt = term_get_tuple_element(level_tuple, 1); - if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) { - int option_value = (value == TRUE_ATOM); + term level = term_get_tuple_element(level_tuple, 0); + int level_val = interop_atom_term_select_int(otp_socket_setopt_level_table, level, global); + switch (level_val) { + + case OtpSocketSetoptLevelSocket: { + + term opt = term_get_tuple_element(level_tuple, 1); + if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) { + int option_value = (value == TRUE_ATOM); #if OTP_SOCKET_BSD - int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); - if (UNLIKELY(res != 0)) { - return make_errno_tuple(ctx); - } else { - return OK_ATOM; - } + int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); + if (UNLIKELY(res != 0)) { + return make_errno_tuple(ctx); + } else { + return OK_ATOM; + } #elif OTP_SOCKET_LWIP - LWIP_BEGIN(); - if (option_value) { - if (rsrc_obj->socket_state & SocketStateTCP) { - ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); - } else { - ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); - } - } else { - if (rsrc_obj->socket_state & SocketStateTCP) { - ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); - } else { - ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); - } - } - LWIP_END(); - return OK_ATOM; + LWIP_BEGIN(); + if (option_value) { + if (rsrc_obj->socket_state & SocketStateTCP) { + ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); + } else { + ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); + } + } else { + if (rsrc_obj->socket_state & SocketStateTCP) { + ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); + } else { + ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); + } + } + LWIP_END(); + return OK_ATOM; #endif - } else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) { - term onoff = interop_kv_get_value(value, onoff_atom, ctx->global); - term linger = interop_kv_get_value(value, linger_atom, ctx->global); - VALIDATE_VALUE(linger, term_is_integer); + } else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) { + term onoff = interop_kv_get_value(value, onoff_atom, ctx->global); + term linger = interop_kv_get_value(value, linger_atom, ctx->global); + VALIDATE_VALUE(linger, term_is_integer); #if OTP_SOCKET_BSD - struct linger sl; - sl.l_onoff = (onoff == TRUE_ATOM); - sl.l_linger = term_to_int(linger); - int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); - if (UNLIKELY(res != 0)) { - return make_errno_tuple(ctx); - } else { - return OK_ATOM; - } + struct linger sl; + sl.l_onoff = (onoff == TRUE_ATOM); + sl.l_linger = term_to_int(linger); + int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); + if (UNLIKELY(res != 0)) { + return make_errno_tuple(ctx); + } else { + return OK_ATOM; + } #elif OTP_SOCKET_LWIP - rsrc_obj->linger_on = (onoff == TRUE_ATOM); - rsrc_obj->linger_sec = term_to_int(linger); - return OK_ATOM; + rsrc_obj->linger_on = (onoff == TRUE_ATOM); + rsrc_obj->linger_sec = term_to_int(linger); + return OK_ATOM; #endif - // TODO add more as needed - // int flag = 1; - // int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); - // if (UNLIKELY(res != 0)) { - // AVM_LOGW(TAG, "Failed to set TCP_NODELAY."); - // } - } else { - RAISE_ERROR(BADARG_ATOM); + // TODO add more as needed + // int flag = 1; + // int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); + // if (UNLIKELY(res != 0)) { + // AVM_LOGW(TAG, "Failed to set TCP_NODELAY."); + // } + } else { + RAISE_ERROR(BADARG_ATOM); + } + + case OtpSocketSetoptLevelOTP: { + term opt = term_get_tuple_element(level_tuple, 1); + if (globalcontext_is_term_equal_to_atom_string(global, opt, rcvbuf_atom)) { + // socket:setopt(Socket, {otp, recvbuf}, BufSize :: non_neg_integer()) + + if (UNLIKELY(!term_is_integer(value))) { + AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value must be an integer"); + return make_error_tuple(INVALID_VALUE_ATOM, ctx); + } + + avm_int_t buf_size = term_to_int(value); + if (UNLIKELY(buf_size < 0)) { + AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value may not be negative"); + return make_error_tuple(INVALID_VALUE_ATOM, ctx); + } + + rsrc_obj->buf_size = (size_t) buf_size; + + return OK_ATOM; + } else { + AVM_LOGE(TAG, "socket:setopt: Unsupported otp option"); + return make_error_tuple(INVALID_OPTION_ATOM, ctx); + } + } + + default: { + AVM_LOGE(TAG, "socket:setopt: Unsupported level"); + RAISE_ERROR(BADARG_ATOM); + } + } } } @@ -1484,6 +1538,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource)); conn_rsrc_obj->fd = fd; conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; TRACE("nif_socket_accept: Created socket on accept fd=%i\n", rsrc_obj->fd); term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj); @@ -1551,8 +1606,7 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_ GlobalContext *global = ctx->global; int flags = MSG_WAITALL; - // TODO parameterize buffer size - ssize_t res = recvfrom(rsrc_obj->fd, NULL, DEFAULT_BUFFER_SIZE, MSG_PEEK | flags, NULL, NULL); + ssize_t res = recvfrom(rsrc_obj->fd, NULL, rsrc_obj->buf_size, MSG_PEEK | flags, NULL, NULL); TRACE("%li bytes available.\n", (long int) res); if (res < 0) { AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", rsrc_obj->fd, errno); @@ -1609,7 +1663,7 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs GlobalContext *global = ctx->global; // TODO plumb through buffer size - size_t buffer_size = len == 0 ? DEFAULT_BUFFER_SIZE : len; + size_t buffer_size = len == 0 ? rsrc_obj->buf_size : len; char *buffer = malloc(buffer_size); term payload = term_invalid_term(); diff --git a/tests/libs/estdlib/test_tcp_socket.erl b/tests/libs/estdlib/test_tcp_socket.erl index 5f8d540a79..2cf26b3cdd 100644 --- a/tests/libs/estdlib/test_tcp_socket.erl +++ b/tests/libs/estdlib/test_tcp_socket.erl @@ -26,6 +26,7 @@ test() -> ok = test_echo_server(), ok = test_shutdown(), ok = test_close_by_another_process(), + ok = test_buf_size(), case get_otp_version() of atomvm -> ok = test_abandon_select(); @@ -97,6 +98,44 @@ test_shutdown_of_side(Port, Side) -> ok = close_client_socket(Socket), ok. +test_buf_size() -> + etest:flush_msg_queue(), + + Port = 44404, + ListenSocket = start_echo_server(Port), + + {ok, Socket} = socket:open(inet, stream, tcp), + ok = try_connect(Socket, Port, 10), + + %% try a few failures first + {error, _} = socket:setopt(Socket, {otp, badopt}, any_value), + {error, _} = socket:setopt(Socket, {otp, rcvbuf}, not_an_int), + {error, _} = socket:setopt(Socket, {otp, rcvbuf}, -1), + + %% limit the recv buffer size to 10 bytes + ok = socket:setopt(Socket, {otp, rcvbuf}, 10), + + Packet = "012345678901234567890123456789", + + case socket:send(Socket, Packet) of + ok -> + ok = + case socket:recv(Socket) of + {ok, <<"0123456789">>} -> + ok; + Error -> + io:format("Unexpected value on recv: ~p~n", [Error]), + Error + end; + {error, Reason} = Error -> + io:format("Error on send: ~p~n", [Reason]), + Error + end, + + ok = close_client_socket(Socket), + + close_listen_socket(ListenSocket). + %% %% echo_server %%