From 3be5b785d503fc7079044fd7520ea8fef5b13f49 Mon Sep 17 00:00:00 2001 From: Fred Dushin Date: Sat, 4 Nov 2023 08:52:34 -0400 Subject: [PATCH] Opt-in purerlang implementation of gen_tcp and gen_udp using socket API Signed-off-by: Fred Dushin --- libs/estdlib/src/CMakeLists.txt | 4 + libs/estdlib/src/gen_tcp.erl | 169 ++---- libs/estdlib/src/gen_tcp_inet.erl | 220 +++++++ libs/estdlib/src/gen_tcp_socket.erl | 548 ++++++++++++++++++ libs/estdlib/src/gen_udp.erl | 92 +-- libs/estdlib/src/gen_udp_inet.erl | 161 +++++ libs/estdlib/src/gen_udp_socket.erl | 384 ++++++++++++ libs/estdlib/src/inet-priv.hrl | 22 + libs/estdlib/src/inet.erl | 40 +- .../components/avm_builtins/socket_driver.c | 50 +- .../avm_sys/include/otp_socket_platform.h | 3 + .../main/test_erl_sources/test_socket.erl | 39 +- .../generic_unix/lib/otp_socket_platform.h | 3 + .../generic_unix/lib/socket_driver.c | 60 +- .../rp2040/src/lib/otp_socket_platform.h | 3 + tests/libs/estdlib/test_gen_tcp.erl | 197 +++++-- tests/libs/estdlib/test_gen_udp.erl | 76 ++- 17 files changed, 1784 insertions(+), 287 deletions(-) create mode 100644 libs/estdlib/src/gen_tcp_inet.erl create mode 100644 libs/estdlib/src/gen_tcp_socket.erl create mode 100644 libs/estdlib/src/gen_udp_inet.erl create mode 100644 libs/estdlib/src/gen_udp_socket.erl create mode 100644 libs/estdlib/src/inet-priv.hrl diff --git a/libs/estdlib/src/CMakeLists.txt b/libs/estdlib/src/CMakeLists.txt index 68a4141ae..135fb256a 100644 --- a/libs/estdlib/src/CMakeLists.txt +++ b/libs/estdlib/src/CMakeLists.txt @@ -33,7 +33,11 @@ set(ERLANG_MODULES gen_server gen_statem gen_udp + gen_udp_inet + gen_udp_socket gen_tcp + gen_tcp_inet + gen_tcp_socket supervisor inet io_lib diff --git a/libs/estdlib/src/gen_tcp.erl b/libs/estdlib/src/gen_tcp.erl index ca53230ab..8d75df972 100644 --- a/libs/estdlib/src/gen_tcp.erl +++ b/libs/estdlib/src/gen_tcp.erl @@ -52,13 +52,14 @@ | {timeout, timeout()} | list | binary - | {binary, boolean()}. + | {binary, boolean()} + | {inet_backend, inet | socket}. -type listen_option() :: option(). -type connect_option() :: option(). -type packet() :: string() | binary(). --define(DEFAULT_PARAMS, [{active, true}, {buffer, 512}, {timeout, infinity}]). +-include("inet-priv.hrl"). %%----------------------------------------------------------------------------- %% @param Address the address to which to connect @@ -91,10 +92,14 @@ Options :: [connect_option()] ) -> {ok, Socket :: inet:socket()} | {error, Reason :: reason()}. -connect(Address, Port, Params0) -> - Socket = open_port({spawn, "socket"}, []), - Params = merge(Params0, ?DEFAULT_PARAMS), - connect(Socket, normalize_address(Address), Port, Params). +connect(Address, Port, Options) -> + Module = get_inet_backend_module(Options), + case Module:connect(Address, Port, Options) of + {ok, Socket} -> + {ok, {?GEN_TCP_MONIKER, Socket, Module}}; + Other -> + Other + end. %%----------------------------------------------------------------------------- %% @param Socket The Socket obtained via connect/3 @@ -107,13 +112,8 @@ connect(Address, Port, Params0) -> %% @end %%----------------------------------------------------------------------------- -spec send(Socket :: inet:socket(), Packet :: packet()) -> ok | {error, Reason :: reason()}. -send(Socket, Packet) -> - case call(Socket, {send, Packet}) of - {ok, _Len} -> - ok; - Error -> - Error - end. +send({?GEN_TCP_MONIKER, Socket, Module}, Packet) -> + Module:send(Socket, Packet). %%----------------------------------------------------------------------------- %% @equiv recv(Socket, Length, infinity) @@ -122,8 +122,8 @@ send(Socket, Packet) -> %%----------------------------------------------------------------------------- -spec recv(Socket :: inet:socket(), Length :: non_neg_integer()) -> {ok, packet()} | {error, Reason :: reason()}. -recv(Socket, Length) -> - recv(Socket, Length, infinity). +recv({?GEN_TCP_MONIKER, Socket, Module}, Length) -> + Module:recv(Socket, Length). %%----------------------------------------------------------------------------- %% @param Socket the socket over which to receive a packet @@ -146,8 +146,8 @@ recv(Socket, Length) -> %%----------------------------------------------------------------------------- -spec recv(Socket :: inet:socket(), Length :: non_neg_integer(), Timeout :: non_neg_integer()) -> {ok, packet()} | {error, Reason :: reason()}. -recv(Socket, Length, Timeout) -> - call(Socket, {recv, Length, Timeout}). +recv({?GEN_TCP_MONIKER, Socket, Module}, Length, Timeout) -> + Module:recv(Socket, Length, Timeout). %%----------------------------------------------------------------------------- %% @param Port the port number on which to listen. Specify 0 to use an OS-assigned @@ -161,24 +161,14 @@ recv(Socket, Length, Timeout) -> %% @end %%----------------------------------------------------------------------------- -spec listen(Port :: inet:port_number(), Options :: [listen_option()]) -> - {ok, ListeningSocket :: inet:socket()} | {error, Reason :: reason()}. + {ok, Socket :: inet:socket()} | {error, Reason :: reason()}. listen(Port, Options) -> - Socket = open_port({spawn, "socket"}, []), - Params = merge(Options, ?DEFAULT_PARAMS), - InitParams = [ - {proto, tcp}, - {listen, true}, - {controlling_process, self()}, - {port, Port}, - {backlog, 5} - | Params - ], - case call(Socket, {init, InitParams}) of - ok -> - {ok, Socket}; - ErrorReason -> - %% TODO close port - ErrorReason + Module = get_inet_backend_module(Options), + case Module:listen(Port, Options) of + {ok, Socket} -> + {ok, {?GEN_TCP_MONIKER, Socket, Module}}; + Other -> + Other end. %%----------------------------------------------------------------------------- @@ -187,10 +177,15 @@ listen(Port, Options) -> %% @doc Accept a connection on a listening socket. %% @end %%----------------------------------------------------------------------------- --spec accept(ListenSocket :: inet:socket()) -> +-spec accept(Socket :: inet:socket()) -> {ok, Socket :: inet:socket()} | {error, Reason :: reason()}. -accept(ListenSocket) -> - accept(ListenSocket, infinity). +accept({?GEN_TCP_MONIKER, Socket, Module}) -> + case Module:accept(Socket, infinity) of + {ok, ConnectedSocket} -> + {ok, {?GEN_TCP_MONIKER, ConnectedSocket, Module}}; + Error -> + Error + end. %%----------------------------------------------------------------------------- %% @param ListenSocket the listening socket. @@ -199,15 +194,14 @@ accept(ListenSocket) -> %% @doc Accept a connection on a listening socket. %% @end %%----------------------------------------------------------------------------- --spec accept(ListenSocket :: inet:socket(), Timeout :: timeout()) -> +-spec accept(Socket :: inet:socket(), Timeout :: timeout()) -> {ok, Socket :: inet:socket()} | {error, Reason :: reason()}. -accept(ListenSocket, Timeout) -> - case call(ListenSocket, {accept, Timeout}) of - {ok, Socket} when is_pid(Socket) -> - {ok, Socket}; - ErrorReason -> - %% TODO close port - ErrorReason +accept({?GEN_TCP_MONIKER, Socket, Module}, Timeout) -> + case Module:accept(Socket, Timeout) of + {ok, ConnectedSocket} -> + {ok, {?GEN_TCP_MONIKER, ConnectedSocket, Module}}; + Error -> + Error end. %%----------------------------------------------------------------------------- @@ -217,8 +211,8 @@ accept(ListenSocket, Timeout) -> %% @end %%----------------------------------------------------------------------------- -spec close(Socket :: inet:socket()) -> ok. -close(Socket) -> - inet:close(Socket). +close({?GEN_TCP_MONIKER, Socket, Module}) -> + Module:close(Socket). %%----------------------------------------------------------------------------- %% @param Socket the socket to which to assign the pid @@ -236,77 +230,20 @@ close(Socket) -> %%----------------------------------------------------------------------------- -spec controlling_process(Socket :: inet:socket(), Pid :: pid()) -> ok | {error, Reason :: reason()}. -controlling_process(Socket, Pid) -> - call(Socket, {controlling_process, Pid}). - -%% internal operations - -%% @private -connect(DriverPid, Address, Port, Params) -> - InitParams = [ - {proto, tcp}, - {connect, true}, - {controlling_process, self()}, - {address, Address}, - {port, Port} - | Params - ], - case call(DriverPid, {init, InitParams}) of - ok -> - {ok, DriverPid}; - ErrorReason -> - %% TODO close port - ErrorReason - end. - -%% TODO implement this in lists - -%% @private -merge(Config, Defaults) -> - merge(Config, Defaults, []) ++ Config. - -%% @private -merge(_Config, [], Accum) -> - Accum; -merge(Config, [H | T], Accum) -> - Key = - case H of - {K, _V} -> K; - K -> K - end, - case proplists:get_value(Key, Config) of - undefined -> - merge(Config, T, [H | Accum]); - Value -> - merge(Config, T, [{Key, Value} | Accum]) - end. - -%% @private -normalize_address(localhost) -> - "127.0.0.1"; -normalize_address(loopback) -> - "127.0.0.1"; -normalize_address(Address) when is_list(Address) -> - Address; -normalize_address({A, B, C, D}) when - is_integer(A) and is_integer(B) and is_integer(C) and is_integer(D) --> - integer_to_list(A) ++ - "." ++ - integer_to_list(B) ++ - "." ++ - integer_to_list(C) ++ - "." ++ integer_to_list(D). - -%% TODO IPv6 +controlling_process({?GEN_TCP_MONIKER, Socket, Module}, Pid) -> + Module:controlling_process(Socket, Pid). %% -%% Internal operations +%% Internal implementation %% -call(Port, Msg) -> - case port:call(Port, Msg) of - {error, noproc} -> {error, closed}; - out_of_memory -> {error, enomem}; - Result -> Result +%% @private +get_inet_backend_module(Options) -> + case proplists:get_value(inet_backend, Options) of + undefined -> + gen_tcp_inet; + inet -> + gen_tcp_inet; + socket -> + gen_tcp_socket end. diff --git a/libs/estdlib/src/gen_tcp_inet.erl b/libs/estdlib/src/gen_tcp_inet.erl new file mode 100644 index 000000000..f1e7d267d --- /dev/null +++ b/libs/estdlib/src/gen_tcp_inet.erl @@ -0,0 +1,220 @@ +% +% This file is part of AtomVM. +% +% Copyright 2019-2022 Fred Dushin +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +%% @hidden +-module(gen_tcp_inet). + +-export([ + connect/3, send/2, recv/2, recv/3, close/1, listen/2, accept/1, accept/2, controlling_process/2 +]). + +%% inet API +-export([port/1, sockname/1, peername/1]). + +-type reason() :: term(). + +-type option() :: + {active, boolean()} + | {buffer, pos_integer()} + | {timeout, timeout()} + | list + | binary + | {binary, boolean()}. + +-type listen_option() :: option(). +-type connect_option() :: option(). +-type packet() :: string() | binary(). + +-define(DEFAULT_PARAMS, [{active, true}, {buffer, 512}, {timeout, infinity}]). + +%% @hidden +-spec connect( + Address :: inet:ip_address() | inet:hostname(), + Port :: inet:port_number(), + Options :: [connect_option()] +) -> + {ok, Socket :: inet:socket()} | {error, Reason :: reason()}. +connect(Address, Port, Params0) -> + Socket = open_port({spawn, "socket"}, []), + Params = merge(Params0, ?DEFAULT_PARAMS), + connect(Socket, normalize_address(Address), Port, Params). + +%% @hidden +-spec send(Socket :: inet:socket(), Packet :: packet()) -> ok | {error, Reason :: reason()}. +send(Socket, Packet) -> + case call(Socket, {send, Packet}) of + {ok, _Len} -> + ok; + Error -> + Error + end. + +%% @hidden +-spec recv(Socket :: inet:socket(), Length :: non_neg_integer()) -> + {ok, packet()} | {error, Reason :: reason()}. +recv(Socket, Length) -> + recv(Socket, Length, infinity). + +%% @hidden +-spec recv(Socket :: inet:socket(), Length :: non_neg_integer(), Timeout :: non_neg_integer()) -> + {ok, packet()} | {error, Reason :: reason()}. +recv(Socket, Length, Timeout) -> + call(Socket, {recv, Length, Timeout}). + +%% @hidden +-spec listen(Port :: inet:port_number(), Options :: [listen_option()]) -> + {ok, ListeningSocket :: inet:socket()} | {error, Reason :: reason()}. +listen(Port, Options) -> + Socket = open_port({spawn, "socket"}, []), + Params = merge(Options, ?DEFAULT_PARAMS), + InitParams = [ + {proto, tcp}, + {listen, true}, + {controlling_process, self()}, + {port, Port}, + {backlog, 5} + | Params + ], + case call(Socket, {init, InitParams}) of + ok -> + {ok, Socket}; + ErrorReason -> + %% TODO close port + ErrorReason + end. + +%% @hidden +-spec accept(ListenSocket :: inet:socket()) -> + {ok, Socket :: inet:socket()} | {error, Reason :: reason()}. +accept(ListenSocket) -> + accept(ListenSocket, infinity). + +%% @hidden +-spec accept(ListenSocket :: inet:socket(), Timeout :: timeout()) -> + {ok, Socket :: inet:socket()} | {error, Reason :: reason()}. +accept(ListenSocket, Timeout) -> + case call(ListenSocket, {accept, Timeout}) of + {ok, Socket} when is_pid(Socket) -> + {ok, Socket}; + ErrorReason -> + %% TODO close port + ErrorReason + end. + +%% @hidden +-spec close(Socket :: inet:socket()) -> ok. +close(Socket) -> + call(Socket, {close}), + ok. + +%% @hidden +-spec controlling_process(Socket :: inet:socket(), Pid :: pid()) -> + ok | {error, Reason :: reason()}. +controlling_process(Socket, Pid) -> + call(Socket, {controlling_process, Pid}). + +%% +%% inet API +%% + +%% @hidden +port(Socket) -> + call(Socket, {get_port}). + +%% @hidden +sockname(Socket) -> + call(Socket, {sockname}). + +%% @hidden +peername(Socket) -> + call(Socket, {peername}). + +%% internal operations + +%% @private +connect(DriverPid, Address, Port, Params) -> + InitParams = [ + {proto, tcp}, + {connect, true}, + {controlling_process, self()}, + {address, Address}, + {port, Port} + | Params + ], + case call(DriverPid, {init, InitParams}) of + ok -> + {ok, DriverPid}; + ErrorReason -> + %% TODO close port + ErrorReason + end. + +%% TODO implement this in lists + +%% @private +merge(Config, Defaults) -> + merge(Config, Defaults, []) ++ Config. + +%% @private +merge(_Config, [], Accum) -> + Accum; +merge(Config, [H | T], Accum) -> + Key = + case H of + {K, _V} -> K; + K -> K + end, + case proplists:get_value(Key, Config) of + undefined -> + merge(Config, T, [H | Accum]); + Value -> + merge(Config, T, [{Key, Value} | Accum]) + end. + +%% @private +normalize_address(localhost) -> + "127.0.0.1"; +normalize_address(loopback) -> + "127.0.0.1"; +normalize_address(Address) when is_list(Address) -> + Address; +normalize_address({A, B, C, D}) when + is_integer(A) and is_integer(B) and is_integer(C) and is_integer(D) +-> + integer_to_list(A) ++ + "." ++ + integer_to_list(B) ++ + "." ++ + integer_to_list(C) ++ + "." ++ integer_to_list(D). + +%% TODO IPv6 + +%% +%% Internal operations +%% + +%% @private +call(Port, Msg) -> + case port:call(Port, Msg) of + {error, noproc} -> {error, closed}; + out_of_memory -> {error, enomem}; + Result -> Result + end. diff --git a/libs/estdlib/src/gen_tcp_socket.erl b/libs/estdlib/src/gen_tcp_socket.erl new file mode 100644 index 000000000..6d808c417 --- /dev/null +++ b/libs/estdlib/src/gen_tcp_socket.erl @@ -0,0 +1,548 @@ +% +% This file is part of AtomVM. +% +% Copyright 2023 Fred Dushin +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +%% @hidden +-module(gen_tcp_socket). + +-export([ + connect/3, + send/2, + recv/2, + recv/3, + close/1, + listen/2, + accept/1, + accept/2, + controlling_process/2 +]). + +-include("inet-priv.hrl"). +-include_lib("kernel/include/logger.hrl"). + +%% inet API +-export([port/1, sockname/1, peername/1]). + +%% gen_server implementation (hidden) +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +-type reason() :: term(). + +-type option() :: + {active, boolean()} + | {buffer, pos_integer()} + | {timeout, timeout()} + | list + | binary. + +-type listen_option() :: option(). +-type connect_option() :: option(). +-type packet() :: string() | binary(). + +-define(DEFAULT_OPTIONS, #{ + active => true, + buffer => 0, + timeout => infinity +}). + +-spec connect( + Address :: inet:ip_address() | inet:hostname(), + Port :: inet:port_number(), + Options :: [connect_option()] +) -> + {ok, Socket :: inet:socket()} | {error, Reason :: reason()}. +%% @hidden +connect(Address, Port, Options) -> + ControllingProcess = self(), + case socket:open(inet, stream, tcp) of + {ok, Socket} -> + case socket:connect(Socket, #{family => inet, addr => Address, port => Port}) of + ok -> + EffectiveOptions = maps:merge(?DEFAULT_OPTIONS, proplist_to_map(Options)), + gen_server:start_link( + ?MODULE, {Socket, ControllingProcess, connect, EffectiveOptions}, [] + ); + ConnectError -> + ConnectError + end; + OpenError -> + OpenError + end. + +%% @hidden +-spec send(Socket :: inet:socket(), Packet :: packet()) -> ok | {error, Reason :: reason()}. +send(Socket, Packet) -> + call(Socket, {send, Packet}). + +%% @hidden +-spec recv(Socket :: inet:socket(), Length :: non_neg_integer()) -> + {ok, packet()} | {error, Reason :: reason()}. +recv(Socket, Length) -> + recv(Socket, Length, infinity). + +%% @hidden +-spec recv(Socket :: inet:socket(), Length :: non_neg_integer(), Timeout :: non_neg_integer()) -> + {ok, packet()} | {error, Reason :: reason()}. +recv(Socket, Length, Timeout) -> + call(Socket, {recv, Length, Timeout}). + +%% @hidden +-spec listen(Port :: inet:port_number(), Options :: [listen_option()]) -> + {ok, ListeningSocket :: inet:socket()} | {error, Reason :: reason()}. +listen(Port, Options) -> + ControllingProcess = self(), + case socket:open(inet, stream, tcp) of + {ok, Socket} -> + EffectiveOptions = maps:merge( + ?DEFAULT_OPTIONS, proplist_to_map(Options) + ), + Addr = maps:get(ifaddr, EffectiveOptions, any), + case socket:bind(Socket, #{family => inet, addr => Addr, port => Port}) of + ok -> + case socket:listen(Socket) of + ok -> + gen_server:start_link( + ?MODULE, {Socket, ControllingProcess, listen, EffectiveOptions}, [] + ); + ListenError -> + socket:close(Socket), + ListenError + end; + BindError -> + socket:close(Socket), + BindError + end; + OpenError -> + OpenError + end. + +%% @hidden +-spec accept(ListenSocket :: inet:socket()) -> + {ok, Socket :: inet:socket()} | {error, Reason :: reason()}. +accept(ListenSocket) -> + accept(ListenSocket, infinity). + +%% @hidden +-spec accept(ListenSocket :: inet:socket(), Timeout :: timeout()) -> + {ok, Socket :: inet:socket()} | {error, Reason :: reason()}. +accept(ListenSocket, Timeout) -> + AcceptingProc = self(), + call(ListenSocket, {accept, Timeout, AcceptingProc}). + +%% @hidden +-spec close(Socket :: inet:socket()) -> ok. +close(Socket) -> + call(Socket, close). + +%% @hidden +-spec controlling_process(Socket :: inet:socket(), Pid :: pid()) -> + ok | {error, Reason :: reason()}. +controlling_process(Socket, Pid) -> + %% WARNING: calling process is potentially spoofable + call(Socket, {controlling_process, Pid, self()}). + +%% +%% inet API +%% + +%% @hidden +port(Socket) -> + call(Socket, get_port). + +%% @hidden +sockname(Socket) -> + call(Socket, sockname). + +%% @hidden +peername(Socket) -> + call(Socket, peername). + +%% +%% gen_server implementation +%% + +-record(state, { + socket, + controlling_process = undefined, + monitor_ref = undefined, + active, + options, + pending_selects = #{} +}). + +init({Socket, ControllingProcess, Mode, Options}) -> + case (Mode =:= connect orelse Mode =:= accept) andalso maps:get(active, Options, true) of + true -> + Ref = erlang:make_ref(), + case socket:nif_select_read(Socket, Ref) of + ok -> + ?LOG_INFO( + "Starting process in ~p (active) mode. Socket=~p ControllingProcess=~p Options=~p Ref=~p", + [ + Mode, Socket, ControllingProcess, Options, Ref + ] + ), + MonitorRef = erlang:monitor(process, ControllingProcess), + {ok, #state{ + socket = Socket, + controlling_process = ControllingProcess, + monitor_ref = MonitorRef, + active = true, + options = Options, + pending_selects = #{Ref => active} + }}; + {error, _Reason} = Error -> + %% is there a better return from init? + Error + end; + _ -> + ?LOG_INFO( + "Starting process in ~p (passive) mode. Socket=~p ControllingProcess=~p Options=~p", + [ + Mode, Socket, ControllingProcess, Options + ] + ), + {ok, #state{ + socket = Socket, + active = false, + options = Options + }} + end. + +%% @hidden +handle_call({accept, Timeout, AcceptingProc}, From, State) -> + ?LOG_DEBUG("handle_call [~p], ~p, ~p]", [ + {accept, Timeout, AcceptingProc}, From, State + ]), + Ref = erlang:make_ref(), + case Timeout of + TimeoutMs when is_integer(TimeoutMs) andalso TimeoutMs >= 0 -> + erlang:send_after(TimeoutMs, self(), {timeout, Ref, From}); + infinity -> + ok + end, + case socket:nif_select_read(State#state.socket, Ref) of + ok -> + ?LOG_INFO("Starting accept select with ref ~p", [Ref]), + PendingSelects = State#state.pending_selects, + NewPendingSelects = PendingSelects#{Ref => {accept, From, AcceptingProc, Timeout}}, + {noreply, State#state{pending_selects = NewPendingSelects}}; + {error, _Reason} = Error -> + ?LOG_ERROR("An error occurred in select for accept ~p", [Error]), + {reply, Error, State} + end; +handle_call({send, Packet}, From, State) -> + ?LOG_DEBUG("handle_call [~p], ~p, ~p]", [ + {send, Packet}, From, State + ]), + ?LOG_INFO("Sending packet"), + {reply, socket:send(State#state.socket, Packet), State}; +handle_call({recv, Length, Timeout}, From, State) -> + ?LOG_DEBUG("handle_call [~p], ~p, ~p]", [ + {recv, Length, Timeout}, From, State + ]), + case State#state.active of + true -> + {reply, {error, einval}, State}; + _ -> + Ref = erlang:make_ref(), + case Timeout of + TimeoutMs when is_integer(TimeoutMs) andalso TimeoutMs >= 0 -> + ?LOG_DEBUG("setting timeout counter for TimeoutMs=~p on ref=~p", [ + TimeoutMs, Ref + ]), + erlang:send_after(TimeoutMs, self(), {timeout, Ref, From}); + infinity -> + %% TEST + erlang:send_after(30000, self(), {timeout, Ref, From}), + ok + end, + case socket:nif_select_read(State#state.socket, Ref) of + ok -> + PendingSelects = State#state.pending_selects, + NewPendingSelects = PendingSelects#{Ref => {passive, From, Length, Timeout}}, + ?LOG_INFO("Added pending select. NewPendingSelects=~p", [NewPendingSelects]), + {noreply, State#state{pending_selects = NewPendingSelects}}; + {error, _Reason} = Error -> + ?LOG_ERROR("An error occurred calling socket:nif_select_read/2: ~p", [Error]), + {noreply, State} + end + end; +handle_call(port, _From, State) -> + Reply = + case socket:sockname(State#state.socket) of + {ok, Addr} -> + #{port := Port} = Addr, + {ok, Port}; + SocknameError -> + SocknameError + end, + {reply, Reply, State}; +handle_call(sockname, _From, State) -> + Reply = + case socket:sockname(State#state.socket) of + {ok, Addr} -> + #{port := Port, addr := Address} = Addr, + {ok, {Address, Port}}; + SocknameError -> + SocknameError + end, + {reply, Reply, State}; +handle_call(peername, _From, State) -> + Reply = + case socket:peername(State#state.socket) of + {ok, Addr} -> + #{port := Port, addr := Address} = Addr, + {ok, {Address, Port}}; + SocknameError -> + SocknameError + end, + {reply, Reply, State}; +handle_call({controlling_process, Pid, Caller}, _From, State) -> + case State#state.active of + false -> + {reply, {error, einval}, State}; + _ -> + case State#state.controlling_process =/= Caller of + true -> + {reply, {error, not_owner}, State}; + _ -> + MonitorRef = erlang:monitor(process, Pid), + true = erlang:demonitor(State#state.monitor_ref), + ?LOG_INFO("Set controlling process to ~p", [Pid]), + {reply, ok, State#state{controlling_process = Pid, monitor_ref = MonitorRef}} + end + end; +handle_call(close, _From, State) -> + {stop, normal, socket:close(State#state.socket), State}; +handle_call(Request, _From, State) -> + {reply, {unknown_request, Request}, State}. + +%% @hidden +handle_cast(_Request, State) -> + {noreply, State}. + +%% @hidden +handle_info({select, _Socket, Ref, ready_input}, State) -> + ?LOG_DEBUG("handle_info [~p], ~p]", [ + {select, _Socket, Ref, ready_input}, State + ]), + %% TODO cancel timer + case maps:get(Ref, State#state.pending_selects, undefined) of + undefined -> + ?LOG_WARNING("Unable to find select ref ~p in pending selects", [Ref]), + %% select_stop? + {noreply, State}; + {accept, From, AcceptingProc, _Timeout} -> + ?LOG_INFO("Select ready for read on accept"), + NewState = handle_accept(State, From, AcceptingProc), + {noreply, NewState#state{ + pending_selects = maps:remove(Ref, State#state.pending_selects) + }}; + active -> + ?LOG_INFO("Select ready for read on active recv"), + NewState = handle_active_recv(State), + {noreply, NewState}; + {passive, From, Length, Timeout} -> + ?LOG_INFO("Select ready for read on passive recv"), + NewState = handle_passive_recv(State, From, Length, Timeout), + {noreply, NewState#state{ + pending_selects = maps:remove(Ref, State#state.pending_selects) + }} + end; +handle_info({timeout, Ref, From}, State) -> + ?LOG_DEBUG("handle_info [~p], ~p]", [ + {timeout, Ref, From}, State + ]), + case maps:get(Ref, State#state.pending_selects, undefined) of + undefined -> + %% The request was already processed. Ignore the message + {noreply, State}; + _ -> + ?LOG_INFO("Select ref ~p in pending selects has timed out.", [Ref]), + gen_server:reply(From, {error, timeout}), + {noreply, State#state{pending_selects = maps:remove(Ref, State#state.pending_selects)}} + end; +handle_info({'DOWN', MonitorRef, process, ControllingProcess, Info}, State) -> + ?LOG_DEBUG("handle_info [~p], ~p", [ + {'DOWN', MonitorRef, process, ControllingProcess, Info} + ]), + case State#state.monitor_ref =:= MonitorRef of + true -> + ?LOG_INFO("Controlling process ~p has terminated. Stopping ~p.", [ + ControllingProcess, ?MODULE + ]), + {stop, normal, State}; + _ -> + {noreply, State} + end; +handle_info(Info, State) -> + ?LOG_WARNING("Received unexpected Info msg ~p with State ~p", [Info, State]), + {noreply, State}. + +%% @hidden +terminate(Reason, State) -> + ?LOG_INFO("Closing socket ~p for reason ~p", [State#state.socket, Reason]), + catch socket:close(State#state.socket). + +%% +%% Internal operations +%% + +%% @private +proplist_to_map(PropList) -> + proplist_to_map(PropList, #{}). + +%% @private +proplist_to_map([], Accum) -> + Accum; +proplist_to_map([Atom | T], Accum) when is_atom(Atom) -> + proplist_to_map(T, Accum#{Atom => true}); +proplist_to_map([{K, V} | T], Accum) -> + proplist_to_map(T, Accum#{K => V}). + +%% @private +handle_accept(State, From, AcceptingProc) -> + Socket = State#state.socket, + ControllingProcess = AcceptingProc, + Options = State#state.options, + %% CAUTION: internal API + case socket:nif_accept(Socket) of + {ok, ConnectedSocket} -> + Reply = + gen_server:start_link( + ?MODULE, {ConnectedSocket, ControllingProcess, accept, Options}, [] + ), + ?LOG_INFO("Accepted connection. Attempted to start connected socket: ~p", [Reply]), + gen_server:reply(From, Reply), + State; + {error, _Reason} = Error -> + ?LOG_ERROR("Failed to accept connection: ~p", [Error]), + %% CAUTION: internal API + socket:nif_select_stop(Socket), + gen_server:reply(From, Error), + State + end. + +%% @private +handle_active_recv(State) -> + Socket = State#state.socket, + Length = maps:get(buffer, State#state.options, 0), + WrappedSocket = {?GEN_TCP_MONIKER, self(), ?MODULE}, + %% CAUTION: internal API + case socket:nif_recv(Socket, Length) of + {ok, Data} -> + ?LOG_INFO("Received data len=~p", [erlang:byte_size(Data)]), + BinaryOrList = maybe_encode_binary(State#state.options, Data), + ?LOG_INFO("Sending tcp message to controlling process ~p", [ + State#state.controlling_process + ]), + State#state.controlling_process ! {tcp, WrappedSocket, BinaryOrList}, + + %% start a new select + Ref = erlang:make_ref(), + case socket:nif_select_read(Socket, Ref) of + ok -> + ?LOG_INFO("Started read select on ref=~p", [Ref]), + PendingSelects = State#state.pending_selects, + NewPendingSelects = maps:remove(Ref, PendingSelects), + State#state{pending_selects = NewPendingSelects#{Ref => active}}; + Error -> + ?LOG_ERROR("Unable to start select on new ref=~p Error=~p", [Ref, Error]), + State + end; + {closed, OtherRef} -> + ?LOG_INFO("Socket was closed other ref=~p", [OtherRef]), + % socket was closed by another process + % TODO: we need to handle: + % (a) SELECT_STOP being scheduled + % (b) flush of messages as we can have both + % {closed, Ref} and {select, _, Ref, _} in the + % queue + State#state.controlling_process ! {tcp_closed, WrappedSocket}, + State; + {error, closed} -> + ?LOG_INFO("Socket was closed by peer"), + %% CAUTION: internal API + socket:nif_select_stop(Socket), + %% peer has closed the connection + WrappedSocket = {?GEN_TCP_MONIKER, self(), ?MODULE}, + State#state.controlling_process ! {tcp_closed, WrappedSocket}, + State; + {error, _} = Error -> + ?LOG_ERROR("Error on receive on pending select Error=~p", [Error]), + %% CAUTION: internal API + socket:nif_select_stop(Socket), + %% TODO is this right? I don't think active receivers would know + %% what to do with this message. Maybe log it instead? + State#state.controlling_process ! {tcp_error, WrappedSocket, Error}, + State + end. + +%% @private +handle_passive_recv(State, From, Length, _Timeout) -> + Socket = State#state.socket, + %% CAUTION: internal API + case socket:nif_recv(Socket, Length) of + {ok, Data} -> + ?LOG_INFO("Received data len=~p", [erlang:byte_size(Data)]), + BinaryOrList = maybe_encode_binary(State#state.options, Data), + gen_server:reply(From, {ok, BinaryOrList}), + State; + {closed, OtherRef} -> + ?LOG_INFO("Socket was closed other ref=~p", [OtherRef]), + % socket was closed by another process + % TODO: we need to handle: + % (a) SELECT_STOP being scheduled + % (b) flush of messages as we can have both + % {closed, Ref} and {select, _, Ref, _} in the + % queue + gen_server:reply(From, {error, closed}), + State; + {error, _} = Error -> + ?LOG_ERROR("Error on receive from socket:nif_recv Error=~p", [Error]), + socket:nif_select_stop(Socket), + ?LOG_INFO("unable to receive on pending select~n"), + gen_server:reply(From, Error), + State + end. + +%% @private +call(Pid, Request) -> + gen_server:call(Pid, Request, infinity). + +%% @private +maybe_encode_binary(Options, Data) -> + case encode_binary(Options) of + true -> + Data; + _ -> + binary_to_list(Data) + end. + +%% @private +encode_binary(Options) -> + case {maps:get(binary, Options, undefined), maps:get(list, Options, undefined)} of + {undefined, undefined} -> + true; + {true, _} -> + true; + _ -> + false + end. diff --git a/libs/estdlib/src/gen_udp.erl b/libs/estdlib/src/gen_udp.erl index 23bd518a2..870a2de4c 100644 --- a/libs/estdlib/src/gen_udp.erl +++ b/libs/estdlib/src/gen_udp.erl @@ -52,9 +52,10 @@ | {timeout, timeout()} | list | binary - | {binary, boolean()}. + | {binary, boolean()} + | {inet_backend, inet | socket}. --define(DEFAULT_PARAMS, [{active, true}, {buffer, 128}, {timeout, infinity}]). +-include("inet-priv.hrl"). %%----------------------------------------------------------------------------- %% @equiv open(PortNum, []) @@ -86,9 +87,13 @@ open(PortNum) -> -spec open(PortNum :: inet:port_number(), Options :: [option()]) -> {ok, inet:socket()} | {error, Reason :: reason()}. open(PortNum, Options) -> - DriverPid = open_port({spawn, "socket"}, []), - Params = merge(Options, ?DEFAULT_PARAMS), - init(DriverPid, PortNum, Params). + Module = get_inet_backend_module(Options), + case Module:open(PortNum, Options) of + {ok, Socket} -> + {ok, {?GEN_UDP_MONIKER, Socket, Module}}; + Other -> + Other + end. %%----------------------------------------------------------------------------- %% @param Socket the socket over which to send a packet @@ -107,13 +112,8 @@ open(PortNum, Options) -> PortNum :: inet:port_number(), Packet :: packet() ) -> ok | {error, reason()}. -send(Socket, Address, PortNum, Packet) -> - case call(Socket, {sendto, Address, PortNum, Packet}) of - {ok, _Sent} -> - ok; - Else -> - Else - end. +send({?GEN_UDP_MONIKER, Socket, Module}, Address, PortNum, Packet) -> + Module:send(Socket, Address, PortNum, Packet). %%----------------------------------------------------------------------------- %% @equiv recv(Socket, Length, infinity) @@ -144,8 +144,8 @@ recv(Socket, Length) -> %%----------------------------------------------------------------------------- -spec recv(Socket :: inet:socket(), Length :: non_neg_integer(), Timeout :: timeout()) -> {ok, {inet:ip_address(), inet:port_number(), packet()}} | {error, reason()}. -recv(Socket, Length, Timeout) -> - call(Socket, {recvfrom, Length, Timeout}). +recv({?GEN_UDP_MONIKER, Socket, Module}, Length, Timeout) -> + Module:recv(Socket, Length, Timeout). %%----------------------------------------------------------------------------- %% @param Socket the socket to close @@ -154,8 +154,8 @@ recv(Socket, Length, Timeout) -> %% @end %%----------------------------------------------------------------------------- -spec close(inet:socket()) -> ok. -close(Socket) -> - inet:close(Socket). +close({?GEN_UDP_MONIKER, Socket, Module}) -> + Module:close(Socket). %%----------------------------------------------------------------------------- %% @param Socket the socket to which to assign the pid @@ -173,56 +173,20 @@ close(Socket) -> %%----------------------------------------------------------------------------- -spec controlling_process(Socket :: inet:socket(), Pid :: pid()) -> ok | {error, Reason :: reason()}. -controlling_process(Socket, Pid) -> - call(Socket, {controlling_process, Pid}). - -%% internal operations - -%% @private -init(DriverPid, PortNum, Params) -> - InitParams = [ - {proto, udp}, - {port, PortNum}, - {controlling_process, self()} - | Params - ], - case call(DriverPid, {init, InitParams}) of - ok -> - {ok, DriverPid}; - ErrorReason -> - %% TODO close port - ErrorReason - end. - -%% TODO implement this in lists - -%% @private -merge(Config, Defaults) -> - merge(Config, Defaults, []) ++ Config. - -%% @private -merge(_Config, [], Accum) -> - Accum; -merge(Config, [H | T], Accum) -> - Key = - case H of - {K, _V} -> K; - K -> K - end, - case proplists:get_value(Key, Config) of - undefined -> - merge(Config, T, [H | Accum]); - Value -> - merge(Config, T, [{Key, Value} | Accum]) - end. +controlling_process({?GEN_UDP_MONIKER, Socket, Module}, Pid) -> + Module:controlling_process(Socket, Pid). %% -%% Internal operations +%% Internal implementation %% -call(Port, Msg) -> - case port:call(Port, Msg) of - {error, noproc} -> {error, closed}; - out_of_memory -> {error, enomem}; - Result -> Result +%% @private +get_inet_backend_module(Options) -> + case proplists:get_value(inet_backend, Options) of + undefined -> + gen_udp_inet; + inet -> + gen_udp_inet; + socket -> + gen_udp_socket end. diff --git a/libs/estdlib/src/gen_udp_inet.erl b/libs/estdlib/src/gen_udp_inet.erl new file mode 100644 index 000000000..ccfa305fb --- /dev/null +++ b/libs/estdlib/src/gen_udp_inet.erl @@ -0,0 +1,161 @@ +% +% This file is part of AtomVM. +% +% Copyright 2018-2022 Fred Dushin +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +%% @hidden +-module(gen_udp_inet). + +-export([open/1, open/2, send/4, recv/2, recv/3, close/1, controlling_process/2]). + +%% inet API +-export([port/1, sockname/1]). + +-type packet() :: string() | binary(). +-type reason() :: term(). + +-type option() :: + {active, boolean()} + | {buffer, pos_integer()} + | {timeout, timeout()} + | list + | binary + | {binary, boolean()}. + +-define(DEFAULT_PARAMS, [{active, true}, {buffer, 128}, {timeout, infinity}]). + +%% @hidden +-spec open(PortNum :: inet:port_number()) -> {ok, inet:socket()} | {error, Reason :: reason()}. +open(PortNum) -> + open(PortNum, []). + +%% @hidden +-spec open(PortNum :: inet:port_number(), Options :: [option()]) -> + {ok, inet:socket()} | {error, Reason :: reason()}. +open(PortNum, Options) -> + DriverPid = open_port({spawn, "socket"}, []), + Params = merge(Options, ?DEFAULT_PARAMS), + init(DriverPid, PortNum, Params). + +%% @hidden +-spec send( + Socket :: inet:socket(), + Address :: inet:ip_address(), + PortNum :: inet:port_number(), + Packet :: packet() +) -> ok | {error, reason()}. +send(Socket, Address, PortNum, Packet) -> + case call(Socket, {sendto, Address, PortNum, Packet}) of + {ok, _Sent} -> + ok; + Else -> + Else + end. + +%% @hidden +-spec recv(Socket :: inet:socket(), Length :: non_neg_integer()) -> + {ok, {inet:ip_address(), inet:port_number(), packet()}} | {error, reason()}. +recv(Socket, Length) -> + recv(Socket, Length, infinity). + +%% @hidden +-spec recv(Socket :: inet:socket(), Length :: non_neg_integer(), Timeout :: timeout()) -> + {ok, {inet:ip_address(), inet:port_number(), packet()}} | {error, reason()}. +recv(Socket, Length, Timeout) -> + %% Note. Currently, timeout is not supported in the native (C) driver + call(Socket, {recvfrom, Length, Timeout}, Timeout). + +%% @hidden +-spec close(inet:socket()) -> ok. +close(Socket) -> + call(Socket, {close}), + ok. + +%% @hidden +-spec controlling_process(Socket :: inet:socket(), Pid :: pid()) -> + ok | {error, Reason :: reason()}. +controlling_process(Socket, Pid) -> + call(Socket, {controlling_process, Pid}). + +%% +%% inet API +%% + +%% @hidden +port(Socket) -> + call(Socket, {get_port}). + +%% @hidden +sockname(Socket) -> + call(Socket, {sockname}). + +%% internal operations + +%% @private +init(DriverPid, PortNum, Params) -> + InitParams = [ + {proto, udp}, + {port, PortNum}, + {controlling_process, self()} + | Params + ], + case call(DriverPid, {init, InitParams}) of + ok -> + {ok, DriverPid}; + ErrorReason -> + %% TODO close port + ErrorReason + end. + +%% TODO implement this in lists + +%% @private +merge(Config, Defaults) -> + merge(Config, Defaults, []) ++ Config. + +%% @private +merge(_Config, [], Accum) -> + Accum; +merge(Config, [H | T], Accum) -> + Key = + case H of + {K, _V} -> K; + K -> K + end, + case proplists:get_value(Key, Config) of + undefined -> + merge(Config, T, [H | Accum]); + Value -> + merge(Config, T, [{Key, Value} | Accum]) + end. + +%% +%% Internal operations +%% + +%% @private +call(Port, Msg) -> + call(Port, Msg, infinity). + +%% @private +call(Port, Msg, Timeout) -> + case port:call(Port, Msg, Timeout) of + {error, noproc} -> {error, closed}; + out_of_memory -> {error, enomem}; + Result -> Result + end. diff --git a/libs/estdlib/src/gen_udp_socket.erl b/libs/estdlib/src/gen_udp_socket.erl new file mode 100644 index 000000000..56aa27fe5 --- /dev/null +++ b/libs/estdlib/src/gen_udp_socket.erl @@ -0,0 +1,384 @@ +% +% This file is part of AtomVM. +% +% Copyright 2023 Fred Dushin +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +%% @hidden +-module(gen_udp_socket). + +-export([ + open/1, open/2, + send/4, + recv/2, recv/3, + close/1, + controlling_process/2 +]). + +-include("inet-priv.hrl"). +-include_lib("kernel/include/logger.hrl"). + +%% inet API +-export([port/1, sockname/1]). + +%% gen_server implementation (hidden) +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +-type packet() :: string() | binary(). +-type reason() :: term(). + +-type option() :: + {active, boolean()} + | {buffer, pos_integer()} + | {timeout, timeout()} + | list + | binary + | {binary, boolean()} + | {ifaddr, inet:ip_address()}. + +-define(DEFAULT_OPTIONS, #{ + active => true, + buffer => 0, + timeout => infinity +}). + +%% @hidden +-spec open(Port :: inet:port_number()) -> {ok, inet:socket()} | {error, Reason :: reason()}. +open(Port) -> + open(Port, []). + +%% @hidden +-spec open(Port :: inet:port_number(), Options :: [option()]) -> + {ok, inet:socket()} | {error, Reason :: reason()}. +open(Port, Options) -> + ControllingProcess = self(), + case socket:open(inet, dgram, udp) of + {ok, Socket} -> + EffectiveOptions = maps:merge(?DEFAULT_OPTIONS, proplist_to_map(Options)), + Addr = maps:get(ifaddr, EffectiveOptions, any), + case socket:bind(Socket, #{family => inet, addr => Addr, port => Port}) of + ok -> + gen_server:start_link( + ?MODULE, {Socket, ControllingProcess, EffectiveOptions}, [] + ); + BindError -> + BindError + end; + OpenError -> + OpenError + end. + +%% @hidden +-spec send( + Socket :: inet:socket(), + Address :: inet:ip_address(), + Port :: inet:port_number(), + Packet :: packet() +) -> ok | {error, reason()}. +send(Socket, Address, Port, Packet) -> + call(Socket, {sendto, Address, Port, Packet}). + +%% @hidden +-spec recv(Socket :: inet:socket(), Length :: non_neg_integer()) -> + {ok, {inet:ip_address(), inet:port_number(), packet()}} | {error, reason()}. +recv(Socket, Length) -> + recv(Socket, Length, infinity). + +%% @hidden +-spec recv(Socket :: inet:socket(), Length :: non_neg_integer(), Timeout :: timeout()) -> + {ok, {inet:ip_address(), inet:port_number(), packet()}} | {error, reason()}. +recv(Socket, Length, Timeout) -> + call(Socket, {recvfrom, Length, Timeout}). + +%% @hidden +-spec close(inet:socket()) -> ok. +close(Socket) -> + call(Socket, close). + +%% @hidden +-spec controlling_process(Socket :: inet:socket(), Pid :: pid()) -> + ok | {error, Reason :: reason()}. +controlling_process(Socket, Pid) when is_pid(Pid) -> + %% WARNING: calling process is potentially spoofable + call(Socket, {controlling_process, Pid, self()}). + +%% +%% inet API +%% + +%% @hidden +port(Socket) -> + call(Socket, port). + +%% @hidden +sockname(Socket) -> + call(Socket, sockname). + +%% +%% gen_server implementation +%% + +-record(state, { + socket, + controlling_process = undefined, + monitor_ref = undefined, + active, + options, + pending_selects = #{} +}). + +init({Socket, ControllingProcess, Options}) -> + case maps:get(active, Options, true) of + true -> + Ref = erlang:make_ref(), + case socket:nif_select_read(Socket, Ref) of + ok -> + MonitorRef = erlang:monitor(process, ControllingProcess), + {ok, #state{ + socket = Socket, + controlling_process = ControllingProcess, + monitor_ref = MonitorRef, + active = true, + options = Options, + pending_selects = #{Ref => active} + }}; + {error, _Reason} = Error -> + Error + end; + _ -> + {ok, #state{ + socket = Socket, + controlling_process = ControllingProcess, + active = false, + options = Options + }} + end. + +%% @hidden +handle_call({sendto, Address, Port, Packet}, _From, State) -> + Dest = #{ + family => inet, + port => Port, + addr => Address + }, + {reply, socket:sendto(State#state.socket, Packet, Dest), State}; +handle_call({recvfrom, Length, Timeout}, From, State) -> + case State#state.active of + true -> + {reply, {error, einval}, State}; + _ -> + Ref = erlang:make_ref(), + case Timeout of + TimeoutMs when is_integer(TimeoutMs) andalso TimeoutMs >= 0 -> + erlang:send_after(TimeoutMs, self(), {timeout, Ref, From}); + infinity -> + ok + end, + case socket:nif_select_read(State#state.socket, Ref) of + ok -> + PendingSelects = State#state.pending_selects, + NewPendingSelects = PendingSelects#{Ref => {passive, From, Length, Timeout}}, + {noreply, State#state{pending_selects = NewPendingSelects}}; + {error, _Reason} = Error -> + ?LOG_ERROR("An error occurred calling socket:nif_select_read/2: ~p", [Error]), + {noreply, State} + end + end; +handle_call(port, _From, State) -> + Reply = + case socket:sockname(State#state.socket) of + {ok, Addr} -> + #{port := Port} = Addr, + {ok, Port}; + SocknameError -> + SocknameError + end, + {reply, Reply, State}; +handle_call(sockname, _From, State) -> + Reply = + case socket:sockname(State#state.socket) of + {ok, Addr} -> + #{port := Port, addr := Address} = Addr, + {ok, {Address, Port}}; + SocknameError -> + SocknameError + end, + {reply, Reply, State}; +handle_call(close, _From, State) -> + {stop, normal, socket:close(State#state.socket), State}; +handle_call({controlling_process, Pid, Caller}, _From, State) -> + case State#state.active of + false -> + {reply, {error, einval}, State}; + _ -> + case State#state.controlling_process =/= Caller of + true -> + {reply, {error, not_owner}, State}; + _ -> + MonitorRef = erlang:monitor(process, Pid), + true = erlang:demonitor(State#state.monitor_ref), + {reply, ok, State#state{controlling_process = Pid, monitor_ref = MonitorRef}} + end + end; +handle_call(Request, _From, State) -> + {reply, {unknown_request, Request}, State}. + +%% @hidden +handle_cast(_Request, State) -> + {noreply, State}. + +%% @hidden +handle_info({select, _Socket, Ref, ready_input}, State) -> + case maps:get(Ref, State#state.pending_selects, undefined) of + undefined -> + ?LOG_INFO("Unable to find select ref ~p in pending selects", [Ref]), + {noreply, State}; + active -> + NewState = handle_active_recvfrom(State), + {noreply, NewState}; + {passive, From, Length, Timeout} -> + NewState = handle_passive_recvfrom(State, From, Length, Timeout), + {noreply, NewState#state{ + pending_selects = maps:remove(Ref, State#state.pending_selects) + }} + end; +handle_info({timeout, Ref, From}, State) -> + case maps:get(Ref, State#state.pending_selects, undefined) of + undefined -> + %% in all liklhood the request as processed. Ignore the message + {noreply, State}; + _ -> + ?LOG_INFO("Select ref ~p in pending selects has timed out.", [Ref]), + gen_server:reply(From, {error, timeout}), + {noreply, State#state{pending_selects = maps:remove(Ref, State#state.pending_selects)}} + end; +handle_info({'DOWN', MonitorRef, process, ControllingProcess, _Info}, State) -> + case State#state.monitor_ref =:= MonitorRef of + true -> + ?LOG_INFO("Controlling process ~p has terminated. Stopping ~p.", [ + ControllingProcess, ?MODULE + ]), + {stop, normal, State}; + _ -> + {noreply, State} + end; +handle_info(_Info, State) -> + {noreply, State}. + +%% @hidden +terminate(Reason, State) -> + ?LOG_INFO("Closing socket ~p for reason ~p", [State#state.socket, Reason]), + catch socket:close(State#state.socket). + +%% +%% Internal operations +%% + +%% @private +proplist_to_map(PropList) -> + proplist_to_map(PropList, #{}). + +%% @private +proplist_to_map([], Accum) -> + Accum; +proplist_to_map([Atom | T], Accum) when is_atom(Atom) -> + proplist_to_map(T, Accum#{Atom => true}); +proplist_to_map([{K, V} | T], Accum) -> + proplist_to_map(T, Accum#{K => V}). + +%% @private +handle_active_recvfrom(State) -> + Socket = State#state.socket, + Length = maps:get(buffer, State#state.options, 0), + ControllingProcess = State#state.controlling_process, + WrappedSocket = {?GEN_UDP_MONIKER, self(), ?MODULE}, + %% CAUTION: internal API + case socket:nif_recvfrom(Socket, Length) of + {ok, {Address, Data}} -> + BinaryOrList = maybe_encode_binary(State#state.options, Data), + #{addr := Addr, port := Port} = Address, + ?LOG_INFO("Sending message to ControllingProcess ~p", [ControllingProcess]), + ControllingProcess ! {udp, WrappedSocket, Addr, Port, BinaryOrList}, + + %% start a new select + Ref = erlang:make_ref(), + case socket:nif_select_read(Socket, Ref) of + ok -> + PendingSelects = State#state.pending_selects, + NewPendingSelects = maps:remove(Ref, PendingSelects), + State#state{pending_selects = NewPendingSelects#{Ref => active}}; + _ -> + State + end; + {closed, _Ref} -> + % socket was closed by another process + % TODO: we need to handle: + % (a) SELECT_STOP being scheduled + % (b) flush of messages as we can have both + % {closed, Ref} and {select, _, Ref, _} in the + % queue + State#state.controlling_process ! {udp_closed, WrappedSocket}, + State; + {error, _} = E -> + %% CAUTION: internal API + socket:nif_select_stop(Socket), + ?LOG_INFO("unable to receive on pending select~n"), + State#state.controlling_process ! {udp_error, WrappedSocket, E}, + State + end. + +%% @private +handle_passive_recvfrom(State, From, Length, _Timeout) -> + Socket = State#state.socket, + %% CAUTION: internal API + case socket:nif_recvfrom(Socket, Length) of + {ok, {Address, Data}} -> + BinaryOrList = maybe_encode_binary(State#state.options, Data), + #{addr := Addr, port := Port} = Address, + % WrappedSocket = {?GEN_UDP_MONIKER, self(), ?MODULE}, + gen_server:reply(From, {ok, {Addr, Port, BinaryOrList}}), + State; + {error, _} = E -> + socket:nif_select_stop(Socket), + ?LOG_INFO("unable to receive on pending select~n"), + gen_server:reply(From, E), + State + end. + +%% @private +call(Pid, Request) -> + gen_server:call(Pid, Request, infinity). + +%% @private +maybe_encode_binary(Options, Data) -> + case encode_binary(Options) of + true -> + Data; + _ -> + binary_to_list(Data) + end. + +%% @private +encode_binary(Options) -> + case {maps:get(binary, Options, undefined), maps:get(list, Options, undefined)} of + {undefined, undefined} -> + true; + {true, _} -> + true; + _ -> + false + end. diff --git a/libs/estdlib/src/inet-priv.hrl b/libs/estdlib/src/inet-priv.hrl new file mode 100644 index 000000000..3406002aa --- /dev/null +++ b/libs/estdlib/src/inet-priv.hrl @@ -0,0 +1,22 @@ +% +% This file is part of AtomVM. +% +% Copyright 2023 Fred Dushin +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +-define(GEN_TCP_MONIKER, '$avm_gen_tcp'). +-define(GEN_UDP_MONIKER, '$avm_gen_udp'). diff --git a/libs/estdlib/src/inet.erl b/libs/estdlib/src/inet.erl index 369a5d8d7..6af6c9a60 100644 --- a/libs/estdlib/src/inet.erl +++ b/libs/estdlib/src/inet.erl @@ -22,8 +22,12 @@ -export([port/1, close/1, sockname/1, peername/1]). +-include("inet-priv.hrl"). + +-type moniker() :: ?GEN_TCP_MONIKER | ?GEN_UDP_MONIKER. +-type socket_impl() :: any(). +-type socket() :: {moniker(), socket_impl(), module()}. -type port_number() :: 0..65535. --type socket() :: pid(). -type ip_address() :: ip4_address(). -type ip4_address() :: {0..255, 0..255, 0..255, 0..255}. -type hostname() :: iodata(). @@ -39,8 +43,10 @@ %% @end %%----------------------------------------------------------------------------- -spec port(Socket :: socket()) -> port_number(). -port(Socket) -> - call(Socket, {get_port}). +port({Moniker, Socket, Module}) when + Moniker =:= ?GEN_TCP_MONIKER orelse Moniker =:= ?GEN_UDP_MONIKER +-> + Module:port(Socket). %%----------------------------------------------------------------------------- %% @param Socket the socket to close @@ -49,9 +55,10 @@ port(Socket) -> %% @end %%----------------------------------------------------------------------------- -spec close(Socket :: socket()) -> ok. -close(Socket) -> - call(Socket, {close}), - ok. +close({Moniker, Socket, Module}) when + Moniker =:= ?GEN_TCP_MONIKER orelse Moniker =:= ?GEN_UDP_MONIKER +-> + Module:close(Socket). %%----------------------------------------------------------------------------- %% @param Socket the socket @@ -62,8 +69,10 @@ close(Socket) -> %%----------------------------------------------------------------------------- -spec sockname(Socket :: socket()) -> {ok, {ip_address(), port_number()}} | {error, Reason :: term()}. -sockname(Socket) -> - call(Socket, {sockname}). +sockname({Moniker, Socket, Module}) when + Moniker =:= ?GEN_TCP_MONIKER orelse Moniker =:= ?GEN_UDP_MONIKER +-> + Module:sockname(Socket). %%----------------------------------------------------------------------------- %% @param Socket the socket @@ -74,16 +83,5 @@ sockname(Socket) -> %%----------------------------------------------------------------------------- -spec peername(Socket :: socket()) -> {ok, {ip_address(), port_number()}} | {error, Reason :: term()}. -peername(Socket) -> - call(Socket, {peername}). - -%% -%% Internal operations -%% - -call(Port, Msg) -> - case port:call(Port, Msg) of - {error, noproc} -> {error, einval}; - out_of_memory -> {error, enomem}; - Result -> Result - end. +peername({?GEN_TCP_MONIKER, Socket, Module}) -> + Module:peername(Socket). diff --git a/src/platforms/esp32/components/avm_builtins/socket_driver.c b/src/platforms/esp32/components/avm_builtins/socket_driver.c index 8089d74c5..030b647b1 100644 --- a/src/platforms/esp32/components/avm_builtins/socket_driver.c +++ b/src/platforms/esp32/components/avm_builtins/socket_driver.c @@ -68,7 +68,31 @@ static NativeHandlerResult socket_consume_mailbox(Context *ctx); static const char *const tcp_error_atom = "\x9" "tcp_error"; -static const char *const netconn_event_internal = "\x14" "$atomvm_netconn_event_internal"; +static const char *const netconn_event_internal = ATOM_STR("\x14", "$atomvm_netconn_event_internal"); +static const char *gen_tcp_moniker_atom = ATOM_STR("\xC", "$avm_gen_tcp"); +static const char *native_tcp_module_atom = ATOM_STR("\xC", "gen_tcp_inet"); +static const char *gen_udp_moniker_atom = ATOM_STR("\xC", "$avm_gen_udp"); +static const char *native_udp_module_atom = ATOM_STR("\xC", "gen_udp_inet"); + +static inline term create_socket_wrapper(term pid, const char *moniker_atom, const char *module_atom, Heap *heap, GlobalContext *global) +{ + term tuple = term_alloc_tuple(3, heap); + term_put_tuple_element(tuple, 0, globalcontext_make_atom(global, moniker_atom)); + term_put_tuple_element(tuple, 1, pid); + term_put_tuple_element(tuple, 2, globalcontext_make_atom(global, module_atom)); + + return tuple; +} + +static inline term create_tcp_socket_wrapper(term pid, Heap *heap, GlobalContext *global) +{ + return create_socket_wrapper(pid, gen_tcp_moniker_atom, native_tcp_module_atom, heap, global); +} + +static inline term create_udp_socket_wrapper(term pid, Heap *heap, GlobalContext *global) +{ + return create_socket_wrapper(pid, gen_udp_moniker_atom, native_udp_module_atom, heap, global); +} uint32_t socket_tuple_to_addr(term addr_tuple) { @@ -544,13 +568,16 @@ static void do_send_socket_error(Context *ctx, err_t status) if (socket_data->active) { // udp active sockets do not send errors if (socket_data->type != UDPSocket) { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(3)) != MEMORY_GC_OK)) { + // {tcp_error, {Moniker :: atom(), Socket :: pid(), Module :: module()}, Reason :: atom()} + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(3) + TUPLE_SIZE(3)) != MEMORY_GC_OK)) { AVM_ABORT(); } term reason_atom = lwip_error_atom(glb, status); term result_tuple = term_alloc_tuple(3, &ctx->heap); term_put_tuple_element(result_tuple, 0, globalcontext_make_atom(glb, tcp_error_atom)); - term_put_tuple_element(result_tuple, 1, term_from_local_process_id(ctx->process_id)); + term socket_pid = term_from_local_process_id(ctx->process_id); + term socket_wrapper = create_tcp_socket_wrapper(socket_pid, &ctx->heap, glb); + term_put_tuple_element(result_tuple, 1, socket_wrapper); term_put_tuple_element(result_tuple, 2, reason_atom); globalcontext_send_message(glb, socket_data->controlling_process_pid, result_tuple); } @@ -566,12 +593,15 @@ static void do_send_tcp_closed(Context *ctx) GlobalContext *glb = ctx->global; struct SocketData *socket_data = ctx->platform_data; if (socket_data->active) { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(3)) != MEMORY_GC_OK)) { + // {tcp_closed, {Moniker :: atom(), Socket :: pid(), Module :: module()}} + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + TUPLE_SIZE(3)) != MEMORY_GC_OK)) { AVM_ABORT(); } term result_tuple = term_alloc_tuple(2, &ctx->heap); term_put_tuple_element(result_tuple, 0, TCP_CLOSED_ATOM); - term_put_tuple_element(result_tuple, 1, term_from_local_process_id(ctx->process_id)); + term socket_pid = term_from_local_process_id(ctx->process_id); + term socket_wrapper = create_tcp_socket_wrapper(socket_pid, &ctx->heap, glb); + term_put_tuple_element(result_tuple, 1, socket_wrapper); globalcontext_send_message(glb, socket_data->controlling_process_pid, result_tuple); } else { if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + REPLY_SIZE) != MEMORY_GC_OK)) { @@ -659,7 +689,8 @@ static NativeHandlerResult do_receive_data(Context *ctx) int tuples_size; if (socket_data->active) { // tuples_size = 4 (result_tuple size) - tuples_size = TUPLE_SIZE(3); + // add 3-tuple for {Moniker :: atom(), Socket :: pid(), Module :: module()} + tuples_size = TUPLE_SIZE(3) + TUPLE_SIZE(3); } else { // tuples_size = 3 (ok_tuple size) tuples_size = TUPLE_SIZE(2) + REPLY_SIZE; @@ -708,7 +739,12 @@ static NativeHandlerResult do_receive_data(Context *ctx) if (socket_data->active) { term active_tuple = term_alloc_tuple(socket_data->type == TCPClientSocket ? 3 : 5, &ctx->heap); term_put_tuple_element(active_tuple, 0, socket_data->type == TCPClientSocket ? TCP_ATOM : UDP_ATOM); - term_put_tuple_element(active_tuple, 1, term_from_local_process_id(ctx->process_id)); + term socket_pid = term_from_local_process_id(ctx->process_id); + term socket_wrapper = + socket_data->type == UDPSocket ? + create_udp_socket_wrapper(socket_pid, &ctx->heap, ctx->global) : + create_tcp_socket_wrapper(socket_pid, &ctx->heap, ctx->global); + term_put_tuple_element(active_tuple, 1, socket_wrapper); if (socket_data->type == TCPClientSocket) { term_put_tuple_element(active_tuple, 2, recv_term); } else { diff --git a/src/platforms/esp32/components/avm_sys/include/otp_socket_platform.h b/src/platforms/esp32/components/avm_sys/include/otp_socket_platform.h index 0d80b315b..fbaa75b49 100644 --- a/src/platforms/esp32/components/avm_sys/include/otp_socket_platform.h +++ b/src/platforms/esp32/components/avm_sys/include/otp_socket_platform.h @@ -29,6 +29,9 @@ extern "C" { #define TAG "otp_socket" +#define AVM_LOGD(tag, format, ...) \ + ; + #define AVM_LOGI ESP_LOGI #define AVM_LOGW ESP_LOGW #define AVM_LOGE ESP_LOGE diff --git a/src/platforms/esp32/test/main/test_erl_sources/test_socket.erl b/src/platforms/esp32/test/main/test_erl_sources/test_socket.erl index c4e06fb06..f4ca3195e 100644 --- a/src/platforms/esp32/test/main/test_erl_sources/test_socket.erl +++ b/src/platforms/esp32/test/main/test_erl_sources/test_socket.erl @@ -62,19 +62,26 @@ test_tcp_client(Active, BinaryOpt) -> true -> ok = receive - {tcp, Socket, <<"HTTP/1.1 301", _/binary>>} when BinaryOpt =:= binary -> ok; - {tcp, Socket, "HTTP/1.1 301" ++ _} when BinaryOpt =:= list -> ok; - {tcp, Socket, Packet} -> {unexpected_packet, Packet}; - UnexpectedAfterSend -> {unexpected_message_after_send, UnexpectedAfterSend} + {tcp, _WrappedSocket, <<"HTTP/1.1 301", _/binary>>} when + BinaryOpt =:= binary + -> + ok; + {tcp, _WrappedSocket, "HTTP/1.1 301" ++ _} when BinaryOpt =:= list -> ok; + {tcp, _WrappedSocket, Packet} -> + {unexpected_packet, Packet, {Active, BinaryOpt}}; + UnexpectedAfterSend -> + {test_tcp_client, unexpected_message_after_send, Socket, + UnexpectedAfterSend, {Active, BinaryOpt}} after 10000 -> {receive_packet, timeout} end, ok = receive - {tcp_closed, Socket} -> + {tcp_closed, _OtherWrappedSocket} -> ok; UnexpectedAfterPacket -> - {unexpected_message_after_packet, Socket, UnexpectedAfterPacket} + {unexpected_message_after_packet, Socket, UnexpectedAfterPacket, + {Active, BinaryOpt}} after 10000 -> {closed_socket, timeout} end, @@ -155,14 +162,14 @@ test_udp(Active, QueryID) -> ok = receive % {udp, Socket, {{1,1,1,1}, 53, <>}} -> ok; % not supported yet - {udp, Socket, {1, 1, 1, 1}, 53, <>} when + {udp, _WrappedSocket, {1, 1, 1, 1}, 53, <>} when B band 16#80 =:= 16#80 -> ok; - {udp, Socket, Addr, Port, Packet} -> + {udp, _WrappedSocket, Addr, Port, Packet} -> {unexpected_packet, Addr, Port, Packet}; UnexpectedAfterSend -> - {unexpected_message_after_send, UnexpectedAfterSend} + {test_udp, unexpected_message_after_send, UnexpectedAfterSend} after 10000 -> {receive_packet, timeout} end, @@ -228,8 +235,10 @@ test_tcp_server(Active, Port) -> case Active of true -> receive - {tcp, ConnectedSocket, <<"pong">>} -> ok; - UnexpectedClientMessage -> {unexpected_client, UnexpectedClientMessage} + {tcp, _WrappedConnectedSocket, <<"pong">>} -> + ok; + UnexpectedClientMessage -> + {test_tcp_server, unexpected_client, UnexpectedClientMessage} after 5000 -> client_timeout end; false -> @@ -273,8 +282,10 @@ tcp_client(Active, Port) -> case Active of true -> receive - {tcp, ClientSocket, <<"ping">>} -> ok; - UnexpectedClientMessage -> {unexpected_client, UnexpectedClientMessage} + {tcp, _WrappedClientSocketPing, <<"ping">>} -> + ok; + UnexpectedClientMessage -> + {tcp_client, unexpected_client, UnexpectedClientMessage} after 5000 -> client_timeout end; false -> @@ -297,7 +308,7 @@ tcp_client(Active, Port) -> case Active of true -> receive - {tcp_closed, ClientSocket} -> + {tcp_closed, _WrappedClientSocketClosed} -> ok; UnexpectedAfterPacket -> {unexpected_message_after_pong, ClientSocket, UnexpectedAfterPacket} diff --git a/src/platforms/generic_unix/lib/otp_socket_platform.h b/src/platforms/generic_unix/lib/otp_socket_platform.h index d5ddc5ff1..85962ea9c 100644 --- a/src/platforms/generic_unix/lib/otp_socket_platform.h +++ b/src/platforms/generic_unix/lib/otp_socket_platform.h @@ -26,6 +26,9 @@ #define TAG "otp_socket" +#define AVM_LOGD(tag, format, ...) \ + ; + #define AVM_LOGI(tag, format, ...) \ fprintf(stderr, "I %s: " format " (%s:%i)\n", tag, ##__VA_ARGS__, __FILE__, __LINE__); diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index 29cc0259d..f65c01ec4 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -101,12 +101,37 @@ const char *const not_owner_a = "\x9" "not_owner"; const char *const close_internal = "\x14" "$atomvm_socket_close"; +static const char *gen_tcp_moniker_atom = ATOM_STR("\xC", "$avm_gen_tcp"); +static const char *native_tcp_module_atom = ATOM_STR("\xC", "gen_tcp_inet"); +static const char *gen_udp_moniker_atom = ATOM_STR("\xC", "$avm_gen_udp"); +static const char *native_udp_module_atom = ATOM_STR("\xC", "gen_udp_inet"); + static EventListener *active_recv_callback(GlobalContext *glb, EventListener *listener); static EventListener *passive_recv_callback(GlobalContext *glb, EventListener *listener); static EventListener *active_recvfrom_callback(GlobalContext *glb, EventListener *listener); static EventListener *passive_recvfrom_callback(GlobalContext *glb, EventListener *listener); static NativeHandlerResult socket_consume_mailbox(Context *ctx); +static inline term create_socket_wrapper(term pid, const char *moniker_atom, const char *module_atom, Heap *heap, GlobalContext *global) +{ + term tuple = term_alloc_tuple(3, heap); + term_put_tuple_element(tuple, 0, globalcontext_make_atom(global, moniker_atom)); + term_put_tuple_element(tuple, 1, pid); + term_put_tuple_element(tuple, 2, globalcontext_make_atom(global, module_atom)); + + return tuple; +} + +static inline term create_tcp_socket_wrapper(term pid, Heap *heap, GlobalContext *global) +{ + return create_socket_wrapper(pid, gen_tcp_moniker_atom, native_tcp_module_atom, heap, global); +} + +static inline term create_udp_socket_wrapper(term pid, Heap *heap, GlobalContext *global) +{ + return create_socket_wrapper(pid, gen_udp_moniker_atom, native_udp_module_atom, heap, global); +} + uint32_t socket_tuple_to_addr(term addr_tuple) { return ((term_to_int32(term_get_tuple_element(addr_tuple, 0)) & 0xFF) << 24) @@ -694,10 +719,12 @@ static EventListener *active_recv_callback(GlobalContext *glb, EventListener *ba } SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; if (len <= 0) { - // {tcp, Socket, {error, {SysCall, Errno}}} - BEGIN_WITH_STACK_HEAP(12, heap); + // {tcp_closed, {Moniker :: atom(), Socket :: pid(), Module :: module()}} + BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(2) + TUPLE_SIZE(3), heap); term pid = socket_data->controlling_process; - term msgs[2] = { TCP_CLOSED_ATOM, term_from_local_process_id(ctx->process_id) }; + term socket_pid = term_from_local_process_id(ctx->process_id); + term socket_wrapper = create_tcp_socket_wrapper(socket_pid, &heap, glb); + term msgs[2] = { TCP_CLOSED_ATOM, socket_wrapper }; term msg = port_heap_create_tuple_n(&heap, 2, msgs); port_send_message_nolock(glb, pid, msg); socket_data->active_listener = NULL; @@ -713,15 +740,18 @@ static EventListener *active_recv_callback(GlobalContext *glb, EventListener *ba } else { ensure_packet_avail = len * 2; } - // {tcp, pid, binary} + // {tcp, {Moniker :: atom(), pid(), Module :: module()}, binary} Heap heap; - if (UNLIKELY(memory_init_heap(&heap, 20 + ensure_packet_avail) != MEMORY_GC_OK)) { + size_t requested_size = TUPLE_SIZE(3) + TUPLE_SIZE(3) + ensure_packet_avail; + if (UNLIKELY(memory_init_heap(&heap, requested_size) != MEMORY_GC_OK)) { fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); AVM_ABORT(); } term pid = socket_data->controlling_process; term packet = socket_create_packet_term(buf, len, socket_data->binary, &heap, glb); - term msgs[3] = { TCP_ATOM, term_from_local_process_id(ctx->process_id), packet }; + term socket_pid = term_from_local_process_id(ctx->process_id); + term socket_wrapper = create_tcp_socket_wrapper(socket_pid, &heap, glb); + term msgs[3] = { TCP_ATOM, socket_wrapper, packet }; term msg = port_heap_create_tuple_n(&heap, 3, msgs); port_send_message_nolock(glb, pid, msg); memory_destroy_heap(&heap, glb); @@ -835,10 +865,13 @@ static EventListener *active_recvfrom_callback(GlobalContext *glb, EventListener } SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; if (len == -1) { - // {udp, Socket, {error, {SysCall, Errno}}} - BEGIN_WITH_STACK_HEAP(12, heap); + // {udp, {Moniker :: atom(), Socket :: pid(), Module :: module()}, {error, {SysCall, Errno}}} + BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(3) + TUPLE_SIZE(3) + TUPLE_SIZE(2) + TUPLE_SIZE(2), heap); term pid = socket_data->controlling_process; - term msgs[3] = { UDP_ATOM, term_from_local_process_id(ctx->process_id), port_heap_create_sys_error_tuple(&heap, RECVFROM_ATOM, errno) }; + term socket_pid = term_from_local_process_id(ctx->process_id); + // printf("Sending tcp_closed wrapper to %i\n", ctx->process_id); + term socket_wrapper = create_udp_socket_wrapper(socket_pid, &heap, glb); + term msgs[3] = { UDP_ATOM, socket_wrapper, port_heap_create_sys_error_tuple(&heap, RECVFROM_ATOM, errno) }; term msg = port_heap_create_tuple_n(&heap, 3, msgs); port_send_message_nolock(glb, pid, msg); END_WITH_STACK_HEAP(heap, glb); @@ -850,9 +883,10 @@ static EventListener *active_recvfrom_callback(GlobalContext *glb, EventListener } else { ensure_packet_avail = len * 2; } - // {udp, pid, {int,int,int,int}, int, binary} + // {udp, {Moniker :: atom(), pid(), Module :: module()}, Address :: {int,int,int,int}, Port :: integer(), binary()} Heap heap; - if (UNLIKELY(memory_init_heap(&heap, 20 + ensure_packet_avail) != MEMORY_GC_OK)) { + size_t requested_size = TUPLE_SIZE(5) + TUPLE_SIZE(3) + TUPLE_SIZE(4) + ensure_packet_avail; + if (UNLIKELY(memory_init_heap(&heap, requested_size) != MEMORY_GC_OK)) { fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); AVM_ABORT(); } @@ -860,7 +894,9 @@ static EventListener *active_recvfrom_callback(GlobalContext *glb, EventListener term addr = socket_heap_tuple_from_addr(&heap, htonl(clientaddr.sin_addr.s_addr)); term port = term_from_int32(htons(clientaddr.sin_port)); term packet = socket_create_packet_term(buf, len, socket_data->binary, &heap, glb); - term msgs[5] = { UDP_ATOM, term_from_local_process_id(ctx->process_id), addr, port, packet }; + term socket_pid = term_from_local_process_id(ctx->process_id); + term socket_wrapper = create_udp_socket_wrapper(socket_pid, &heap, glb); + term msgs[5] = { UDP_ATOM, socket_wrapper, addr, port, packet }; term msg = port_heap_create_tuple_n(&heap, 5, msgs); port_send_message_nolock(glb, pid, msg); memory_destroy_heap(&heap, glb); diff --git a/src/platforms/rp2040/src/lib/otp_socket_platform.h b/src/platforms/rp2040/src/lib/otp_socket_platform.h index bb60e4535..4d9f251e3 100644 --- a/src/platforms/rp2040/src/lib/otp_socket_platform.h +++ b/src/platforms/rp2040/src/lib/otp_socket_platform.h @@ -33,6 +33,9 @@ #define TAG "otp_socket" +#define AVM_LOGD(tag, format, ...) \ + ; + #define AVM_LOGI(tag, format, ...) \ printf("I %s: " format " (%s:%i)\n", tag, ##__VA_ARGS__, __FILE__, __LINE__); diff --git a/tests/libs/estdlib/test_gen_tcp.erl b/tests/libs/estdlib/test_gen_tcp.erl index 5c427c0ff..be879e68f 100644 --- a/tests/libs/estdlib/test_gen_tcp.erl +++ b/tests/libs/estdlib/test_gen_tcp.erl @@ -39,16 +39,18 @@ test_echo_server(SpawnControllingProcess) -> {ok, {_Address, Port}} = inet:sockname(ListenSocket), Self = self(), - spawn(fun() -> - Self ! ready, - accept(Self, ListenSocket, SpawnControllingProcess) - end), + spawn( + fun() -> + Self ! ready, + accept(Self, ListenSocket, SpawnControllingProcess) + end + ), receive ready -> ok end, - test_send_receive(Port, 100, SpawnControllingProcess), + test_send_receive(Port, 10, SpawnControllingProcess), %% TODO bug closing listening socket % gen_tcp:close(ListenSocket), @@ -77,11 +79,13 @@ echo(Pid, Socket) -> ok; {tcp, Socket, Packet} -> ok = gen_tcp:send(Socket, Packet), - echo(Pid, Socket) + echo(Pid, Socket); + SomethingElse -> + erlang:display({echo, unexpected_message, SomethingElse}) end. test_send_receive(Port, N, SpawnControllingProcess) -> - {ok, Socket} = gen_tcp:connect(localhost, Port, [{active, true}]), + {ok, Socket} = gen_tcp:connect({127, 0, 0, 1}, Port, [{active, true}]), case SpawnControllingProcess of false -> loop(Socket, N); @@ -119,8 +123,25 @@ loop(Socket, I) -> end. test_listen_connect_parameters() -> + case get_otp_version() of + Version when Version =:= atomvm orelse (is_integer(Version) andalso Version >= 24) -> + ok = test_listen_connect_parameters(socket, socket), + ok = test_listen_connect_parameters(inet, inet); + _ -> + ok = test_listen_connect_parameters(inet, inet) + end, + ok. + +test_listen_connect_parameters(InetClientBackend, InetServerBackend) -> Results = [ - test_listen_connect_parameters(ListenMode, ConnectMode, ListenActive, ConnectActive) + test_listen_connect_parameters( + InetClientBackend, + InetServerBackend, + ListenMode, + ConnectMode, + ListenActive, + ConnectActive + ) || ListenMode <- [binary, list], ConnectMode <- [binary, list], ListenActive <- [false, true], @@ -129,14 +150,46 @@ test_listen_connect_parameters() -> [] = [Error || Error <- Results, Error =/= ok], ok. -test_listen_connect_parameters(ListenMode, ConnectMode, ListenActive, ConnectActive) -> - {ok, ListenSocket} = gen_tcp:listen(0, [ListenMode, {active, ListenActive}, {buffer, 32}]), +test_listen_connect_parameters( + InetClientBackend, InetServerBackend, ListenMode, ConnectMode, ListenActive, ConnectActive +) -> + etest:flush_msg_queue(), + + case get_otp_version() of + Version when Version =:= atomvm orelse (is_integer(Version) andalso Version >= 24) -> + ServerBackendOption = [{inet_backend, InetServerBackend}], + ClientBackendOption = [{inet_backend, InetClientBackend}]; + _ -> + ServerBackendOption = [], + ClientBackendOption = [] + end, + + io:format( + "GEN_TCP-TEST> ServerBackendOption=~p ClientBackendOption=~p ListenMode=~p ConnectMode=~p ListenActive=~p ConnectActive=~p~n", + [ + ServerBackendOption, + ClientBackendOption, + ListenMode, + ConnectMode, + ListenActive, + ConnectActive + ] + ), + + NumMessages = 10, + + {ok, ListenSocket} = gen_tcp:listen( + 0, + ServerBackendOption ++ [ListenMode, {active, ListenActive}, {buffer, 32}] + ), {ok, {_Address, Port}} = inet:sockname(ListenSocket), Self = self(), ServerPid = spawn(fun() -> Self ! {self(), ready}, - Result = test_listen_connect_parameters_accept(ListenMode, ListenActive, ListenSocket), + Result = test_listen_connect_parameters_accept( + ListenMode, ListenActive, ListenSocket, NumMessages, Self + ), Self ! {self(), Result} end), receive @@ -144,12 +197,33 @@ test_listen_connect_parameters(ListenMode, ConnectMode, ListenActive, ConnectAct ok end, - {ok, Socket} = gen_tcp:connect(localhost, Port, [ConnectMode, {active, ConnectActive}]), - ok = test_listen_connect_parameters_client_loop(Socket, ConnectMode, ConnectActive, 10), + {ok, Socket} = gen_tcp:connect( + {127, 0, 0, 1}, + Port, + ClientBackendOption ++ [ConnectMode, {active, ConnectActive}] + ), + ok = test_listen_connect_parameters_client_loop( + Socket, ConnectMode, ConnectActive, NumMessages + ), + + %% race condition in active receive; client might + %% close connection before service has consumed (and delivered) + %% all messages to active recipient process. So we need + %% to wait until the server has actually processed all the + %% messages it is expected to. + receive + server_done -> + ok + end, + ok = gen_tcp:close(Socket), + receive - {ServerPid, Result} -> Result - after 5000 -> throw({timeout, waiting, recv, server_closed}) + {ServerPid, Result} -> + ok = gen_tcp:close(ListenSocket), + Result + after 5000 -> + throw({timeout, waiting, recv, server_closed}) end. test_listen_connect_parameters_client_loop(_Socket, _Mode, _Active, 0) -> @@ -161,22 +235,25 @@ test_listen_connect_parameters_client_loop(Socket, Mode, Active, I) -> test_listen_connect_parameters_client_loop0(Socket, Mode, true = Active, I) -> receive - {tcp_closed, Socket} -> + {tcp_closed, _Socket} -> ok; - {tcp, Socket, Packet} -> + {tcp, _Socket, Packet} -> if Mode =:= binary andalso is_binary(Packet) -> test_listen_connect_parameters_client_loop(Socket, Mode, Active, I - 1); Mode =:= list andalso is_list(Packet) -> test_listen_connect_parameters_client_loop(Socket, Mode, Active, I - 1); true -> - {error, {unexpected_packet_format, client, Packet, Mode}} + {error, + {unexpected_packet_format, client, active_receive, Packet, Mode, Active}} end; - Other -> - {error, {unexpected_message, Other}} + {error, _Reason} = Error -> + {error, {unexpected_message, client, active_receive, Error}} + after 5000 -> + {error, {timeout, client, active_receive, Mode, I}} end; test_listen_connect_parameters_client_loop0(Socket, Mode, false = Active, I) -> - case gen_tcp:recv(Socket, 0) of + case gen_tcp:recv(Socket, 0, 5000) of {error, closed} -> ok; {ok, Packet} -> @@ -186,49 +263,87 @@ test_listen_connect_parameters_client_loop0(Socket, Mode, false = Active, I) -> Mode =:= list andalso is_list(Packet) -> test_listen_connect_parameters_client_loop(Socket, Mode, Active, I - 1); true -> - {error, {unexpected_packet_format, client, Packet, Mode}} + {error, {unexpected_packet_format, client, passive_receive, Packet, Mode}} end; Other -> - {error, {unexpected_result, client, Other}} + {error, {unexpected_result, client, passive_receive, Other}} end. -test_listen_connect_parameters_accept(ListenMode, ListenActive, ListenSocket) -> +test_listen_connect_parameters_accept( + ListenMode, ListenActive, ListenSocket, NumMessages, WaitingPid +) -> {ok, Socket} = gen_tcp:accept(ListenSocket), - test_listen_connect_parameters_server_loop(ListenMode, ListenActive, Socket). + try + test_listen_connect_parameters_server_loop( + ListenMode, ListenActive, Socket, NumMessages, WaitingPid + ) + after + ok = gen_tcp:close(Socket) + end. -test_listen_connect_parameters_server_loop(ListenMode, true = ListenActive, Socket) -> +test_listen_connect_parameters_server_loop( + _ListenMode, true = _ListenActive, Socket, 0, WaitingPid +) -> + WaitingPid ! server_done, receive {tcp_closed, Socket} -> - ok; + ok + after 5000 -> + {error, {timeout, server, active_receive, waiting_for_close}} + end; +test_listen_connect_parameters_server_loop(ListenMode, true = ListenActive, Socket, I, WaitingPid) -> + receive + {tcp_closed, _Socket} -> + {error, {unexpected_close, server, active_receive}}; {tcp, Socket, Packet} -> ok = gen_tcp:send(Socket, Packet), if ListenMode =:= binary andalso is_binary(Packet) -> - test_listen_connect_parameters_server_loop(ListenMode, ListenActive, Socket); + test_listen_connect_parameters_server_loop( + ListenMode, ListenActive, Socket, I - 1, WaitingPid + ); ListenMode =:= list andalso is_list(Packet) -> - test_listen_connect_parameters_server_loop(ListenMode, ListenActive, Socket); + test_listen_connect_parameters_server_loop( + ListenMode, ListenActive, Socket, I - 1, WaitingPid + ); true -> - {error, {unexpected_packet_format, server, Packet, ListenMode}} + {error, {unexpected_packet_format, server, active_receive, Packet, ListenMode}} end; Other -> - {error, {unexpected_message, Other}} + {error, {unexpected_message, server, active_receive, Other}} + after 5000 -> + {error, {timeout, server, active_receive, ListenMode}} end; -test_listen_connect_parameters_server_loop(ListenMode, false = ListenActive, Socket) -> - case gen_tcp:recv(Socket, 0) of +test_listen_connect_parameters_server_loop( + _ListenMode, false = _ListenActive, Socket, 0, WaitingPid +) -> + WaitingPid ! server_done, + case gen_tcp:recv(Socket, 0, 5000) of {error, closed} -> ok; + {error, timeout} -> + {error, {timeout, server, passive_receive, waiting_for_close}} + end; +test_listen_connect_parameters_server_loop(ListenMode, false = ListenActive, Socket, I, WaitingPid) -> + case gen_tcp:recv(Socket, 0, 5000) of + {error, closed} -> + {error, {unexpected_close, server, passive_receive}}; {ok, Packet} -> ok = gen_tcp:send(Socket, Packet), if ListenMode =:= binary andalso is_binary(Packet) -> - test_listen_connect_parameters_server_loop(ListenMode, ListenActive, Socket); + test_listen_connect_parameters_server_loop( + ListenMode, ListenActive, Socket, I - 1, WaitingPid + ); ListenMode =:= list andalso is_list(Packet) -> - test_listen_connect_parameters_server_loop(ListenMode, ListenActive, Socket); + test_listen_connect_parameters_server_loop( + ListenMode, ListenActive, Socket, I - 1, WaitingPid + ); true -> - {error, {unexpected_packet_format, server, Packet, ListenMode}} + {error, {unexpected_packet_format, server, passive_receive, Packet, ListenMode}} end; Other -> - {error, {unexpected_result, server, Other}} + {error, {unexpected_result, server, passive_receive, Other}} end. test_tcp_double_close() -> @@ -237,3 +352,11 @@ test_tcp_double_close() -> ok = gen_tcp:close(Socket), {error, closed} = gen_tcp:recv(Socket, 512, 5000), ok. + +get_otp_version() -> + case erlang:system_info(machine) of + "BEAM" -> + list_to_integer(erlang:system_info(otp_release)); + _ -> + atomvm + end. diff --git a/tests/libs/estdlib/test_gen_udp.erl b/tests/libs/estdlib/test_gen_udp.erl index 0d484e843..342b5d6a6 100644 --- a/tests/libs/estdlib/test_gen_udp.erl +++ b/tests/libs/estdlib/test_gen_udp.erl @@ -25,14 +25,28 @@ -include("etest.hrl"). test() -> - ok = test_send_receive_active(false, binary), - ok = test_send_receive_active(true, binary), - ok = test_send_receive_active(false, list), - ok = test_send_receive_active(true, list), + BackendOptions = + case get_otp_version() of + Version when Version =:= atomvm orelse (is_integer(Version) andalso Version >= 24) -> + [[{inet_backend, inet}], [{inet_backend, socket}]]; + _ -> + [[]] + end, + [ + ok = test_send_receive(SpawnControllingProcess, IsActive, Mode, BackendOption) + || SpawnControllingProcess <- [false, true], + IsActive <- [false, true], + Mode <- [binary, list], + BackendOption <- BackendOptions + ], ok. -test_send_receive_active(SpawnControllingProcess, Mode) -> - {ok, Socket} = gen_udp:open(0, [{active, true}, Mode]), +test_send_receive(SpawnControllingProcess, IsActive, Mode, BackendOption) -> + io:format("GEN_UDP-TEST> SpawnControllingProcess=~p IsActive=~p Mode=~p Backendoption=~p~n", [ + SpawnControllingProcess, IsActive, Mode, BackendOption + ]), + + {ok, Socket} = gen_udp:open(0, BackendOption ++ [{active, IsActive}, Mode]), {ok, Port} = inet:port(Socket), Self = self(), @@ -41,7 +55,7 @@ test_send_receive_active(SpawnControllingProcess, Mode) -> true -> Self ! ready; _ -> ok end, - NumReceived = count_received(Mode), + NumReceived = count_received(Socket, IsActive, Mode), case SpawnControllingProcess of true -> case SpawnControllingProcess of @@ -81,7 +95,13 @@ test_send_receive_active(SpawnControllingProcess, Mode) -> Sender ! stop, ?ASSERT_TRUE((0 < NumReceived) and (NumReceived =< NumToSend)), - ok = gen_udp:close(Socket), + %% NB. Might be closed if controlling process terminates + case SpawnControllingProcess of + true -> + catch gen_udp:close(Socket); + _ -> + ok = gen_udp:close(Socket) + end, ok. make_messages(0) -> @@ -102,17 +122,41 @@ send(Socket, Port, [Msg | Rest]) -> gen_udp:send(Socket, {127, 0, 0, 1}, Port, Msg), send(Socket, Port, Rest). -count_received(Mode) -> - count_received0(Mode, 0). +count_received(_Socket, true = _IsActive, Mode) -> + count_active_received(Mode, 0); +count_received(Socket, false = _IsActive, Mode) -> + count_passive_received(Socket, Mode, 0). -count_received0(Mode, I) -> +count_active_received(Mode, I) -> receive {udp, _Pid, _Address, _Port, <<"foo">>} when Mode =:= binary -> - count_received0(Mode, I + 1); + count_active_received(Mode, I + 1); {udp, _Pid, _Address, _Port, "foo"} when Mode =:= list -> - count_received0(Mode, I + 1); + count_active_received(Mode, I + 1); Other -> - erlang:display({unexpected, Other}), - count_received0(Mode, I) - after 500 -> I + erlang:display({count_active_received, unexpected, Other}), + count_active_received(Mode, I) + after 500 -> + I + end. + +count_passive_received(Socket, Mode, I) -> + case gen_udp:recv(Socket, 0, 500) of + {ok, {_Address, _Port, <<"foo">>}} when Mode =:= binary -> + count_passive_received(Socket, Mode, I + 1); + {ok, {_Address, _Port, "foo"}} when Mode =:= list -> + count_passive_received(Socket, Mode, I + 1); + {error, timeout} -> + I; + Other -> + erlang:display({count_passive_received, unexpected, Other}), + count_passive_received(Socket, Mode, I) + end. + +get_otp_version() -> + case erlang:system_info(machine) of + "BEAM" -> + list_to_integer(erlang:system_info(otp_release)); + _ -> + atomvm end.