diff --git a/CHANGELOG.md b/CHANGELOG.md index 13c5d433b5..ac3b48e4cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added support for `crypto:one_time/4,5` on Unix and Pico as well as for `crypto:hash/2` on Pico - Added ability to configure STM32 Nucleo boards onboard UART->USB-COM using the `-DBOARD=nucleo` cmake option - Added STM32 cmake option `-DAVM_CFG_CONSOLE=` to select a different uart peripheral for the system console +- Added support for setting the default receive buffer size for sockets via `socket:setopt/3` ## [0.6.0-alpha.1] - 2023-10-09 diff --git a/doc/src/programmers-guide.md b/doc/src/programmers-guide.md index 217fb21439..60408255f2 100644 --- a/doc/src/programmers-guide.md +++ b/doc/src/programmers-guide.md @@ -1804,12 +1804,14 @@ Currently, the following options are supported: |------------|--------------|-------------| | `{socket, reuseaddr}` | `boolean()` | Sets `SO_REUSEADDR` on the socket. | | `{socket, linger}` | `#{onoff => boolean(), linger => non_neg_integer()}` | Sets `SO_LINGER` on the socekt. | +| `{otp, rcvbuf}` | `non_neg_integer()` | Sets the default buffer size (in bytes) on receive calls. This value is only used if the `Length` parameter of the `socket:recv` family of functions has the value `0`; otherwise, the specified non-zero length in the `socket:recv` takes precendence. | For example: %% erlang ok = socket:setopt(Socket, {socket, reuseaddr}, true), ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}), + ok = socket:setopt(Socket, {otp, rcvbuf}, 1024), ### UDP Socket Programming diff --git a/src/libAtomVM/otp_socket.c b/src/libAtomVM/otp_socket.c index 8fca3e3eef..fa2b8bf900 100644 --- a/src/libAtomVM/otp_socket.c +++ b/src/libAtomVM/otp_socket.c @@ -149,6 +149,7 @@ struct SocketResource uint64_t ref_ticks; int32_t selecting_process_id; ErlNifMonitor selecting_process_monitor; + size_t buf_size; }; #elif OTP_SOCKET_LWIP struct SocketResource @@ -166,15 +167,19 @@ struct SocketResource int linger_sec; size_t pos; struct ListHead received_list; + size_t buf_size; }; #endif static const char *const addr_atom = ATOM_STR("\x4", "addr"); static const char *const any_atom = ATOM_STR("\x3", "any"); +static const char *const invalid_option_atom = ATOM_STR("\xE", "invalid_option"); +static const char *const invalid_value_atom = ATOM_STR("\xD", "invalid_value"); static const char *const linger_atom = ATOM_STR("\x6", "linger"); static const char *const loopback_atom = ATOM_STR("\x8", "loopback"); static const char *const onoff_atom = ATOM_STR("\x5", "onoff"); static const char *const port_atom = ATOM_STR("\x4", "port"); +static const char *const rcvbuf_atom = ATOM_STR("\x6", "rcvbuf"); static const char *const reuseaddr_atom = ATOM_STR("\x9", "reuseaddr"); #define CLOSED_FD 0 @@ -199,6 +204,19 @@ static const AtomStringIntPair otp_socket_shutdown_direction_table[] = { SELECT_INT_DEFAULT(OtpSocketInvalidShutdownDirection) }; +enum otp_socket_setopt_level +{ + OtpSocketInvalidSetoptLevel = 0, + OtpSocketSetoptLevelSocket, + OtpSocketSetoptLevelOTP +}; + +static const AtomStringIntPair otp_socket_setopt_level_table[] = { + { ATOM_STR("\x6", "socket"), OtpSocketSetoptLevelSocket }, + { ATOM_STR("\x3", "otp"), OtpSocketSetoptLevelOTP }, + SELECT_INT_DEFAULT(OtpSocketInvalidSetoptLevel) +}; + #define DEFAULT_BUFFER_SIZE 512 #ifndef MIN @@ -529,6 +547,7 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) LWIP_END(); } #endif + rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; if (UNLIKELY(memory_ensure_free(ctx, TERM_BOXED_RESOURCE_SIZE) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); @@ -701,6 +720,7 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) } #endif + rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; return OK_ATOM; } @@ -722,6 +742,7 @@ static struct SocketResource *make_accepted_socket_resource(struct tcp_pcb *newp conn_rsrc_obj->pos = 0; conn_rsrc_obj->linger_on = false; conn_rsrc_obj->linger_sec = 0; + conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; list_init(&conn_rsrc_obj->received_list); tcp_arg(newpcb, conn_rsrc_obj); @@ -1016,62 +1037,100 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) term level_tuple = argv[1]; term value = argv[2]; - term opt = term_get_tuple_element(level_tuple, 1); - if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) { - int option_value = (value == TRUE_ATOM); + term level = term_get_tuple_element(level_tuple, 0); + int level_val = interop_atom_term_select_int(otp_socket_setopt_level_table, level, global); + switch (level_val) { + + case OtpSocketSetoptLevelSocket: { + + term opt = term_get_tuple_element(level_tuple, 1); + if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) { + int option_value = (value == TRUE_ATOM); #if OTP_SOCKET_BSD - int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); - if (UNLIKELY(res != 0)) { - return make_errno_tuple(ctx); - } else { - return OK_ATOM; - } + int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); + if (UNLIKELY(res != 0)) { + return make_errno_tuple(ctx); + } else { + return OK_ATOM; + } #elif OTP_SOCKET_LWIP - LWIP_BEGIN(); - if (option_value) { - if (rsrc_obj->socket_state & SocketStateTCP) { - ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); - } else { - ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); - } - } else { - if (rsrc_obj->socket_state & SocketStateTCP) { - ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); - } else { - ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); - } - } - LWIP_END(); - return OK_ATOM; + LWIP_BEGIN(); + if (option_value) { + if (rsrc_obj->socket_state & SocketStateTCP) { + ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); + } else { + ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); + } + } else { + if (rsrc_obj->socket_state & SocketStateTCP) { + ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); + } else { + ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); + } + } + LWIP_END(); + return OK_ATOM; #endif - } else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) { - term onoff = interop_kv_get_value(value, onoff_atom, ctx->global); - term linger = interop_kv_get_value(value, linger_atom, ctx->global); - VALIDATE_VALUE(linger, term_is_integer); + } else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) { + term onoff = interop_kv_get_value(value, onoff_atom, ctx->global); + term linger = interop_kv_get_value(value, linger_atom, ctx->global); + VALIDATE_VALUE(linger, term_is_integer); #if OTP_SOCKET_BSD - struct linger sl; - sl.l_onoff = (onoff == TRUE_ATOM); - sl.l_linger = term_to_int(linger); - int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); - if (UNLIKELY(res != 0)) { - return make_errno_tuple(ctx); - } else { - return OK_ATOM; - } + struct linger sl; + sl.l_onoff = (onoff == TRUE_ATOM); + sl.l_linger = term_to_int(linger); + int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); + if (UNLIKELY(res != 0)) { + return make_errno_tuple(ctx); + } else { + return OK_ATOM; + } #elif OTP_SOCKET_LWIP - rsrc_obj->linger_on = (onoff == TRUE_ATOM); - rsrc_obj->linger_sec = term_to_int(linger); - return OK_ATOM; + rsrc_obj->linger_on = (onoff == TRUE_ATOM); + rsrc_obj->linger_sec = term_to_int(linger); + return OK_ATOM; #endif - // TODO add more as needed - // int flag = 1; - // int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); - // if (UNLIKELY(res != 0)) { - // AVM_LOGW(TAG, "Failed to set TCP_NODELAY."); - // } - } else { - RAISE_ERROR(BADARG_ATOM); + // TODO add more as needed + // int flag = 1; + // int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); + // if (UNLIKELY(res != 0)) { + // AVM_LOGW(TAG, "Failed to set TCP_NODELAY."); + // } + } else { + RAISE_ERROR(BADARG_ATOM); + } + + case OtpSocketSetoptLevelOTP: { + term opt = term_get_tuple_element(level_tuple, 1); + if (globalcontext_is_term_equal_to_atom_string(global, opt, rcvbuf_atom)) { + // socket:setopt(Socket, {otp, rcvbuf}, BufSize :: non_neg_integer()) + + if (UNLIKELY(!term_is_integer(value))) { + AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value must be an integer"); + return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx); + } + + avm_int_t buf_size = term_to_int(value); + if (UNLIKELY(buf_size < 0)) { + AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value may not be negative"); + return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx); + } + + rsrc_obj->buf_size = (size_t) buf_size; + + return OK_ATOM; + } else { + AVM_LOGE(TAG, "socket:setopt: Unsupported otp option"); + return make_error_tuple(globalcontext_make_atom(global, invalid_option_atom), ctx); + } + } + + default: { + AVM_LOGE(TAG, "socket:setopt: Unsupported level"); + RAISE_ERROR(BADARG_ATOM); + } + } } } @@ -1442,6 +1501,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource)); conn_rsrc_obj->fd = fd; conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; TRACE("nif_socket_accept: Created socket on accept fd=%i\n", rsrc_obj->fd); term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj); @@ -1652,8 +1712,7 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_ GlobalContext *global = ctx->global; int flags = MSG_WAITALL; - // TODO parameterize buffer size - ssize_t res = recvfrom(rsrc_obj->fd, NULL, DEFAULT_BUFFER_SIZE, MSG_PEEK | flags, NULL, NULL); + ssize_t res = recvfrom(rsrc_obj->fd, NULL, rsrc_obj->buf_size, MSG_PEEK | flags, NULL, NULL); TRACE("%li bytes available.\n", (long int) res); if (res < 0) { AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", rsrc_obj->fd, errno); @@ -1662,7 +1721,9 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_ TRACE("Peer closed socket %i.\n", rsrc_obj->fd); return make_error_tuple(CLOSED_ATOM, ctx); } else { - ssize_t buffer_size = len == 0 ? (ssize_t) res : MIN((size_t) res, len); + // user-supplied len has higher precedence than the default buffer size, but we also + // want the configured default buffer size to be a lower bound on anything we peek + ssize_t buffer_size = MIN(len == 0 ? (ssize_t) rsrc_obj->buf_size : (ssize_t) len, res); // {ok, Data :: binary()} // {ok, {Source :: #{addr => Address :: {0..255, 0..255, 0..255, 0..255}, port => Port :: non_neg_integer()}, Data :: binary()}} @@ -1703,9 +1764,8 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs GlobalContext *global = ctx->global; - // TODO plumb through buffer size - size_t buffer_size = len == 0 ? DEFAULT_BUFFER_SIZE : len; - uint8_t *buffer = malloc(buffer_size); + size_t buffer_size = len == 0 ? rsrc_obj->buf_size : len; + uint8_t *buffer = (uint8_t *) malloc(buffer_size); term payload = term_invalid_term(); if (IS_NULL_PTR(buffer)) { diff --git a/tests/libs/estdlib/test_tcp_socket.erl b/tests/libs/estdlib/test_tcp_socket.erl index 474ee8339b..f49c5c1e43 100644 --- a/tests/libs/estdlib/test_tcp_socket.erl +++ b/tests/libs/estdlib/test_tcp_socket.erl @@ -26,6 +26,8 @@ test() -> ok = test_echo_server(), ok = test_shutdown(), ok = test_close_by_another_process(), + ok = test_buf_size(), + ok = test_override_buf_size(), case get_otp_version() of atomvm -> ok = test_abandon_select(); @@ -56,13 +58,15 @@ test_shutdown() -> ok = test_shutdown_of_client_sockets(Port), - ok = close_listen_socket(ListenSocket). + ok = close_listen_socket(ListenSocket), + + id(ok). test_shutdown_of_client_sockets(Port) -> ok = test_shutdown_of_side(Port, write), ok = test_shutdown_of_side(Port, read_write), ok = test_shutdown_of_side(Port, read), - ok. + id(ok). test_shutdown_of_side(Port, Side) -> {ok, Socket} = socket:open(inet, stream, tcp), @@ -95,7 +99,107 @@ test_shutdown_of_side(Port, Side) -> end, ok = close_client_socket(Socket), - ok. + + id(ok). + +test_close_by_another_process() -> + % socket:recv is blocking and the only way to interrupt it is to close + % the socket. + etest:flush_msg_queue(), + + Port = 44404, + ListenSocket = start_echo_server(Port), + + {ok, ClientSocket1} = socket:open(inet, stream, tcp), + ok = try_connect(ClientSocket1, Port, 10), + + spawn_link(fun() -> + timer:sleep(500), + ok = socket:close(ClientSocket1) + end), + % recv is blocking + {error, closed} = socket:recv(ClientSocket1, 0, 5000), + + timer:sleep(10), + + ok = close_listen_socket(ListenSocket), + + id(ok). + +check_receive(Socket, Packet, Length, Expect) -> + case socket:send(Socket, Packet) of + ok -> + ok = + case socket:recv(Socket, Length) of + {ok, Expect} -> + ok; + Error -> + io:format("Unexpected value on recv: ~p~n", [Error]), + Error + end; + {error, Reason} = Error -> + io:format("Error on send: ~p~n", [Reason]), + Error + end. + +test_buf_size() -> + etest:flush_msg_queue(), + + Port = 44404, + ListenSocket = start_echo_server(Port), + + {ok, Socket} = socket:open(inet, stream, tcp), + ok = try_connect(Socket, Port, 10), + + %% try a few failures first + {error, _} = socket:setopt(Socket, {otp, badopt}, any_value), + {error, _} = socket:setopt(Socket, {otp, rcvbuf}, not_an_int), + {error, _} = socket:setopt(Socket, {otp, rcvbuf}, -1), + + %% limit the recv buffer size to 10 bytes + ok = socket:setopt(Socket, {otp, rcvbuf}, 10), + + Packet = "012345678901234567890123456789", + + %% we should only be able to receive + ok = check_receive(Socket, Packet, 0, <<"0123456789">>), + ok = check_receive(Socket, Packet, 0, <<"0123456789">>), + ok = check_receive(Socket, Packet, 0, <<"0123456789">>), + + timer:sleep(10), + + ok = close_client_socket(Socket), + + ok = close_listen_socket(ListenSocket), + + id(ok). + +test_override_buf_size() -> + etest:flush_msg_queue(), + + Port = 44404, + ListenSocket = start_echo_server(Port), + + {ok, Socket} = socket:open(inet, stream, tcp), + ok = try_connect(Socket, Port, 10), + + %% limit the recv buffer size to 10 bytes + ok = socket:setopt(Socket, {otp, rcvbuf}, 10), + + Packet = "012345678901234567890123456789", + + %% verify that the socket:recv length parameter takes + %% precedence over the default + ok = check_receive(Socket, Packet, 15, <<"012345678901234">>), + ok = check_receive(Socket, Packet, 15, <<"567890123456789">>), + + timer:sleep(10), + + ok = close_client_socket(Socket), + + ok = close_listen_socket(ListenSocket), + + id(ok). %% %% echo_server @@ -171,10 +275,18 @@ close_listen_socket(ListenSocket) -> %% ok = socket:close(ListenSocket), receive - accept_terminated -> ok + accept_terminated -> + ok after 1000 -> - %% TODO failing to receive accept_terminated message - erlang:display({timeout, waiting, accept_terminated}) + %% + %% Closing the listening socket from another process may in some + %% cases not result in the blocking accept to break out of its + %% call with an expected return value. In this case, we will + %% allow the wait for the `accept_terminated` message to fail + %% and simply warn the user. See TODO comment to this effect in + %% `nif_socket_close` function in `otp_socket.c` + %% + erlang:display({warning, timeout, waiting, accept_terminated}) % throw({timeout, waiting, accept_terminated}) end, @@ -273,26 +385,6 @@ test_abandon_select() -> erlang:garbage_collect(), ok. -test_close_by_another_process() -> - % socket:recv is blocking and the only way to interrupt it is to close - % the socket. - etest:flush_msg_queue(), - - Port = 44404, - ListenSocket = start_echo_server(Port), - - {ok, ClientSocket1} = socket:open(inet, stream, tcp), - ok = try_connect(ClientSocket1, Port, 10), - - spawn_link(fun() -> - timer:sleep(500), - ok = socket:close(ClientSocket1) - end), - % recv is blocking - {error, closed} = socket:recv(ClientSocket1, 0, 5000), - - close_listen_socket(ListenSocket). - get_otp_version() -> case erlang:system_info(machine) of "BEAM" -> @@ -300,3 +392,6 @@ get_otp_version() -> _ -> atomvm end. + +id(X) -> + X.