Skip to content

Commit

Permalink
Merge pull request #1467 from pguyot/w02/erlang-distribution-02
Browse files Browse the repository at this point in the history
Distribution: add support for rpc call from other nodes

These changes are made under both the "Apache 2.0" and the "GNU Lesser General
Public License 2.1 or later" license terms (dual license).

SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
  • Loading branch information
bettio committed Jan 20, 2025
2 parents 6da1e93 + 09fb68b commit 41d90c8
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 38 deletions.
1 change: 1 addition & 0 deletions examples/erlang/esp32/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ pack_runnable(sx127x sx127x eavmlib estdlib)
pack_runnable(reformat_nvs reformat_nvs eavmlib)
pack_runnable(uartecho uartecho eavmlib estdlib)
pack_runnable(ledc_example ledc_example eavmlib estdlib)
pack_runnable(epmd_disterl epmd_disterl eavmlib estdlib)
56 changes: 56 additions & 0 deletions examples/erlang/esp32/epmd_disterl.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
%
% This file is part of AtomVM.
%
% Copyright 2025 Paul Guyot <pguyot@kallisys.net>
%
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
% See the License for the specific language governing permissions and
% limitations under the License.
%
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
%

-module(epmd_disterl).

-export([start/0]).

start() ->
Creds = [
{ssid, "myssid"},
{psk, "mypsk"}
],
case network:wait_for_sta(Creds, 30000) of
{ok, {Address, _Netmask, _Gateway}} ->
distribution_start(Address);
Error ->
io:format("An error occurred starting network: ~p~n", [Error])
end.

distribution_start(Address) ->
{ok, _EPMDPid} = epmd:start_link([]),
{ok, _KernelPid} = kernel:start(normal, []),
{X, Y, Z, T} = Address,
Node = list_to_atom(lists:flatten(io_lib:format("atomvm@~B.~B.~B.~B", [X, Y, Z, T]))),
{ok, _NetKernelPid} = net_kernel:start(Node, #{name_domain => longnames}),
io:format("Distribution was started\n"),
io:format("Node is ~p\n", [node()]),
net_kernel:set_cookie(<<"AtomVM">>),
io:format("Cookie is ~s\n", [net_kernel:get_cookie()]),
register(disterl, self()),
io:format(
"This AtomVM node is waiting for 'quit' message, and this process is registered as 'disterl'\n"
),
io:format("On an OTP node with long names distribution, run:\n"),
io:format("erlang:set_cookie('~s', 'AtomVM').\n", [Node]),
io:format("{disterl, '~s'} ! quit.\n", [Node]),
receive
quit -> ok
end.
1 change: 1 addition & 0 deletions libs/estdlib/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(ERLANG_MODULES
crypto
dist_util
erl_epmd
erpc
erts_debug
ets
gen_event
Expand Down
59 changes: 59 additions & 0 deletions libs/estdlib/src/erpc.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
%
% This file is part of AtomVM.
%
% Copyright 2025 Paul Guyot <pguyot@kallisys.net>
%
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
% See the License for the specific language governing permissions and
% limitations under the License.
%
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
%

%%-----------------------------------------------------------------------------
%% @doc An implementation of the Erlang/OTP erpc interface.
%%
%% This module implements a strict subset of the Erlang/OTP erpc
%% interface.
%% @end
%%-----------------------------------------------------------------------------
-module(erpc).

% api
-export([
execute_call/4
]).

%%-----------------------------------------------------------------------------
%% @param Reference reference of the request, passed in exit tuple
%% @param Module module to call
%% @param Func function to call
%% @param Args argument of the call
%% @doc Execute a call locally, exiting with the result.
%% This function is called from rpc on other nodes using spawn_request BIF.
%% @end
%%-----------------------------------------------------------------------------
-spec execute_call(Reference :: reference(), Module :: module(), Func :: atom(), Args :: [any()]) ->
no_return().
execute_call(Reference, Module, Func, Args) ->
Reply =
try
Result = apply(Module, Func, Args),
{Reference, return, Result}
catch
throw:Reason ->
{Reference, throw, Reason};
exit:Reason ->
{Reference, exit, Reason};
error:Reason:Stack ->
{Reference, error, Reason, Stack}
end,
exit(Reply).
1 change: 1 addition & 0 deletions src/libAtomVM/defaultatoms.def
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,4 @@ X(INET_ATOM, "\x4", "inet")
X(TIMEOUT_ATOM, "\x7", "timeout")

X(DIST_DATA_ATOM, "\x9", "dist_data")
X(REQUEST_ATOM, "\x7", "request")
142 changes: 107 additions & 35 deletions src/libAtomVM/dist_nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ enum
OPERATION_ALIAS_SEND_TT = 34,
};

enum
{
SPAWN_REPLY_FLAGS_LINK_CREATED = 1,
SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2,
};

struct DistributionPacket
{
struct ListHead head;
Expand Down Expand Up @@ -129,7 +135,7 @@ static void dist_connection_dtor(ErlNifEnv *caller_env, void *obj)

static void dist_enqueue_message(term control_message, term payload, struct DistConnection *connection, GlobalContext *global)
{
size_t control_message_size = 0; // some compilers including esp-idf 5.0.7 is not smart enough
size_t control_message_size = 0; // some compilers including esp-idf 5.0.7 are not smart enough
enum ExternalTermResult serialize_result = externalterm_compute_external_size(control_message, &control_message_size, global);
if (LIKELY(serialize_result == EXTERNAL_TERM_OK)) {
size_t payload_size = 0;
Expand Down Expand Up @@ -195,10 +201,7 @@ static void dist_connection_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pi

struct DistConnection *conn_obj = (struct DistConnection *) obj;

if (UNLIKELY(enif_compare_monitors(&conn_obj->connection_process_monitor, mon) == 0)) {
struct RefcBinary *rsrc_refc = refc_binary_from_data(obj);
refc_binary_decrement_refcount(rsrc_refc, caller_env->global);
} else {
if (enif_compare_monitors(&conn_obj->connection_process_monitor, mon) != 0) {
struct ListHead *remote_monitors = synclist_wrlock(&conn_obj->remote_monitors);
struct ListHead *item;
LIST_FOR_EACH (item, remote_monitors) {
Expand Down Expand Up @@ -280,10 +283,6 @@ static term nif_erlang_setnode_3(Context *ctx, int argc, term argv[])
list_prepend(dist_connections, &conn_obj->head);
synclist_unlock(&ctx->global->dist_connections);

// Increment reference count as the resource should be alive until controller process dies
struct RefcBinary *rsrc_refc = refc_binary_from_data(conn_obj);
refc_binary_increment_refcount(rsrc_refc);

if (UNLIKELY(memory_ensure_free_opt(ctx, TERM_BOXED_RESOURCE_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
Expand Down Expand Up @@ -339,6 +338,38 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[])
return result;
}

term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx)
{
if (term_is_atom(target_proc)) {
target_proc = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
}
int target_process_id = 0;
if (term_is_local_pid(target_proc)) {
target_process_id = term_to_local_process_id(target_proc);
} else {
RAISE_ERROR(BADARG_ATOM);
}
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
monitor->target_proc = target_proc;
monitor->pid_number = term_get_external_pid_process_id(from_pid);
monitor->pid_serial = term_get_external_pid_serial(from_pid);
monitor->ref_len = term_get_external_reference_len(monitor_ref);
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
if (target_process_id) {
synclist_append(&conn_obj->remote_monitors, &monitor->head);
ErlNifPid target_process_pid = target_process_id;
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
}
} else {
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
}
return OK_ATOM;
}

static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
{
UNUSED(argc);
Expand Down Expand Up @@ -390,32 +421,8 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
term from_pid = term_get_tuple_element(control, 1);
term target_proc = term_get_tuple_element(control, 2);
term monitor_ref = term_get_tuple_element(control, 3);
if (term_is_atom(target_proc)) {
target_proc = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
}
int target_process_id = 0;
if (term_is_local_pid(target_proc)) {
target_process_id = term_to_local_process_id(target_proc);
} else {
RAISE_ERROR(BADARG_ATOM);
}
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
monitor->target_proc = target_proc;
monitor->pid_number = term_get_external_pid_process_id(from_pid);
monitor->pid_serial = term_get_external_pid_serial(from_pid);
monitor->ref_len = term_get_external_reference_len(monitor_ref);
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
if (target_process_id) {
synclist_append(&conn_obj->remote_monitors, &monitor->head);
ErlNifPid target_process_pid = target_process_id;
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
}
} else {
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
if (UNLIKELY(term_is_invalid_term(dist_monitor(conn_obj, from_pid, target_proc, monitor_ref, ctx)))) {
return term_invalid_term();
}

break;
Expand Down Expand Up @@ -443,6 +450,54 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
synclist_unlock(&conn_obj->remote_monitors);
break;
}
case OPERATION_SPAWN_REQUEST: {
if (UNLIKELY(arity != 6)) {
RAISE_ERROR(BADARG_ATOM);
}
term roots[4];
roots[0] = argv[0];
roots[1] = argv[1];
roots[2] = control;
roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
control = roots[2];
term arglist = roots[3];
term mfa = term_get_tuple_element(control, 4);
if (UNLIKELY(!term_is_tuple(mfa) || term_get_tuple_arity(mfa) != 3)) {
RAISE_ERROR(BADARG_ATOM);
}
if (UNLIKELY(!term_is_list(arglist))) {
RAISE_ERROR(BADARG_ATOM);
}
term reqid = term_get_tuple_element(control, 1);
term from = term_get_tuple_element(control, 2);
if (UNLIKELY(!term_is_pid(from))) {
RAISE_ERROR(BADARG_ATOM);
}
// term groupleader = term_get_tuple_element(control, 3);
// TODO: handle groupleader which is an externalpid
term options = term_get_tuple_element(control, 5);

term request_tuple = term_alloc_tuple(4, &ctx->heap);
term_put_tuple_element(request_tuple, 0, roots[0]);
term_put_tuple_element(request_tuple, 1, reqid);
term_put_tuple_element(request_tuple, 2, from);
term_put_tuple_element(request_tuple, 3, options);
term request_opt = term_alloc_tuple(2, &ctx->heap);
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
term_put_tuple_element(request_opt, 1, request_tuple);
term spawn_opts = term_list_prepend(request_opt, term_nil(), &ctx->heap);

// reuse roots for args
roots[0] = term_get_tuple_element(mfa, 0);
roots[1] = term_get_tuple_element(mfa, 1);
roots[2] = arglist;
roots[3] = spawn_opts;
nif_erlang_spawn_opt(ctx, 4, roots);
break;
}
default:
printf("Unknown distribution protocol operation id %d\n", (int) term_to_int(operation));
RAISE_ERROR(BADARG_ATOM);
Expand All @@ -468,6 +523,23 @@ void dist_send_message(term external_pid, term payload, Context *ctx)
synclist_unlock(&ctx->global->dist_connections);
}

void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global)
{
int flags = (link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0)
| (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0);
// allocate tuple
BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(5), heap)
term control_message = term_alloc_tuple(5, &heap);
term_put_tuple_element(control_message, 0, term_from_int(OPERATION_SPAWN_REPLY));
term_put_tuple_element(control_message, 1, req_id);
term_put_tuple_element(control_message, 2, to_pid);
term_put_tuple_element(control_message, 3, term_from_int(flags));
term_put_tuple_element(control_message, 4, result);

dist_enqueue_message(control_message, term_invalid_term(), connection, global);
END_WITH_STACK_HEAP(heap, global)
}

const struct Nif setnode_3_nif = {
.base.type = NIFFunctionType,
.nif_ptr = nif_erlang_setnode_3
Expand Down
27 changes: 27 additions & 0 deletions src/libAtomVM/dist_nifs.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,35 @@ extern const struct Nif dist_ctrl_get_data_notification_nif;
extern const struct Nif dist_ctrl_get_data_nif;
extern const struct Nif dist_ctrl_put_data_nif;

struct DistConnection;

void dist_send_message(term external_pid, term payload, Context *ctx);

/**
* @doc Setup a monitor on a local process for a distributed process.
* @end
* @param conn_obj object of the connection
* @param from_pid remote pid setting up the monitor
* @param target_proc atom (for registered process) or pid of the local
* process to monitor
* @param monitor_ref reference used for monitor
* @param ctx context for memory allocation
*/
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx);

/**
* @doc Send a spawn reply signal to a node
* @end
* @param conn_obj object of the connection
* @param req_id reference identifying the request
* @param to_pid (remote) process id identifying the caller
* @param link if a link was created
* @param monitor if a monitor was created
* @param result pid of the spawned process or atom for an error
* @param ctx context for memory allocation
*/
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global);

#ifdef __cplusplus
}
#endif
Expand Down
Loading

0 comments on commit 41d90c8

Please sign in to comment.