diff --git a/src/libAtomVM/context.c b/src/libAtomVM/context.c index 4ab8cb0e7..c5d97d2bb 100644 --- a/src/libAtomVM/context.c +++ b/src/libAtomVM/context.c @@ -217,6 +217,17 @@ bool context_process_signal_trap_answer(Context *ctx, struct TermSignal *signal) return true; } +bool context_process_signal_set_group_leader(Context *ctx, struct TermSignal *signal) +{ + size_t leader_term_size = memory_estimate_usage(signal->signal_term); + ctx->group_leader = UNDEFINED_ATOM; + if (UNLIKELY(memory_ensure_free_opt(ctx, leader_term_size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + return false; + } + ctx->group_leader = memory_copy_term_tree(&ctx->heap, signal->signal_term); + return true; +} + void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool info) { context_update_flags(ctx, ~Trap, NoFlags); diff --git a/src/libAtomVM/context.h b/src/libAtomVM/context.h index 527a37fa5..09d69861c 100644 --- a/src/libAtomVM/context.h +++ b/src/libAtomVM/context.h @@ -393,6 +393,15 @@ bool context_process_signal_trap_answer(Context *ctx, struct TermSignal *signal) */ void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool info); +/** + * @brief Process set group leader signal + * + * @param ctx the context being executed + * @param signal the message with the group leader term + * @return \c true if successful, \c false in case of memory error + */ +bool context_process_signal_set_group_leader(Context *ctx, struct TermSignal *signal); + /** * @brief Get process information. * diff --git a/src/libAtomVM/dist_nifs.c b/src/libAtomVM/dist_nifs.c index b35800be7..18e41e99b 100644 --- a/src/libAtomVM/dist_nifs.c +++ b/src/libAtomVM/dist_nifs.c @@ -496,10 +496,10 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[]) } term roots[4]; roots[0] = argv[0]; - roots[1] = argv[1]; + roots[1] = argv[1]; // dist handle, ensure it's not garbage collected until we return roots[2] = control; roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots); - if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(5)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { RAISE_ERROR(OUT_OF_MEMORY_ATOM); } control = roots[2]; @@ -516,15 +516,18 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_is_pid(from))) { RAISE_ERROR(BADARG_ATOM); } - // term groupleader = term_get_tuple_element(control, 3); - // TODO: handle groupleader which is an externalpid + term groupleader = term_get_tuple_element(control, 3); + if (UNLIKELY(!term_is_pid(groupleader))) { + RAISE_ERROR(BADARG_ATOM); + } term options = term_get_tuple_element(control, 5); - term request_tuple = term_alloc_tuple(4, &ctx->heap); + term request_tuple = term_alloc_tuple(5, &ctx->heap); term_put_tuple_element(request_tuple, 0, roots[0]); term_put_tuple_element(request_tuple, 1, reqid); term_put_tuple_element(request_tuple, 2, from); - term_put_tuple_element(request_tuple, 3, options); + term_put_tuple_element(request_tuple, 3, groupleader); + term_put_tuple_element(request_tuple, 4, options); term request_opt = term_alloc_tuple(2, &ctx->heap); term_put_tuple_element(request_opt, 0, REQUEST_ATOM); term_put_tuple_element(request_opt, 1, request_tuple); diff --git a/src/libAtomVM/mailbox.c b/src/libAtomVM/mailbox.c index bcd75a75b..9acac998d 100644 --- a/src/libAtomVM/mailbox.c +++ b/src/libAtomVM/mailbox.c @@ -96,7 +96,8 @@ void mailbox_message_dispose(MailboxMessage *m, Heap *heap) break; } case KillSignal: - case TrapAnswerSignal: { + case TrapAnswerSignal: + case SetGroupLeaderSignal: { struct TermSignal *term_signal = CONTAINER_OF(m, struct TermSignal, base); term mso_list = term_signal->storage[STORAGE_MSO_LIST_INDEX]; HeapFragment *fragment = mailbox_message_to_heap_fragment(term_signal, term_signal->heap_end); diff --git a/src/libAtomVM/mailbox.h b/src/libAtomVM/mailbox.h index 68fb92ae1..873bdf1b4 100644 --- a/src/libAtomVM/mailbox.h +++ b/src/libAtomVM/mailbox.h @@ -89,6 +89,7 @@ enum MessageType TrapExceptionSignal, FlushMonitorSignal, FlushInfoMonitorSignal, + SetGroupLeaderSignal, }; struct MailboxMessage diff --git a/src/libAtomVM/memory.c b/src/libAtomVM/memory.c index 183c22ac6..e50dc1bbb 100644 --- a/src/libAtomVM/memory.c +++ b/src/libAtomVM/memory.c @@ -300,6 +300,9 @@ static enum MemoryGCResult memory_gc(Context *ctx, size_t new_size, size_t num_r TRACE("- Running copy GC on exit reason\n"); ctx->exit_reason = memory_shallow_copy_term(old_root_fragment, ctx->exit_reason, &ctx->heap.heap_ptr, true); + TRACE("- Running copy GC on group leader\n"); + ctx->group_leader = memory_shallow_copy_term(old_root_fragment, ctx->group_leader, &ctx->heap.heap_ptr, true); + TRACE("- Running copy GC on provided roots\n"); for (size_t i = 0; i < num_roots; i++) { roots[i] = memory_shallow_copy_term(old_root_fragment, roots[i], &ctx->heap.heap_ptr, 1); @@ -373,6 +376,8 @@ static enum MemoryGCResult memory_shrink(Context *ctx, size_t new_size, size_t n } // ...exit_reason memory_scan_and_rewrite(1, &ctx->exit_reason, old_heap_root, old_end, delta, true); + // ...group_leader + memory_scan_and_rewrite(1, &ctx->group_leader, old_heap_root, old_end, delta, true); // ...and MSO list. term *mso_ptr = &ctx->heap.root->mso_list; while (!term_is_nil(*mso_ptr)) { diff --git a/src/libAtomVM/nifs.c b/src/libAtomVM/nifs.c index 1985de22b..c4df95731 100644 --- a/src/libAtomVM/nifs.c +++ b/src/libAtomVM/nifs.c @@ -1211,7 +1211,7 @@ static NativeHandlerResult process_console_mailbox(Context *ctx) // Common handling of spawn/1, spawn/3, spawn_opt/2, spawn_opt/4 // opts_term is [] for spawn/1,3 -static term do_spawn(Context *ctx, Context *new_ctx, term opts_term) +static term do_spawn(Context *ctx, Context *new_ctx, size_t arity, size_t n_freeze, term opts_term) { term min_heap_size_term = interop_proplist_get_value(opts_term, MIN_HEAP_SIZE_ATOM); term max_heap_size_term = interop_proplist_get_value(opts_term, MAX_HEAP_SIZE_ATOM); @@ -1219,6 +1219,13 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term) term monitor_term = interop_proplist_get_value(opts_term, MONITOR_ATOM); term heap_growth_strategy = interop_proplist_get_value_default(opts_term, ATOMVM_HEAP_GROWTH_ATOM, BOUNDED_FREE_ATOM); term request_term = interop_proplist_get_value(opts_term, REQUEST_ATOM); + term group_leader; + + if (UNLIKELY(request_term != term_nil())) { + group_leader = term_get_tuple_element(request_term, 3); + } else { + group_leader = ctx->group_leader; + } if (min_heap_size_term != term_nil()) { if (UNLIKELY(!term_is_integer(min_heap_size_term))) { @@ -1245,6 +1252,21 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term) } } + int size = 0; + for (uint32_t i = 0; i < n_freeze; i++) { + size += memory_estimate_usage(new_ctx->x[i + arity - n_freeze]); + } + size += memory_estimate_usage(group_leader); + if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + //TODO: new process should be terminated, however a new pid is returned anyway + fprintf(stderr, "Unable to allocate sufficient memory to spawn process.\n"); + AVM_ABORT(); + } + new_ctx->group_leader = memory_copy_term_tree(&new_ctx->heap, group_leader); + for (uint32_t i = 0; i < arity; i++) { + new_ctx->x[i] = memory_copy_term_tree(&new_ctx->heap, new_ctx->x[i]); + } + switch (heap_growth_strategy) { case BOUNDED_FREE_ATOM: new_ctx->heap_growth_strategy = BoundedFreeHeapGrowth; @@ -1308,7 +1330,7 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term) term dhandle = term_get_tuple_element(request_term, 0); term request_ref = term_get_tuple_element(request_term, 1); term request_from = term_get_tuple_element(request_term, 2); - term request_opts = term_get_tuple_element(request_term, 3); + term request_opts = term_get_tuple_element(request_term, 4); monitor_term = interop_proplist_get_value(request_opts, MONITOR_ATOM); // TODO handle link with external nodes // link_term = interop_proplist_get_value(request_opts, LINK_ATOM); @@ -1344,7 +1366,6 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[]) VALIDATE_VALUE(opts_term, term_is_list); Context *new_ctx = context_new(ctx->global); - new_ctx->group_leader = ctx->group_leader; const term *boxed_value = term_to_const_term_ptr(fun_term); @@ -1365,24 +1386,15 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[]) // TODO: new process should fail with badarity if arity != 0 - int size = 0; for (uint32_t i = 0; i < n_freeze; i++) { - size += memory_estimate_usage(boxed_value[i + 3]); - } - if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { - //TODO: new process should be terminated, however a new pid is returned anyway - fprintf(stderr, "Unable to allocate sufficient memory to spawn process.\n"); - AVM_ABORT(); - } - for (uint32_t i = 0; i < n_freeze; i++) { - new_ctx->x[i + arity - n_freeze] = memory_copy_term_tree(&new_ctx->heap, boxed_value[i + 3]); + new_ctx->x[i + arity - n_freeze] = boxed_value[i + 3]; } new_ctx->saved_module = fun_module; new_ctx->saved_ip = fun_module->labels[label]; new_ctx->cp = module_address(fun_module->module_index, fun_module->end_instruction_ii); - return do_spawn(ctx, new_ctx, opts_term); + return do_spawn(ctx, new_ctx, arity, n_freeze, opts_term); } term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]) @@ -1399,7 +1411,6 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]) VALIDATE_VALUE(opts_term, term_is_list); Context *new_ctx = context_new(ctx->global); - new_ctx->group_leader = ctx->group_leader; AtomString module_string = globalcontext_atomstring_from_term(ctx->global, argv[0]); AtomString function_string = globalcontext_atomstring_from_term(ctx->global, argv[1]); @@ -1439,14 +1450,8 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]) new_ctx->min_heap_size = min_heap_size; } - avm_int_t size = memory_estimate_usage(args_term); - if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { - // Context was not scheduled yet, we can destroy it. - context_destroy(new_ctx); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } while (term_is_nonempty_list(args_term)) { - new_ctx->x[reg_index] = memory_copy_term_tree(&new_ctx->heap, term_get_list_head(args_term)); + new_ctx->x[reg_index] = term_get_list_head(args_term); reg_index++; args_term = term_get_list_tail(args_term); @@ -1456,7 +1461,7 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]) } } - return do_spawn(ctx, new_ctx, opts_term); + return do_spawn(ctx, new_ctx, reg_index, reg_index, opts_term); } static term nif_erlang_send_2(Context *ctx, int argc, term argv[]) @@ -4003,7 +4008,7 @@ static term nif_erlang_group_leader(Context *ctx, int argc, term argv[]) term leader = argv[0]; term pid = argv[1]; VALIDATE_VALUE(pid, term_is_local_pid); - VALIDATE_VALUE(leader, term_is_local_pid); + VALIDATE_VALUE(leader, term_is_pid); int local_process_id = term_to_local_process_id(pid); Context *target = globalcontext_get_process_lock(ctx->global, local_process_id); @@ -4011,7 +4016,13 @@ static term nif_erlang_group_leader(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } - target->group_leader = leader; + if (term_is_local_pid(leader)) { + // We cannot put leader term on the heap + mailbox_send_term_signal(target, SetGroupLeaderSignal, leader); + } else { + target->group_leader = leader; + } + globalcontext_get_process_unlock(ctx->global, target); return TRUE_ATOM; } diff --git a/src/libAtomVM/opcodesswitch.h b/src/libAtomVM/opcodesswitch.h index 1e771e3c4..cb16e0b92 100644 --- a/src/libAtomVM/opcodesswitch.h +++ b/src/libAtomVM/opcodesswitch.h @@ -1049,6 +1049,15 @@ static void destroy_extended_registers(Context *ctx, unsigned int live) context_process_flush_monitor_signal(ctx, flush_signal->ref_ticks, info); \ break; \ } \ + case SetGroupLeaderSignal: { \ + struct TermSignal *group_leader \ + = CONTAINER_OF(signal_message, struct TermSignal, base); \ + if (UNLIKELY(!context_process_signal_set_group_leader(ctx, group_leader))) { \ + SET_ERROR(OUT_OF_MEMORY_ATOM); \ + next_label = &&handle_error; \ + } \ + break; \ + } \ case NormalMessage: { \ UNREACHABLE(); \ } \ diff --git a/tests/libs/estdlib/test_net_kernel.erl b/tests/libs/estdlib/test_net_kernel.erl index e6a129ac3..fec8f0b9b 100644 --- a/tests/libs/estdlib/test_net_kernel.erl +++ b/tests/libs/estdlib/test_net_kernel.erl @@ -37,6 +37,7 @@ test() -> ok = test_rpc_loop_from_beam(Platform), ok = test_autoconnect_fail(Platform), ok = test_autoconnect_to_beam(Platform), + ok = test_groupleader(Platform), ok; false -> io:format("~s: skipped\n", [?MODULE]), @@ -203,6 +204,38 @@ test_autoconnect_to_beam(Platform) -> net_kernel:stop(), ok. +test_groupleader(Platform) -> + {ok, _NetKernelPid} = net_kernel_start(Platform, atomvm), + Node = node(), + erlang:set_cookie(Node, 'AtomVM'), + register(atomvm, self()), + Parent = self(), + spawn_link(fun() -> + Result = execute_command( + Platform, + "erl -sname otp -setcookie AtomVM -eval \"" + "{atomvm, '" ++ atom_to_list(Node) ++ "'} ! {beam, self()}, " + "F = fun(G) ->" + " receive" + " {Caller, apply, M, F, A} -> Result = apply(M, F, A), Caller ! {self(), Result}, G(G);" + " {Caller, quit} -> Caller ! {self(), quit}" + " after 5000 -> timeout" + " end " + "end, " + "F(F).\" -s init stop -noshell" + ), + Parent ! {io_result, Result} + end), + BeamMainPid = receive {beam, Pid} -> Pid after 5000 -> exit(timeout) end, + BeamMainPid ! {self(), apply, rpc, call, [Node, io, format, ["hello group leader"]]}, + ok = receive {BeamMainPid, Result} -> Result after 5000 -> exit(timeout) end, + BeamMainPid ! {self(), quit}, + ok = receive {BeamMainPid, quit} -> ok after 5000 -> timeout end, + "hello group leader" = receive {io_result, IOResult} -> IOResult after 5000 -> timeout end, + net_kernel:stop(), + ok. + + % On AtomVM, we need to start kernel. setup("BEAM") -> ok;