Skip to content

Commit

Permalink
Add support for asynchronous socket:accept/2 API
Browse files Browse the repository at this point in the history
- Also move reference count logic related to monitor to resources.c
- Also remove unnecessary stop handler for BSD sockets as demonitor is
  handled elsewhere, and stop handler can be called from a platform
  select loop, creating possible race conditions.

Signed-off-by: Paul Guyot <pguyot@kallisys.net>
  • Loading branch information
pguyot committed Jan 19, 2025
1 parent 095e61a commit a6add4b
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 98 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `atomvm:subprocess/4` to perform pipe/fork/execve on POSIX platforms
- Added `externalterm_to_term_with_roots` to efficiently preserve roots when allocating memory for external terms.
- Added `erl_epmd` client implementation to epmd using `socket` module
- Added support for socket asynchronous API for `recv` and `recvfrom`.
- Added support for socket asynchronous API for `recv`, `recvfrom` and `accept`.

### Changed

Expand Down
53 changes: 42 additions & 11 deletions libs/estdlib/src/socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -229,46 +229,77 @@ accept(Socket) ->
%% be set to listen for connections.
%%
%% Note that this function will block until a connection is made
%% from a client. Typically, users will spawn a call to `accept'
%% in a separate process.
%% from a client, unless `nowait' or a reference is passed as `Timeout'.
%% Typically, users will spawn a call to `accept' in a separate process.
%%
%% Example:
%%
%% `{ok, ConnectedSocket} = socket:accept(ListeningSocket)'
%% @end
%%-----------------------------------------------------------------------------
-spec accept(Socket :: socket(), Timeout :: timeout()) ->
{ok, Connection :: socket()} | {error, Reason :: term()}.
-spec accept(Socket :: socket(), Timeout :: timeout() | nowait | reference()) ->
{ok, Connection :: socket()}
| {select, {select_info, accept, reference()}}
| {error, Reason :: term()}.
accept(Socket, 0) ->
accept0_noselect(Socket);
accept(Socket, nowait) ->
accept0_nowait(Socket, erlang:make_ref());
accept(Socket, Ref) when is_reference(Ref) ->
accept0_nowait(Socket, Ref);
accept(Socket, Timeout) ->
accept0(Socket, Timeout).

accept0_noselect(Socket) ->
case ?MODULE:nif_accept(Socket) of
{error, _} = E ->
E;
{ok, _Socket} = Reply ->
Reply
end.

accept0(Socket, Timeout) ->
Ref = erlang:make_ref(),
?TRACE("select read for accept. self=~p ref=~p~n", [self(), Ref]),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_accept(Socket) of
{error, closed} = E ->
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
R ->
R
{ok, _Socket} = Reply ->
Reply
end;
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: we need to handle:
% (a) SELECT_STOP being scheduled
% (b) flush of messages as we can have both in the
% queue
{error, closed};
Other ->
{error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}}
{error, closed}
after Timeout ->
{error, timeout}
end;
{error, _Reason} = Error ->
Error
end.

accept0_nowait(Socket, Ref) ->
case ?MODULE:nif_accept(Socket) of
{error, eagain} ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
{select, {select_info, accept, Ref}};
{error, _} = SelectError ->
SelectError
end;
{error, _} = RecvError ->
RecvError;
{ok, _Socket} = Reply ->
Reply
end.

%%-----------------------------------------------------------------------------
%% @equiv socket:recv(Socket, 0)
%% @end
Expand Down
1 change: 1 addition & 0 deletions src/libAtomVM/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ void context_destroy(Context *ctx)
void *resource = term_to_term_ptr(monitor->monitor_obj);
struct RefcBinary *refc = refc_binary_from_data(resource);
refc->resource_type->down(erl_nif_env_from_context(ctx), resource, &ctx->process_id, &monitor->ref_ticks);
refc_binary_decrement_refcount(refc, ctx->global);
free(monitor);
}
}
Expand Down
68 changes: 20 additions & 48 deletions src/libAtomVM/otp_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,26 +280,6 @@ static void socket_dtor(ErlNifEnv *caller_env, void *obj)
#endif
}

#if OTP_SOCKET_BSD
static void socket_stop(ErlNifEnv *caller_env, void *obj, ErlNifEvent event, int is_direct_call)
{
UNUSED(caller_env);
UNUSED(event);
UNUSED(is_direct_call);

struct SocketResource *rsrc_obj = (struct SocketResource *) obj;

if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) {
enif_demonitor_process(caller_env, rsrc_obj, &rsrc_obj->selecting_process_monitor);
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj);
refc_binary_decrement_refcount(rsrc_refc, caller_env->global);
}

TRACE("socket_stop called on fd=%i\n", rsrc_obj->fd);
}
#endif

static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNifMonitor *mon)
{
UNUSED(caller_env);
Expand All @@ -314,24 +294,20 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif
TRACE("socket_down called on process_id=%i\n", (int) *pid);
#endif

struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj);
SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock);

if (rsrc_obj->selecting_process_id == INVALID_PROCESS_ID) {
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return;
}
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;

#if OTP_SOCKET_BSD
// Monitor fired, so make sure we don't try to demonitor in select_stop
// as it could crash trying to reacquire lock on process table
// enif_select can decrement ref count but it's at least 2 in this case (1 for monitor and 1 for select)
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
// enif_select can decrement ref count but it's at least 2 here (1 for monitor and 1 for select)
enif_select(caller_env, rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil());
#elif OTP_SOCKET_LWIP
// Monitor can be called when we're selecting, accepting or connecting.
LWIP_BEGIN();
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
if (rsrc_obj->socket_state & SocketStateTCP) {
if (rsrc_obj->socket_state & SocketStateTCPListening) {
(void) tcp_close(rsrc_obj->tcp_pcb);
Expand All @@ -347,21 +323,13 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif
}
LWIP_END();
#endif

SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);

// We're no longer monitoring so we can decrement ref count
refc_binary_decrement_refcount(rsrc_refc, caller_env->global);
}

static const ErlNifResourceTypeInit SocketResourceTypeInit = {
.members = 3,
.dtor = socket_dtor,
#if OTP_SOCKET_BSD
.stop = socket_stop,
#else
.stop = NULL,
#endif
.down = socket_down,
};

Expand Down Expand Up @@ -734,8 +702,11 @@ static term nif_socket_close(Context *ctx, int argc, term argv[])

// So we handle closing a socket while another process is selecting
if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) {
// Save process id as socket_stop may be called by enif_select.
int32_t selecting_process_id = rsrc_obj->selecting_process_id;
// Another process is selecting, therefore ref_count >= 3
// 1. this caller's context heap (parameter to close)
// 2. select
// 3. monitor

// Stop selecting.
int stop_res = enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil());
if (UNLIKELY(stop_res < 0)) {
Expand All @@ -749,13 +720,19 @@ static term nif_socket_close(Context *ctx, int argc, term argv[])
// When using asynchronous API, the selecting process can be the
// calling process. In this case we don't send any notification.
//
if (selecting_process_id != ctx->process_id) {
if (rsrc_obj->selecting_process_id != ctx->process_id) {
// send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid
if (UNLIKELY(send_closed_notification(ctx, argv[0], selecting_process_id, rsrc_obj) < 0)) {
if (UNLIKELY(send_closed_notification(ctx, argv[0], rsrc_obj->selecting_process_id, rsrc_obj) < 0)) {
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
}

// Stop monitor
enif_demonitor_process(erl_nif_env_from_context(ctx), rsrc_obj, &rsrc_obj->selecting_process_monitor);
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;

// Now, ref_count >= 1 only.
}

// Eventually close the socket
Expand Down Expand Up @@ -1000,16 +977,13 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[])
RAISE_ERROR(BADARG_ATOM);
}

struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj);
SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock);

ErlNifEnv *env = erl_nif_env_from_context(ctx);
if (rsrc_obj->selecting_process_id != ctx->process_id && rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) {
// demonitor can fail if process is gone.
enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor);
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
// decrement ref count as we are demonitoring
refc_binary_decrement_refcount(rsrc_refc, ctx->global);
}
// Monitor first as select is less likely to fail and it's less expensive to demonitor
// if select fails than to stop select if monitor fails
Expand All @@ -1018,8 +992,6 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[])
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(NOPROC_ATOM);
}
// increment ref count so the resource doesn't go away until monitor is fired
refc_binary_increment_refcount(rsrc_refc);
rsrc_obj->selecting_process_id = ctx->process_id;
}

Expand All @@ -1042,7 +1014,6 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[])
enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor);
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
refc_binary_decrement_refcount(rsrc_refc, ctx->global);
RAISE_ERROR(BADARG_ATOM);
}
}
Expand Down Expand Up @@ -1087,9 +1058,9 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[])
break;
default:
enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor);
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
LWIP_END();
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
refc_binary_decrement_refcount(rsrc_refc, ctx->global);
RAISE_ERROR(BADARG_ATOM);
}
LWIP_END();
Expand All @@ -1115,11 +1086,10 @@ static term nif_socket_select_stop(Context *ctx, int argc, term argv[])
if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) {
enif_demonitor_process(erl_nif_env_from_context(ctx), rsrc_obj, &rsrc_obj->selecting_process_monitor);
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj);
refc_binary_decrement_refcount(rsrc_refc, ctx->global);
}
#if OTP_SOCKET_BSD
if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()) < 0)) {
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(BADARG_ATOM);
}
#elif OTP_SOCKET_LWIP
Expand Down Expand Up @@ -1759,8 +1729,10 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[])
int fd = accept(rsrc_obj->fd, (struct sockaddr *) &clientaddr, &clientlen);
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) {
AVM_LOGE(TAG, "Unable to accept on socket %i.", rsrc_obj->fd);
int err = errno;
if (err != EAGAIN) {
AVM_LOGI(TAG, "Unable to accept on socket %i. errno=%i", rsrc_obj->fd, (int) err);
}
term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global);
return make_error_tuple(reason, ctx);
} else {
Expand Down
4 changes: 0 additions & 4 deletions src/libAtomVM/refc_binary.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ void refc_binary_destroy(struct RefcBinary *refc, struct GlobalContext *global)
UNUSED(global);

if (refc->resource_type) {
if (refc->resource_type->down) {
// There may be monitors associated with this resource.
destroy_resource_monitors(refc, global);
}
if (refc->resource_type->dtor) {
ErlNifEnv env;
erl_nif_env_partial_init_from_globalcontext(&env, global);
Expand Down
22 changes: 2 additions & 20 deletions src/libAtomVM/resources.c
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ int enif_monitor_process(ErlNifEnv *env, void *obj, const ErlNifPid *target_pid,

struct ResourceMonitor *monitor = context_resource_monitor(target, obj);
list_append(&resource->resource_type->monitors, &monitor->resource_list_head);
refc_binary_increment_refcount(resource);
globalcontext_get_process_unlock(env->global, target);

if (mon) {
Expand Down Expand Up @@ -405,6 +406,7 @@ int enif_demonitor_process(ErlNifEnv *env, void *obj, const ErlNifMonitor *mon)
list_remove(&monitor->resource_list_head);
list_remove(&monitor->base.monitor_list_head);
free(monitor);
refc_binary_decrement_refcount(resource, global);
synclist_unlock(&global->processes_table);
return 0;
}
Expand All @@ -415,26 +417,6 @@ int enif_demonitor_process(ErlNifEnv *env, void *obj, const ErlNifMonitor *mon)
return -1;
}

void destroy_resource_monitors(struct RefcBinary *resource, GlobalContext *global)
{
struct ListHead *processes_table_list = synclist_wrlock(&global->processes_table);
UNUSED(processes_table_list);
term monitor_obj = ((term) resource->data) | TERM_BOXED_VALUE_TAG;

struct ListHead *item;
struct ListHead *tmp;
MUTABLE_LIST_FOR_EACH (item, tmp, &resource->resource_type->monitors) {
struct ResourceMonitor *monitor = GET_LIST_ENTRY(item, struct ResourceMonitor, resource_list_head);
if (monitor->base.monitor_obj == monitor_obj) {
list_remove(&monitor->resource_list_head);
list_remove(&monitor->base.monitor_list_head);
free(monitor);
}
}

synclist_unlock(&global->processes_table);
}

int enif_compare_monitors(const ErlNifMonitor *monitor1, const ErlNifMonitor *monitor2)
{
uint64_t ref_ticks1 = *monitor1;
Expand Down
8 changes: 0 additions & 8 deletions src/libAtomVM/resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,6 @@ bool select_event_notify(ErlNifEvent event, bool is_read, bool is_write, GlobalC
*/
void select_event_count_and_destroy_closed(struct ListHead *select_events, size_t *read, size_t *write, size_t *either, GlobalContext *global);

/**
* @brief Destroy monitors associated with a resource.
*
* @param resource resource to destroy monitors for
* @param global the global context
*/
void destroy_resource_monitors(struct RefcBinary *resource, GlobalContext *global);

#define SELECT_EVENT_NOTIFICATION_SIZE (TUPLE_SIZE(4) + REF_SIZE + TERM_BOXED_RESOURCE_SIZE)

/**
Expand Down
Loading

0 comments on commit a6add4b

Please sign in to comment.