Skip to content

Commit

Permalink
Distribution: add support for autoconnect
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Guyot <pguyot@kallisys.net>
  • Loading branch information
pguyot committed Jan 11, 2025
1 parent 20a5f22 commit ac9c093
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 41 deletions.
94 changes: 89 additions & 5 deletions libs/estdlib/src/dist_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,27 @@ handshake_other_started(#hs_data{socket = Socket, f_recv = Recv} = HSData0) ->
end.

-spec handshake_we_started(#hs_data{}) -> no_return().
handshake_we_started(#hs_data{}) -> ok.
handshake_we_started(#hs_data{} = HSData0) ->
HSData1 = HSData0#hs_data{
other_started = false,
this_flags = ?MANDATORY_DFLAGS
},
send_name(HSData1),
case recv_status(HSData1) of
<<"ok">> -> ok;
<<"ok_simultaneous">> -> ok;
<<"nok">> -> ?shutdown({HSData1#hs_data.other_node, simultaneous});
<<"alive">> -> send_status(<<"true">>, HSData1);
Other -> ?shutdown({HSData1#hs_data.other_node, {unexpected, Other}})
end,
Cookie = net_kernel:get_cookie(HSData1#hs_data.other_node),
{OtherChallenge, OtherFlags, Creation} = recv_challenge(HSData1),
check_flags(OtherFlags, HSData1),
<<MyChallenge:32>> = crypto:strong_rand_bytes(4),
send_challenge_reply(Cookie, OtherChallenge, MyChallenge, HSData1),
OtherDigest = recv_challenge_ack(HSData1),
check_challenge(Cookie, MyChallenge, OtherDigest, HSData1),
connection(HSData1, Creation).

% We are connected
-spec connection(#hs_data{}, non_neg_integer()) -> no_return().
Expand Down Expand Up @@ -359,6 +379,20 @@ check_flags(Flags0, HSData) ->
?shutdown(Reason)
end.

% send name
send_name(
#hs_data{socket = Socket, f_send = Send, this_node = ThisNode, this_flags = ThisFlags} = HSData
) ->
Creation = atomvm:get_creation(),
NodeName = atom_to_binary(ThisNode, latin1),
NameLen = byte_size(NodeName),
case Send(Socket, <<$N, ThisFlags:64, Creation:32, NameLen:16, NodeName/binary>>) of
{error, _} = Error ->
?shutdown2({HSData#hs_data.other_node, Socket}, {send_name_failed, Error});
ok ->
ok
end.

% Ensure name is somewhat valid
-spec check_name(binary()) -> ok.
check_name(Name) ->
Expand All @@ -378,13 +412,13 @@ send_status(Status, #hs_data{socket = Socket, f_send = Send} = HSData) ->
ok
end.

-spec recv_status_reply(#hs_data{}) -> binary().
recv_status_reply(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
-spec recv_status(#hs_data{}) -> binary().
recv_status(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
case Recv(Socket, 0, infinity) of
{ok, <<$s, Result/binary>>} ->
Result;
{ok, Other} ->
?shutdown({HSData#hs_data.other_node, {unexpected, recv_status_reply, Other}});
?shutdown({HSData#hs_data.other_node, {unexpected, recv_status, Other}});
{error, Reason} ->
?shutdown2({HSData#hs_data.other_node, recv_error}, Reason)
end.
Expand All @@ -403,7 +437,7 @@ mark_pending(#hs_data{kernel_pid = Kernel, this_node = ThisNode, other_node = Ot
alive ->
send_status(<<"alive">>, HSData),
reset_timer(HSData#hs_data.timer),
case recv_status_reply(HSData) of
case recv_status(HSData) of
<<"true">> -> ok;
<<"false">> -> ?shutdown(OtherNode);
Other -> ?shutdown({OtherNode, {unexpected, Other}})
Expand Down Expand Up @@ -434,6 +468,28 @@ send_challenge(
ok
end.

recv_challenge(
#hs_data{other_node = OtherNode, socket = Socket, f_recv = Recv} = HSData
) ->
case Recv(Socket, 0, infinity) of
{ok, <<
$N, OtherFlags:64, Challenge:32, OtherCreation:32, _OtherNameLen:16, OtherName/binary
>>} ->
case atom_to_binary(OtherNode, utf8) =/= OtherName of
true ->
?shutdown({
HSData#hs_data.other_node, {mismatch, recv_challenge, OtherNode, OtherName}
});
false ->
ok
end,
{Challenge, OtherFlags, OtherCreation};
{ok, Other} ->
?shutdown({HSData#hs_data.other_node, {unexpected, recv_challenge, Other}});
{error, Reason} ->
?shutdown2({HSData#hs_data.other_node, recv_error}, Reason)
end.

-spec recv_challenge_reply(#hs_data{}) -> {non_neg_integer(), binary()}.
recv_challenge_reply(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
case Recv(Socket, 0, infinity) of
Expand All @@ -445,6 +501,23 @@ recv_challenge_reply(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
?shutdown2({HSData#hs_data.other_node, recv_error}, Reason)
end.

-spec send_challenge_reply(
Cookie :: binary(),
OtherChallenge :: non_neg_integer(),
MyChallenge :: non_neg_integer(),
#hs_data{}
) -> ok.
send_challenge_reply(
Cookie, OtherChallenge, MyChallenge, #hs_data{socket = Socket, f_send = Send} = HSData
) ->
Digest = gen_digest(Cookie, OtherChallenge),
case Send(Socket, <<$r, MyChallenge:32, Digest:16/binary>>) of
{error, _} = Error ->
?shutdown2({HSData#hs_data.other_node, Socket}, {send_challenge_reply_failed, Error});
ok ->
ok
end.

-spec check_challenge(
Cookie :: binary(), Challenge :: non_neg_integer(), Digest :: binary(), #hs_data{}
) -> ok.
Expand All @@ -470,6 +543,17 @@ send_challenge_ack(Cookie, Challenge, #hs_data{socket = Socket, f_send = Send} =
ok
end.

-spec recv_challenge_ack(#hs_data{}) -> binary().
recv_challenge_ack(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
case Recv(Socket, 0, infinity) of
{ok, <<$a, Digest/binary>>} ->
Digest;
{ok, Other} ->
?shutdown({HSData#hs_data.other_node, {unexpected, recv_challenge_ack, Other}});
{error, _} = Error ->
?shutdown2({HSData#hs_data.other_node, Socket}, {recv_challenge_ack, Error})
end.

-spec shutdown(atom(), non_neg_integer(), term()) -> no_return().
shutdown(Module, Line, Data) ->
shutdown(Module, Line, Data, shutdown).
Expand Down
26 changes: 21 additions & 5 deletions libs/estdlib/src/net_kernel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,17 @@ handle_call(
case maps:find(OtherNode, Connections) of
error ->
{reply, ok, State0#state{
connections = maps:put(OtherNode, {pending, ConnPid}, Connections)
connections = maps:put(OtherNode, {pending, ConnPid, undefined}, Connections)
}};
{ok, {pending, OtherConnPid}} when OtherNode > ThisNode ->
{ok, {pending, undefined, DHandle}} ->
{reply, ok, State0#state{
connections = maps:put(OtherNode, {pending, ConnPid, DHandle}, Connections)
}};
{ok, {pending, OtherConnPid, DHandle}} when OtherNode > ThisNode ->
{reply, {ok_simultaneous, OtherConnPid}, State0#state{
connections = maps:update(OtherNode, {pending, ConnPid}, Connections)
connections = maps:update(OtherNode, {pending, ConnPid, DHandle}, Connections)
}};
{ok, {pending, _OtherConnPid}} ->
{ok, {pending, _OtherConnPid, _DHandle}} ->
{reply, nok, State0};
{ok, {alive, _ConnPid, _Address}} ->
{reply, alive, State0}
Expand Down Expand Up @@ -294,12 +298,24 @@ handle_info({'EXIT', Pid, _Reason}, #state{connections = Connections} = State) -
fun(_Node, Status) ->
case Status of
{alive, Pid, _Address} -> false;
{pending, Pid} -> false;
{pending, Pid, _DHandle} -> false;
_ -> true
end
end,
Connections
),
{noreply, State#state{connections = NewConnections}};
handle_info({connect, OtherNode, DHandle}, #state{connections = Connections, node = MyNode, longnames = Longnames, proto_dist = ProtoDist} = State) ->
% ensure DHandle is not garbage collected until setup failed or succeeded
NewConnections = case maps:find(OtherNode, Connections) of
error ->
ProtoDist:setup(OtherNode, normal, MyNode, Longnames, ?SETUPTIME),
maps:put(OtherNode, {pending, undefined, DHandle}, Connections);
{ok, {pending, ConnPid, _}} ->
maps:put(OtherNode, {pending, ConnPid, DHandle}, Connections);
{ok, {alive, _ConnPid, _Address}} ->
Connections
end,
{noreply, State#state{connections = NewConnections}}.

%% @hidden
Expand Down
7 changes: 1 addition & 6 deletions src/libAtomVM/defaultatoms.def
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,4 @@ X(TIMEOUT_ATOM, "\x7", "timeout")
X(DIST_DATA_ATOM, "\x9", "dist_data")

X(REQUEST_ATOM, "\x7", "request")
X(REPLY_TAG_ATOM, "\x9", "reply_tag")
X(SPAWN_REPLY_ATOM, "\xB", "spawn_reply")
X(REPLY_ATOM, "\x5", "reply")
X(YES_ATOM, "\x3", "yes")
X(NO_ATOM, "\x2", "no")
X(ERROR_ONLY_ATOM, "\xA", "error_only")
X(CONNECT_ATOM, "\x7", "connect")
Loading

0 comments on commit ac9c093

Please sign in to comment.