Skip to content

Commit

Permalink
Add support for external pid as group leader
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Guyot <pguyot@kallisys.net>
  • Loading branch information
pguyot committed Jan 21, 2025
1 parent 47bbff1 commit f735720
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 47 deletions.
11 changes: 11 additions & 0 deletions src/libAtomVM/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ bool context_process_signal_trap_answer(Context *ctx, struct TermSignal *signal)
return true;
}

bool context_process_signal_set_group_leader(Context *ctx, struct TermSignal *signal)
{
size_t leader_term_size = memory_estimate_usage(signal->signal_term);
ctx->group_leader = UNDEFINED_ATOM;
if (UNLIKELY(memory_ensure_free_opt(ctx, leader_term_size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
return false;
}
ctx->group_leader = memory_copy_term_tree(&ctx->heap, signal->signal_term);
return true;
}

void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool info)
{
context_update_flags(ctx, ~Trap, NoFlags);
Expand Down
9 changes: 9 additions & 0 deletions src/libAtomVM/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,15 @@ bool context_process_signal_trap_answer(Context *ctx, struct TermSignal *signal)
*/
void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool info);

/**
* @brief Process set group leader signal
*
* @param ctx the context being executed
* @param signal the message with the group leader term
* @return \c true if successful, \c false in case of memory error
*/
bool context_process_signal_set_group_leader(Context *ctx, struct TermSignal *signal);

/**
* @brief Get process information.
*
Expand Down
15 changes: 9 additions & 6 deletions src/libAtomVM/dist_nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,10 +496,10 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
}
term roots[4];
roots[0] = argv[0];
roots[1] = argv[1];
roots[1] = argv[1]; // dist handle, ensure it's not garbage collected until we return
roots[2] = control;
roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(5)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
control = roots[2];
Expand All @@ -516,15 +516,18 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
if (UNLIKELY(!term_is_pid(from))) {
RAISE_ERROR(BADARG_ATOM);
}
// term groupleader = term_get_tuple_element(control, 3);
// TODO: handle groupleader which is an externalpid
term groupleader = term_get_tuple_element(control, 3);
if (UNLIKELY(!term_is_pid(groupleader))) {
RAISE_ERROR(BADARG_ATOM);
}
term options = term_get_tuple_element(control, 5);

term request_tuple = term_alloc_tuple(4, &ctx->heap);
term request_tuple = term_alloc_tuple(5, &ctx->heap);
term_put_tuple_element(request_tuple, 0, roots[0]);
term_put_tuple_element(request_tuple, 1, reqid);
term_put_tuple_element(request_tuple, 2, from);
term_put_tuple_element(request_tuple, 3, options);
term_put_tuple_element(request_tuple, 3, groupleader);
term_put_tuple_element(request_tuple, 4, options);
term request_opt = term_alloc_tuple(2, &ctx->heap);
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
term_put_tuple_element(request_opt, 1, request_tuple);
Expand Down
3 changes: 2 additions & 1 deletion src/libAtomVM/mailbox.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ void mailbox_message_dispose(MailboxMessage *m, Heap *heap)
break;
}
case KillSignal:
case TrapAnswerSignal: {
case TrapAnswerSignal:
case SetGroupLeaderSignal: {
struct TermSignal *term_signal = CONTAINER_OF(m, struct TermSignal, base);
term mso_list = term_signal->storage[STORAGE_MSO_LIST_INDEX];
HeapFragment *fragment = mailbox_message_to_heap_fragment(term_signal, term_signal->heap_end);
Expand Down
1 change: 1 addition & 0 deletions src/libAtomVM/mailbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ enum MessageType
TrapExceptionSignal,
FlushMonitorSignal,
FlushInfoMonitorSignal,
SetGroupLeaderSignal,
};

struct MailboxMessage
Expand Down
5 changes: 5 additions & 0 deletions src/libAtomVM/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ static enum MemoryGCResult memory_gc(Context *ctx, size_t new_size, size_t num_r
TRACE("- Running copy GC on exit reason\n");
ctx->exit_reason = memory_shallow_copy_term(old_root_fragment, ctx->exit_reason, &ctx->heap.heap_ptr, true);

TRACE("- Running copy GC on group leader\n");
ctx->group_leader = memory_shallow_copy_term(old_root_fragment, ctx->group_leader, &ctx->heap.heap_ptr, true);

TRACE("- Running copy GC on provided roots\n");
for (size_t i = 0; i < num_roots; i++) {
roots[i] = memory_shallow_copy_term(old_root_fragment, roots[i], &ctx->heap.heap_ptr, 1);
Expand Down Expand Up @@ -373,6 +376,8 @@ static enum MemoryGCResult memory_shrink(Context *ctx, size_t new_size, size_t n
}
// ...exit_reason
memory_scan_and_rewrite(1, &ctx->exit_reason, old_heap_root, old_end, delta, true);
// ...group_leader
memory_scan_and_rewrite(1, &ctx->group_leader, old_heap_root, old_end, delta, true);
// ...and MSO list.
term *mso_ptr = &ctx->heap.root->mso_list;
while (!term_is_nil(*mso_ptr)) {
Expand Down
61 changes: 36 additions & 25 deletions src/libAtomVM/nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1211,14 +1211,21 @@ static NativeHandlerResult process_console_mailbox(Context *ctx)

// Common handling of spawn/1, spawn/3, spawn_opt/2, spawn_opt/4
// opts_term is [] for spawn/1,3
static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
static term do_spawn(Context *ctx, Context *new_ctx, size_t arity, size_t n_freeze, term opts_term)
{
term min_heap_size_term = interop_proplist_get_value(opts_term, MIN_HEAP_SIZE_ATOM);
term max_heap_size_term = interop_proplist_get_value(opts_term, MAX_HEAP_SIZE_ATOM);
term link_term = interop_proplist_get_value(opts_term, LINK_ATOM);
term monitor_term = interop_proplist_get_value(opts_term, MONITOR_ATOM);
term heap_growth_strategy = interop_proplist_get_value_default(opts_term, ATOMVM_HEAP_GROWTH_ATOM, BOUNDED_FREE_ATOM);
term request_term = interop_proplist_get_value(opts_term, REQUEST_ATOM);
term group_leader;

if (UNLIKELY(request_term != term_nil())) {
group_leader = term_get_tuple_element(request_term, 3);
} else {
group_leader = ctx->group_leader;
}

if (min_heap_size_term != term_nil()) {
if (UNLIKELY(!term_is_integer(min_heap_size_term))) {
Expand All @@ -1245,6 +1252,21 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
}
}

int size = 0;
for (uint32_t i = 0; i < n_freeze; i++) {
size += memory_estimate_usage(new_ctx->x[i + arity - n_freeze]);
}
size += memory_estimate_usage(group_leader);
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
//TODO: new process should be terminated, however a new pid is returned anyway
fprintf(stderr, "Unable to allocate sufficient memory to spawn process.\n");
AVM_ABORT();
}
new_ctx->group_leader = memory_copy_term_tree(&new_ctx->heap, group_leader);
for (uint32_t i = 0; i < arity; i++) {
new_ctx->x[i] = memory_copy_term_tree(&new_ctx->heap, new_ctx->x[i]);
}

switch (heap_growth_strategy) {
case BOUNDED_FREE_ATOM:
new_ctx->heap_growth_strategy = BoundedFreeHeapGrowth;
Expand Down Expand Up @@ -1308,7 +1330,7 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
term dhandle = term_get_tuple_element(request_term, 0);
term request_ref = term_get_tuple_element(request_term, 1);
term request_from = term_get_tuple_element(request_term, 2);
term request_opts = term_get_tuple_element(request_term, 3);
term request_opts = term_get_tuple_element(request_term, 4);
monitor_term = interop_proplist_get_value(request_opts, MONITOR_ATOM);
// TODO handle link with external nodes
// link_term = interop_proplist_get_value(request_opts, LINK_ATOM);
Expand Down Expand Up @@ -1344,7 +1366,6 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])
VALIDATE_VALUE(opts_term, term_is_list);

Context *new_ctx = context_new(ctx->global);
new_ctx->group_leader = ctx->group_leader;

const term *boxed_value = term_to_const_term_ptr(fun_term);

Expand All @@ -1365,24 +1386,15 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])

// TODO: new process should fail with badarity if arity != 0

int size = 0;
for (uint32_t i = 0; i < n_freeze; i++) {
size += memory_estimate_usage(boxed_value[i + 3]);
}
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
//TODO: new process should be terminated, however a new pid is returned anyway
fprintf(stderr, "Unable to allocate sufficient memory to spawn process.\n");
AVM_ABORT();
}
for (uint32_t i = 0; i < n_freeze; i++) {
new_ctx->x[i + arity - n_freeze] = memory_copy_term_tree(&new_ctx->heap, boxed_value[i + 3]);
new_ctx->x[i + arity - n_freeze] = boxed_value[i + 3];
}

new_ctx->saved_module = fun_module;
new_ctx->saved_ip = fun_module->labels[label];
new_ctx->cp = module_address(fun_module->module_index, fun_module->end_instruction_ii);

return do_spawn(ctx, new_ctx, opts_term);
return do_spawn(ctx, new_ctx, arity, n_freeze, opts_term);
}

term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
Expand All @@ -1399,7 +1411,6 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
VALIDATE_VALUE(opts_term, term_is_list);

Context *new_ctx = context_new(ctx->global);
new_ctx->group_leader = ctx->group_leader;

AtomString module_string = globalcontext_atomstring_from_term(ctx->global, argv[0]);
AtomString function_string = globalcontext_atomstring_from_term(ctx->global, argv[1]);
Expand Down Expand Up @@ -1439,14 +1450,8 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
new_ctx->min_heap_size = min_heap_size;
}

avm_int_t size = memory_estimate_usage(args_term);
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
// Context was not scheduled yet, we can destroy it.
context_destroy(new_ctx);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
while (term_is_nonempty_list(args_term)) {
new_ctx->x[reg_index] = memory_copy_term_tree(&new_ctx->heap, term_get_list_head(args_term));
new_ctx->x[reg_index] = term_get_list_head(args_term);
reg_index++;

args_term = term_get_list_tail(args_term);
Expand All @@ -1456,7 +1461,7 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
}
}

return do_spawn(ctx, new_ctx, opts_term);
return do_spawn(ctx, new_ctx, reg_index, reg_index, opts_term);
}

static term nif_erlang_send_2(Context *ctx, int argc, term argv[])
Expand Down Expand Up @@ -4003,15 +4008,21 @@ static term nif_erlang_group_leader(Context *ctx, int argc, term argv[])
term leader = argv[0];
term pid = argv[1];
VALIDATE_VALUE(pid, term_is_local_pid);
VALIDATE_VALUE(leader, term_is_local_pid);
VALIDATE_VALUE(leader, term_is_pid);

int local_process_id = term_to_local_process_id(pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
if (IS_NULL_PTR(target)) {
RAISE_ERROR(BADARG_ATOM);
}

target->group_leader = leader;
if (term_is_local_pid(leader)) {
// We cannot put leader term on the heap
mailbox_send_term_signal(target, SetGroupLeaderSignal, leader);
} else {
target->group_leader = leader;
}

globalcontext_get_process_unlock(ctx->global, target);
return TRUE_ATOM;
}
Expand Down
9 changes: 9 additions & 0 deletions src/libAtomVM/opcodesswitch.h
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,15 @@ static void destroy_extended_registers(Context *ctx, unsigned int live)
context_process_flush_monitor_signal(ctx, flush_signal->ref_ticks, info); \
break; \
} \
case SetGroupLeaderSignal: { \
struct TermSignal *group_leader \
= CONTAINER_OF(signal_message, struct TermSignal, base); \
if (UNLIKELY(!context_process_signal_set_group_leader(ctx, group_leader))) { \
SET_ERROR(OUT_OF_MEMORY_ATOM); \
next_label = &&handle_error; \
} \
break; \
} \
case NormalMessage: { \
UNREACHABLE(); \
} \
Expand Down
107 changes: 92 additions & 15 deletions tests/libs/estdlib/test_net_kernel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ test() ->
ok = test_rpc_loop_from_beam(Platform),
ok = test_autoconnect_fail(Platform),
ok = test_autoconnect_to_beam(Platform),
ok = test_groupleader(Platform),
ok;
false ->
io:format("~s: skipped\n", [?MODULE]),
Expand Down Expand Up @@ -156,21 +157,24 @@ test_autoconnect_to_beam(Platform) ->
{ok, _NetKernelPid} = net_kernel_start(Platform, atomvm),
Node = node(),
erlang:set_cookie(Node, 'AtomVM'),
spawn_link(fun() ->
execute_command(
Platform,
"erl -sname otp -setcookie AtomVM -eval \""
"register(beam, self()),"
"F = fun(G) ->"
" receive"
" {Caller, ping} -> Caller ! {self(), pong}, G(G);"
" {Caller, quit} -> Caller ! {self(), quit}"
" after 5000 -> timeout"
" end "
"end, "
"F(F).\" -s init stop -noshell"
)
end),
{Pid, MonitorRef} = spawn_opt(
fun() ->
[] = execute_command(
Platform,
"erl -sname otp -setcookie AtomVM -eval \""
"register(beam, self()),"
"F = fun(G) ->"
" receive"
" {Caller, ping} -> Caller ! {self(), pong}, G(G);"
" {Caller, quit} -> Caller ! {self(), quit}"
" after 5000 -> exit(timeout)"
" end "
"end, "
"F(F).\" -s init stop -noshell"
)
end,
[link, monitor]
),
% Wait sufficiently for beam to be up, without connecting to it since
% that's part of the test
timer:sleep(1000),
Expand Down Expand Up @@ -200,6 +204,79 @@ test_autoconnect_to_beam(Platform) ->
{OTPPid, quit} -> ok
after 5000 -> timeout
end,
normal =
receive
{'DOWN', MonitorRef, process, Pid, Reason} -> Reason
after 5000 -> timeout
end,
net_kernel:stop(),
ok.

test_groupleader(Platform) ->
{ok, _NetKernelPid} = net_kernel_start(Platform, atomvm),
Node = node(),
erlang:set_cookie(Node, 'AtomVM'),
register(atomvm, self()),
Parent = self(),
{Pid, MonitorRef} = spawn_opt(
fun() ->
Result = execute_command(
Platform,
"erl -sname otp -setcookie AtomVM -eval \""
"{atomvm, '" ++ atom_to_list(Node) ++
"'} ! {beam, self()}, "
"F = fun(G) ->"
" receive"
" {Caller, apply, M, F, A} -> Result = apply(M, F, A), Caller ! {self(), Result}, G(G);"
" {Caller, quit} -> Caller ! {self(), quit}"
" after 5000 -> exit(timeout)"
" end "
"end, "
"F(F).\" -s init stop -noshell"
),
Parent ! {io_result, Result}
end,
[link, monitor]
),
BeamMainPid =
receive
{beam, BeamMainPid0} ->
BeamMainPid0;
{io_result, Result0} ->
io:format("~s\n", [Result0]),
exit(timeout)
after 5000 -> exit(timeout)
end,
BeamMainPid ! {self(), apply, rpc, call, [Node, io, format, ["hello group leader"]]},
ok =
receive
{BeamMainPid, Result} ->
Result;
{io_result, Result1} ->
io:format("~s\n", [Result1]),
exit(timeout)
after 5000 -> exit(timeout)
end,
BeamMainPid ! {self(), quit},
ok =
receive
{BeamMainPid, quit} ->
ok;
{io_result, Result2} ->
io:format("~s\n", [Result2]),
exit(timeout)
after 5000 -> timeout
end,
"hello group leader" =
receive
{io_result, IOResult} -> IOResult
after 5000 -> timeout
end,
normal =
receive
{'DOWN', MonitorRef, process, Pid, Reason} -> Reason
after 5000 -> timeout
end,
net_kernel:stop(),
ok.

Expand Down

0 comments on commit f735720

Please sign in to comment.