Skip to content

Commit

Permalink
Distribution: add support for rpc from other nodes
Browse files Browse the repository at this point in the history
- Also add epmd_disterl esp32 example

Signed-off-by: Paul Guyot <pguyot@kallisys.net>
  • Loading branch information
pguyot committed Jan 20, 2025
1 parent 6da1e93 commit 09fb68b
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 38 deletions.
1 change: 1 addition & 0 deletions examples/erlang/esp32/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ pack_runnable(sx127x sx127x eavmlib estdlib)
pack_runnable(reformat_nvs reformat_nvs eavmlib)
pack_runnable(uartecho uartecho eavmlib estdlib)
pack_runnable(ledc_example ledc_example eavmlib estdlib)
pack_runnable(epmd_disterl epmd_disterl eavmlib estdlib)
56 changes: 56 additions & 0 deletions examples/erlang/esp32/epmd_disterl.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
%
% This file is part of AtomVM.
%
% Copyright 2025 Paul Guyot <pguyot@kallisys.net>
%
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
% See the License for the specific language governing permissions and
% limitations under the License.
%
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
%

-module(epmd_disterl).

-export([start/0]).

start() ->
Creds = [
{ssid, "myssid"},
{psk, "mypsk"}
],
case network:wait_for_sta(Creds, 30000) of
{ok, {Address, _Netmask, _Gateway}} ->
distribution_start(Address);
Error ->
io:format("An error occurred starting network: ~p~n", [Error])
end.

distribution_start(Address) ->
{ok, _EPMDPid} = epmd:start_link([]),
{ok, _KernelPid} = kernel:start(normal, []),
{X, Y, Z, T} = Address,
Node = list_to_atom(lists:flatten(io_lib:format("atomvm@~B.~B.~B.~B", [X, Y, Z, T]))),
{ok, _NetKernelPid} = net_kernel:start(Node, #{name_domain => longnames}),
io:format("Distribution was started\n"),
io:format("Node is ~p\n", [node()]),
net_kernel:set_cookie(<<"AtomVM">>),
io:format("Cookie is ~s\n", [net_kernel:get_cookie()]),
register(disterl, self()),
io:format(
"This AtomVM node is waiting for 'quit' message, and this process is registered as 'disterl'\n"
),
io:format("On an OTP node with long names distribution, run:\n"),
io:format("erlang:set_cookie('~s', 'AtomVM').\n", [Node]),
io:format("{disterl, '~s'} ! quit.\n", [Node]),
receive
quit -> ok
end.
1 change: 1 addition & 0 deletions libs/estdlib/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(ERLANG_MODULES
crypto
dist_util
erl_epmd
erpc
erts_debug
ets
gen_event
Expand Down
59 changes: 59 additions & 0 deletions libs/estdlib/src/erpc.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
%
% This file is part of AtomVM.
%
% Copyright 2025 Paul Guyot <pguyot@kallisys.net>
%
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
% See the License for the specific language governing permissions and
% limitations under the License.
%
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
%

%%-----------------------------------------------------------------------------
%% @doc An implementation of the Erlang/OTP erpc interface.
%%
%% This module implements a strict subset of the Erlang/OTP erpc
%% interface.
%% @end
%%-----------------------------------------------------------------------------
-module(erpc).

% api
-export([
execute_call/4
]).

%%-----------------------------------------------------------------------------
%% @param Reference reference of the request, passed in exit tuple
%% @param Module module to call
%% @param Func function to call
%% @param Args argument of the call
%% @doc Execute a call locally, exiting with the result.
%% This function is called from rpc on other nodes using spawn_request BIF.
%% @end
%%-----------------------------------------------------------------------------
-spec execute_call(Reference :: reference(), Module :: module(), Func :: atom(), Args :: [any()]) ->
no_return().
execute_call(Reference, Module, Func, Args) ->
Reply =
try
Result = apply(Module, Func, Args),
{Reference, return, Result}
catch
throw:Reason ->
{Reference, throw, Reason};
exit:Reason ->
{Reference, exit, Reason};
error:Reason:Stack ->
{Reference, error, Reason, Stack}
end,
exit(Reply).
1 change: 1 addition & 0 deletions src/libAtomVM/defaultatoms.def
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,4 @@ X(INET_ATOM, "\x4", "inet")
X(TIMEOUT_ATOM, "\x7", "timeout")

X(DIST_DATA_ATOM, "\x9", "dist_data")
X(REQUEST_ATOM, "\x7", "request")
142 changes: 107 additions & 35 deletions src/libAtomVM/dist_nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ enum
OPERATION_ALIAS_SEND_TT = 34,
};

enum
{
SPAWN_REPLY_FLAGS_LINK_CREATED = 1,
SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2,
};

struct DistributionPacket
{
struct ListHead head;
Expand Down Expand Up @@ -129,7 +135,7 @@ static void dist_connection_dtor(ErlNifEnv *caller_env, void *obj)

static void dist_enqueue_message(term control_message, term payload, struct DistConnection *connection, GlobalContext *global)
{
size_t control_message_size = 0; // some compilers including esp-idf 5.0.7 is not smart enough
size_t control_message_size = 0; // some compilers including esp-idf 5.0.7 are not smart enough
enum ExternalTermResult serialize_result = externalterm_compute_external_size(control_message, &control_message_size, global);
if (LIKELY(serialize_result == EXTERNAL_TERM_OK)) {
size_t payload_size = 0;
Expand Down Expand Up @@ -195,10 +201,7 @@ static void dist_connection_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pi

struct DistConnection *conn_obj = (struct DistConnection *) obj;

if (UNLIKELY(enif_compare_monitors(&conn_obj->connection_process_monitor, mon) == 0)) {
struct RefcBinary *rsrc_refc = refc_binary_from_data(obj);
refc_binary_decrement_refcount(rsrc_refc, caller_env->global);
} else {
if (enif_compare_monitors(&conn_obj->connection_process_monitor, mon) != 0) {
struct ListHead *remote_monitors = synclist_wrlock(&conn_obj->remote_monitors);
struct ListHead *item;
LIST_FOR_EACH (item, remote_monitors) {
Expand Down Expand Up @@ -280,10 +283,6 @@ static term nif_erlang_setnode_3(Context *ctx, int argc, term argv[])
list_prepend(dist_connections, &conn_obj->head);
synclist_unlock(&ctx->global->dist_connections);

// Increment reference count as the resource should be alive until controller process dies
struct RefcBinary *rsrc_refc = refc_binary_from_data(conn_obj);
refc_binary_increment_refcount(rsrc_refc);

if (UNLIKELY(memory_ensure_free_opt(ctx, TERM_BOXED_RESOURCE_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
Expand Down Expand Up @@ -339,6 +338,38 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[])
return result;
}

term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx)
{
if (term_is_atom(target_proc)) {
target_proc = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
}
int target_process_id = 0;
if (term_is_local_pid(target_proc)) {
target_process_id = term_to_local_process_id(target_proc);
} else {
RAISE_ERROR(BADARG_ATOM);
}
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
monitor->target_proc = target_proc;
monitor->pid_number = term_get_external_pid_process_id(from_pid);
monitor->pid_serial = term_get_external_pid_serial(from_pid);
monitor->ref_len = term_get_external_reference_len(monitor_ref);
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
if (target_process_id) {
synclist_append(&conn_obj->remote_monitors, &monitor->head);
ErlNifPid target_process_pid = target_process_id;
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
}
} else {
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
}
return OK_ATOM;
}

static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
{
UNUSED(argc);
Expand Down Expand Up @@ -390,32 +421,8 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
term from_pid = term_get_tuple_element(control, 1);
term target_proc = term_get_tuple_element(control, 2);
term monitor_ref = term_get_tuple_element(control, 3);
if (term_is_atom(target_proc)) {
target_proc = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
}
int target_process_id = 0;
if (term_is_local_pid(target_proc)) {
target_process_id = term_to_local_process_id(target_proc);
} else {
RAISE_ERROR(BADARG_ATOM);
}
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
monitor->target_proc = target_proc;
monitor->pid_number = term_get_external_pid_process_id(from_pid);
monitor->pid_serial = term_get_external_pid_serial(from_pid);
monitor->ref_len = term_get_external_reference_len(monitor_ref);
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
if (target_process_id) {
synclist_append(&conn_obj->remote_monitors, &monitor->head);
ErlNifPid target_process_pid = target_process_id;
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
}
} else {
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
if (UNLIKELY(term_is_invalid_term(dist_monitor(conn_obj, from_pid, target_proc, monitor_ref, ctx)))) {
return term_invalid_term();
}

break;
Expand Down Expand Up @@ -443,6 +450,54 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
synclist_unlock(&conn_obj->remote_monitors);
break;
}
case OPERATION_SPAWN_REQUEST: {
if (UNLIKELY(arity != 6)) {
RAISE_ERROR(BADARG_ATOM);
}
term roots[4];
roots[0] = argv[0];
roots[1] = argv[1];
roots[2] = control;
roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
control = roots[2];
term arglist = roots[3];
term mfa = term_get_tuple_element(control, 4);
if (UNLIKELY(!term_is_tuple(mfa) || term_get_tuple_arity(mfa) != 3)) {
RAISE_ERROR(BADARG_ATOM);
}
if (UNLIKELY(!term_is_list(arglist))) {
RAISE_ERROR(BADARG_ATOM);
}
term reqid = term_get_tuple_element(control, 1);
term from = term_get_tuple_element(control, 2);
if (UNLIKELY(!term_is_pid(from))) {
RAISE_ERROR(BADARG_ATOM);
}
// term groupleader = term_get_tuple_element(control, 3);
// TODO: handle groupleader which is an externalpid
term options = term_get_tuple_element(control, 5);

term request_tuple = term_alloc_tuple(4, &ctx->heap);
term_put_tuple_element(request_tuple, 0, roots[0]);
term_put_tuple_element(request_tuple, 1, reqid);
term_put_tuple_element(request_tuple, 2, from);
term_put_tuple_element(request_tuple, 3, options);
term request_opt = term_alloc_tuple(2, &ctx->heap);
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
term_put_tuple_element(request_opt, 1, request_tuple);
term spawn_opts = term_list_prepend(request_opt, term_nil(), &ctx->heap);

// reuse roots for args
roots[0] = term_get_tuple_element(mfa, 0);
roots[1] = term_get_tuple_element(mfa, 1);
roots[2] = arglist;
roots[3] = spawn_opts;
nif_erlang_spawn_opt(ctx, 4, roots);
break;
}
default:
printf("Unknown distribution protocol operation id %d\n", (int) term_to_int(operation));
RAISE_ERROR(BADARG_ATOM);
Expand All @@ -468,6 +523,23 @@ void dist_send_message(term external_pid, term payload, Context *ctx)
synclist_unlock(&ctx->global->dist_connections);
}

void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global)
{
int flags = (link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0)
| (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0);
// allocate tuple
BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(5), heap)
term control_message = term_alloc_tuple(5, &heap);
term_put_tuple_element(control_message, 0, term_from_int(OPERATION_SPAWN_REPLY));
term_put_tuple_element(control_message, 1, req_id);
term_put_tuple_element(control_message, 2, to_pid);
term_put_tuple_element(control_message, 3, term_from_int(flags));
term_put_tuple_element(control_message, 4, result);

dist_enqueue_message(control_message, term_invalid_term(), connection, global);
END_WITH_STACK_HEAP(heap, global)
}

const struct Nif setnode_3_nif = {
.base.type = NIFFunctionType,
.nif_ptr = nif_erlang_setnode_3
Expand Down
27 changes: 27 additions & 0 deletions src/libAtomVM/dist_nifs.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,35 @@ extern const struct Nif dist_ctrl_get_data_notification_nif;
extern const struct Nif dist_ctrl_get_data_nif;
extern const struct Nif dist_ctrl_put_data_nif;

struct DistConnection;

void dist_send_message(term external_pid, term payload, Context *ctx);

/**
* @doc Setup a monitor on a local process for a distributed process.
* @end
* @param conn_obj object of the connection
* @param from_pid remote pid setting up the monitor
* @param target_proc atom (for registered process) or pid of the local
* process to monitor
* @param monitor_ref reference used for monitor
* @param ctx context for memory allocation
*/
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx);

/**
* @doc Send a spawn reply signal to a node
* @end
* @param conn_obj object of the connection
* @param req_id reference identifying the request
* @param to_pid (remote) process id identifying the caller
* @param link if a link was created
* @param monitor if a monitor was created
* @param result pid of the spawned process or atom for an error
* @param ctx context for memory allocation
*/
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global);

#ifdef __cplusplus
}
#endif
Expand Down
Loading

0 comments on commit 09fb68b

Please sign in to comment.