Skip to content

Commit

Permalink
Merge pull request #1468 from pguyot/w02/erlang-distribution-03
Browse files Browse the repository at this point in the history
Distribution: add support for autoconnect

Allow connections from AtomVM to other nodes (OTP or AtomVM).

These changes are made under both the "Apache 2.0" and the "GNU Lesser General
Public License 2.1 or later" license terms (dual license).

SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
  • Loading branch information
bettio committed Jan 21, 2025
2 parents 41d90c8 + 47bbff1 commit ec336e4
Show file tree
Hide file tree
Showing 11 changed files with 339 additions and 37 deletions.
94 changes: 89 additions & 5 deletions libs/estdlib/src/dist_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,27 @@ handshake_other_started(#hs_data{socket = Socket, f_recv = Recv} = HSData0) ->
end.

-spec handshake_we_started(#hs_data{}) -> no_return().
handshake_we_started(#hs_data{}) -> ok.
handshake_we_started(#hs_data{} = HSData0) ->
HSData1 = HSData0#hs_data{
other_started = false,
this_flags = ?MANDATORY_DFLAGS
},
send_name(HSData1),
case recv_status(HSData1) of
<<"ok">> -> ok;
<<"ok_simultaneous">> -> ok;
<<"nok">> -> ?shutdown({HSData1#hs_data.other_node, simultaneous});
<<"alive">> -> send_status(<<"true">>, HSData1);
Other -> ?shutdown({HSData1#hs_data.other_node, {unexpected, Other}})
end,
Cookie = net_kernel:get_cookie(HSData1#hs_data.other_node),
{OtherChallenge, OtherFlags, Creation} = recv_challenge(HSData1),
check_flags(OtherFlags, HSData1),
<<MyChallenge:32>> = crypto:strong_rand_bytes(4),
send_challenge_reply(Cookie, OtherChallenge, MyChallenge, HSData1),
OtherDigest = recv_challenge_ack(HSData1),
check_challenge(Cookie, MyChallenge, OtherDigest, HSData1),
connection(HSData1, Creation).

% We are connected
-spec connection(#hs_data{}, non_neg_integer()) -> no_return().
Expand Down Expand Up @@ -359,6 +379,20 @@ check_flags(Flags0, HSData) ->
?shutdown(Reason)
end.

% send name
send_name(
#hs_data{socket = Socket, f_send = Send, this_node = ThisNode, this_flags = ThisFlags} = HSData
) ->
Creation = atomvm:get_creation(),
NodeName = atom_to_binary(ThisNode, latin1),
NameLen = byte_size(NodeName),
case Send(Socket, <<$N, ThisFlags:64, Creation:32, NameLen:16, NodeName/binary>>) of
{error, _} = Error ->
?shutdown2({HSData#hs_data.other_node, Socket}, {send_name_failed, Error});
ok ->
ok
end.

% Ensure name is somewhat valid
-spec check_name(binary()) -> ok.
check_name(Name) ->
Expand All @@ -378,13 +412,13 @@ send_status(Status, #hs_data{socket = Socket, f_send = Send} = HSData) ->
ok
end.

-spec recv_status_reply(#hs_data{}) -> binary().
recv_status_reply(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
-spec recv_status(#hs_data{}) -> binary().
recv_status(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
case Recv(Socket, 0, infinity) of
{ok, <<$s, Result/binary>>} ->
Result;
{ok, Other} ->
?shutdown({HSData#hs_data.other_node, {unexpected, recv_status_reply, Other}});
?shutdown({HSData#hs_data.other_node, {unexpected, recv_status, Other}});
{error, Reason} ->
?shutdown2({HSData#hs_data.other_node, recv_error}, Reason)
end.
Expand All @@ -403,7 +437,7 @@ mark_pending(#hs_data{kernel_pid = Kernel, this_node = ThisNode, other_node = Ot
alive ->
send_status(<<"alive">>, HSData),
reset_timer(HSData#hs_data.timer),
case recv_status_reply(HSData) of
case recv_status(HSData) of
<<"true">> -> ok;
<<"false">> -> ?shutdown(OtherNode);
Other -> ?shutdown({OtherNode, {unexpected, Other}})
Expand Down Expand Up @@ -434,6 +468,28 @@ send_challenge(
ok
end.

recv_challenge(
#hs_data{other_node = OtherNode, socket = Socket, f_recv = Recv} = HSData
) ->
case Recv(Socket, 0, infinity) of
{ok, <<
$N, OtherFlags:64, Challenge:32, OtherCreation:32, _OtherNameLen:16, OtherName/binary
>>} ->
case atom_to_binary(OtherNode, utf8) =/= OtherName of
true ->
?shutdown({
HSData#hs_data.other_node, {mismatch, recv_challenge, OtherNode, OtherName}
});
false ->
ok
end,
{Challenge, OtherFlags, OtherCreation};
{ok, Other} ->
?shutdown({HSData#hs_data.other_node, {unexpected, recv_challenge, Other}});
{error, Reason} ->
?shutdown2({HSData#hs_data.other_node, recv_error}, Reason)
end.

-spec recv_challenge_reply(#hs_data{}) -> {non_neg_integer(), binary()}.
recv_challenge_reply(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
case Recv(Socket, 0, infinity) of
Expand All @@ -445,6 +501,23 @@ recv_challenge_reply(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
?shutdown2({HSData#hs_data.other_node, recv_error}, Reason)
end.

-spec send_challenge_reply(
Cookie :: binary(),
OtherChallenge :: non_neg_integer(),
MyChallenge :: non_neg_integer(),
#hs_data{}
) -> ok.
send_challenge_reply(
Cookie, OtherChallenge, MyChallenge, #hs_data{socket = Socket, f_send = Send} = HSData
) ->
Digest = gen_digest(Cookie, OtherChallenge),
case Send(Socket, <<$r, MyChallenge:32, Digest:16/binary>>) of
{error, _} = Error ->
?shutdown2({HSData#hs_data.other_node, Socket}, {send_challenge_reply_failed, Error});
ok ->
ok
end.

-spec check_challenge(
Cookie :: binary(), Challenge :: non_neg_integer(), Digest :: binary(), #hs_data{}
) -> ok.
Expand All @@ -470,6 +543,17 @@ send_challenge_ack(Cookie, Challenge, #hs_data{socket = Socket, f_send = Send} =
ok
end.

-spec recv_challenge_ack(#hs_data{}) -> binary().
recv_challenge_ack(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
case Recv(Socket, 0, infinity) of
{ok, <<$a, Digest/binary>>} ->
Digest;
{ok, Other} ->
?shutdown({HSData#hs_data.other_node, {unexpected, recv_challenge_ack, Other}});
{error, _} = Error ->
?shutdown2({HSData#hs_data.other_node, Socket}, {recv_challenge_ack, Error})
end.

-spec shutdown(atom(), non_neg_integer(), term()) -> no_return().
shutdown(Module, Line, Data) ->
shutdown(Module, Line, Data, shutdown).
Expand Down
31 changes: 26 additions & 5 deletions libs/estdlib/src/net_kernel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,17 @@ handle_call(
case maps:find(OtherNode, Connections) of
error ->
{reply, ok, State0#state{
connections = maps:put(OtherNode, {pending, ConnPid}, Connections)
connections = maps:put(OtherNode, {pending, ConnPid, undefined}, Connections)
}};
{ok, {pending, OtherConnPid}} when OtherNode > ThisNode ->
{ok, {pending, undefined, DHandle}} ->
{reply, ok, State0#state{
connections = maps:put(OtherNode, {pending, ConnPid, DHandle}, Connections)
}};
{ok, {pending, OtherConnPid, DHandle}} when OtherNode > ThisNode ->
{reply, {ok_simultaneous, OtherConnPid}, State0#state{
connections = maps:update(OtherNode, {pending, ConnPid}, Connections)
connections = maps:update(OtherNode, {pending, ConnPid, DHandle}, Connections)
}};
{ok, {pending, _OtherConnPid}} ->
{ok, {pending, _OtherConnPid, _DHandle}} ->
{reply, nok, State0};
{ok, {alive, _ConnPid, _Address}} ->
{reply, alive, State0}
Expand Down Expand Up @@ -296,12 +300,29 @@ handle_info({'EXIT', Pid, _Reason}, #state{connections = Connections} = State) -
fun(_Node, Status) ->
case Status of
{alive, Pid, _Address} -> false;
{pending, Pid} -> false;
{pending, Pid, _DHandle} -> false;
_ -> true
end
end,
Connections
),
{noreply, State#state{connections = NewConnections}};
handle_info(
{connect, OtherNode, DHandle},
#state{connections = Connections, node = MyNode, longnames = Longnames, proto_dist = ProtoDist} =
State
) ->
% ensure DHandle is not garbage collected until setup failed or succeeded
NewConnections =
case maps:find(OtherNode, Connections) of
error ->
ProtoDist:setup(OtherNode, normal, MyNode, Longnames, ?SETUPTIME),
maps:put(OtherNode, {pending, undefined, DHandle}, Connections);
{ok, {pending, ConnPid, _}} ->
maps:put(OtherNode, {pending, ConnPid, DHandle}, Connections);
{ok, {alive, _ConnPid, _Address}} ->
Connections
end,
{noreply, State#state{connections = NewConnections}}.

%% @hidden
Expand Down
3 changes: 3 additions & 0 deletions libs/estdlib/src/socket_dist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ do_setup(Kernel, Node, Type, MyNode, _LongOrShortNames, SetupTime) ->
of
ok ->
{ok, DistController} = socket_dist_controller:start(Sock),
true = socket_dist_controller:supervisor(
DistController, self()
),
HSData = hs_data(
Kernel,
MyNode,
Expand Down
1 change: 1 addition & 0 deletions src/libAtomVM/defaultatoms.def
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,4 @@ X(TIMEOUT_ATOM, "\x7", "timeout")

X(DIST_DATA_ATOM, "\x9", "dist_data")
X(REQUEST_ATOM, "\x7", "request")
X(CONNECT_ATOM, "\x7", "connect")
Loading

0 comments on commit ec336e4

Please sign in to comment.