Skip to content

Commit

Permalink
Implement #214 JUICE_CONCURRENCY_MODE_USER
Browse files Browse the repository at this point in the history
  • Loading branch information
N7Alpha committed Nov 6, 2023
1 parent 5f753ca commit e8dac23
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ set(LIBJUICE_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/conn_poll.c
${CMAKE_CURRENT_SOURCE_DIR}/src/conn_thread.c
${CMAKE_CURRENT_SOURCE_DIR}/src/conn_mux.c
${CMAKE_CURRENT_SOURCE_DIR}/src/conn_user.c
${CMAKE_CURRENT_SOURCE_DIR}/src/base64.c
${CMAKE_CURRENT_SOURCE_DIR}/src/hash.c
${CMAKE_CURRENT_SOURCE_DIR}/src/hmac.c
Expand Down Expand Up @@ -74,6 +75,7 @@ set(TESTS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/test/turn.c
${CMAKE_CURRENT_SOURCE_DIR}/test/thread.c
${CMAKE_CURRENT_SOURCE_DIR}/test/mux.c
${CMAKE_CURRENT_SOURCE_DIR}/test/user.c
${CMAKE_CURRENT_SOURCE_DIR}/test/notrickle.c
${CMAKE_CURRENT_SOURCE_DIR}/test/server.c
${CMAKE_CURRENT_SOURCE_DIR}/test/conflict.c
Expand Down
23 changes: 23 additions & 0 deletions include/juice/juice.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ typedef enum juice_concurrency_mode {
JUICE_CONCURRENCY_MODE_POLL = 0, // Connections share a single thread
JUICE_CONCURRENCY_MODE_MUX, // Connections are multiplexed on a single UDP socket
JUICE_CONCURRENCY_MODE_THREAD, // Each connection runs in its own thread

JUICE_CONCURRENCY_MODE_USER, // Agents must be updated via calls to juice_user_poll...
// Note:
// - ICE keepalive requires regular polling RFC 8445 11.
// - The OS's UDP packet buffering capacity is limited you need to make
// sure the inflow of packets will not cause a bottleneck/packet loss
// - Also DNS resolution blocks in this mode
} juice_concurrency_mode_t;

typedef struct juice_config {
Expand Down Expand Up @@ -116,6 +123,22 @@ JUICE_EXPORT int juice_get_selected_addresses(juice_agent_t *agent, char *local,
char *remote, size_t remote_size);
JUICE_EXPORT const char *juice_state_to_string(juice_state_t state);

// Valid for JUICE_CONCURRENCY_MODE_USER only
//
// Non-blocking tries to read a datagram from `agent`'s socket. Forwards Non-ICE packets to you via the `on_recv`.
// You shouldn't use data in `buffer` directly since it might be a STUN packet, contain TURN headers, etc;
// The intended use of passing `buffer` is to give finer control to the user e.g. zero-copy
//
// Parameters:
// - `agent`: A pointer to an `juice_agent_t` structure initialized with `juice_create_agent()`.
// - `buffer`: A pointer to the buffer where the received data will be stored It should accommodate
// the largest datagram you expect to receive (INCLUDING TURN HEADERS) otherwise, data will be truncated
//
// Returns:
// - Positive number: Indicates the number of bytes received (0 means no more datagrams)
// - Negative number: Indicates an error occurred
JUICE_EXPORT int juice_user_poll(juice_agent_t *agent, char *buffer, size_t size);

// ICE server

typedef struct juice_server juice_server_t;
Expand Down
3 changes: 2 additions & 1 deletion src/agent.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ int agent_gather_candidates(juice_agent_t *agent) {
conn_unlock(agent);
conn_interrupt(agent);

if (has_nonnumeric_server_hostnames(&agent->config)) {
if (has_nonnumeric_server_hostnames(&agent->config) &&
conn_mode_is_concurrent(agent->config.concurrency_mode)) {
// Resolve server hostnames in a separate thread as it may block
JLOG_DEBUG("Starting resolver thread for servers");
int ret = thread_init(&agent->resolver_thread, resolver_thread_entry, agent);
Expand Down
10 changes: 7 additions & 3 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "conn_mux.h"
#include "conn_poll.h"
#include "conn_thread.h"
#include "conn_user.h"
#include "log.h"

#include <assert.h>
Expand All @@ -33,10 +34,10 @@ typedef struct conn_mode_entry {
int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size);

mutex_t mutex;
conn_registry_t *registry;
conn_registry_t *registry; // left NULL for concurrency modes that don't use a global registry
} conn_mode_entry_t;

#define MODE_ENTRIES_SIZE 3
#define MODE_ENTRIES_SIZE 4

static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = {
{conn_poll_registry_init, conn_poll_registry_cleanup, conn_poll_init, conn_poll_cleanup,
Expand All @@ -46,7 +47,10 @@ static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = {
conn_mux_lock, conn_mux_unlock, conn_mux_interrupt, conn_mux_send, conn_mux_get_addrs,
MUTEX_INITIALIZER, NULL},
{NULL, NULL, conn_thread_init, conn_thread_cleanup, conn_thread_lock, conn_thread_unlock,
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL}};
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL},
{NULL, NULL, conn_user_init, conn_user_cleanup, conn_user_lock, conn_user_unlock,
conn_user_interrupt, conn_user_send, conn_user_get_addrs, MUTEX_INITIALIZER, NULL},
};

static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) {
juice_concurrency_mode_t mode = agent->config.concurrency_mode;
Expand Down
4 changes: 4 additions & 0 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ int conn_send(juice_agent_t *agent, const addr_record_t *dst, const char *data,
int ds);
int conn_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size);

static bool conn_mode_is_concurrent(juice_concurrency_mode_t mode) {
return mode != JUICE_CONCURRENCY_MODE_USER;
}

#endif
188 changes: 188 additions & 0 deletions src/conn_user.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/**
* Copyright (c) 2023 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#include "conn_user.h"
#include "agent.h"
#include "log.h"
#include "socket.h"
#include "udp.h"

#include <stdint.h>

typedef enum conn_state { CONN_STATE_NEW = 0, CONN_STATE_READY, CONN_STATE_FINISHED } conn_state_t;

typedef struct conn_impl {
conn_state_t state;
socket_t sock;
mutex_t mutex;
mutex_t send_mutex;
int send_ds;
timestamp_t next_timestamp;
} conn_impl_t;

static inline int conn_user_recv(socket_t sock, char *buffer, size_t size, addr_record_t *src);

JUICE_EXPORT int juice_user_poll(juice_agent_t *agent, char *buffer, size_t size) {
if (!agent || !buffer)
return JUICE_ERR_INVALID;

conn_impl_t *conn_impl = agent->conn_impl;

if (!conn_impl)
return JUICE_ERR_INVALID;

mutex_lock(&conn_impl->mutex);

if (conn_impl->state == CONN_STATE_FINISHED) {
mutex_unlock(&conn_impl->mutex);
return JUICE_ERR_FAILED;
}

if (agent->config.concurrency_mode != JUICE_CONCURRENCY_MODE_USER) {
JLOG_ERROR("agent->config.concurrency_mode=%d Only JUICE_CONCURRENCY_MODE_USER (%d) is supported",
agent->config.concurrency_mode, JUICE_CONCURRENCY_MODE_USER);
mutex_unlock(&conn_impl->mutex);
return JUICE_ERR_INVALID;
}

addr_record_t src;
int ret = conn_user_recv(conn_impl->sock, buffer, size, &src);

if (ret < 0) {
agent_conn_fail(agent);
conn_impl->state = CONN_STATE_FINISHED;
mutex_unlock(&conn_impl->mutex);
return JUICE_ERR_FAILED;
} else if (ret > 0) {
if (agent_conn_recv(agent, buffer, (size_t)ret, &src) != 0) {
JLOG_WARN("Agent receive failed");
conn_impl->state = CONN_STATE_FINISHED;
mutex_unlock(&conn_impl->mutex);
return JUICE_ERR_FAILED;
}
}

if ( ret > 0 // We just received a datagram
|| conn_impl->next_timestamp <= current_timestamp()
|| agent->state != JUICE_STATE_COMPLETED) {
if (agent_conn_update(agent, &conn_impl->next_timestamp) != 0) {
JLOG_WARN("Agent update failed");
conn_impl->state = CONN_STATE_FINISHED;
mutex_unlock(&conn_impl->mutex);
return JUICE_ERR_FAILED;
}
}

mutex_unlock(&conn_impl->mutex);
return ret;
}

static inline int conn_user_recv(socket_t sock, char *buffer, size_t size, addr_record_t *src) {
JLOG_VERBOSE("Receiving datagram");
int len;
while ((len = udp_recvfrom(sock, buffer, size, src)) == 0) {
// Empty datagram, ignore
}

if (len < 0) {
if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) {
JLOG_VERBOSE("No more datagrams to receive");
return 0;
}
JLOG_ERROR("recvfrom failed, errno=%d", sockerrno);
return -1;
}

addr_unmap_inet6_v4mapped((struct sockaddr *)&src->addr, &src->len);
return len; // len > 0
}

int conn_user_init(juice_agent_t *agent, conn_registry_t *registry, udp_socket_config_t *config) {
(void)registry;

conn_impl_t *conn_impl = calloc(1, sizeof(conn_impl_t));
if (!conn_impl) {
JLOG_FATAL("Memory allocation failed for connection impl");
return -1;
}

conn_impl->sock = udp_create_socket(config);
if (conn_impl->sock == INVALID_SOCKET) {
JLOG_ERROR("UDP socket creation failed");
free(conn_impl);
return -1;
}

mutex_init(&conn_impl->mutex, 0);
mutex_init(&conn_impl->send_mutex, 0);

agent->conn_impl = conn_impl;

return JUICE_ERR_SUCCESS;
}

void conn_user_cleanup(juice_agent_t *agent) {
conn_impl_t *conn_impl = agent->conn_impl;

closesocket(conn_impl->sock);
mutex_destroy(&conn_impl->mutex);
mutex_destroy(&conn_impl->send_mutex);
free(agent->conn_impl);
agent->conn_impl = NULL;
}

void conn_user_lock(juice_agent_t *agent) {
conn_impl_t *conn_impl = agent->conn_impl;
mutex_lock(&conn_impl->mutex);
}

void conn_user_unlock(juice_agent_t *agent) {
conn_impl_t *conn_impl = agent->conn_impl;
mutex_unlock(&conn_impl->mutex);
}

int conn_user_interrupt(juice_agent_t *agent) {
// juice_user_poll does not block when polling, so there's nothing to interrupt
return JUICE_ERR_SUCCESS;
}

int conn_user_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
int ds) {
conn_impl_t *conn_impl = agent->conn_impl;

mutex_lock(&conn_impl->send_mutex);

if (conn_impl->send_ds >= 0 && conn_impl->send_ds != ds) {
JLOG_VERBOSE("Setting Differentiated Services field to 0x%X", ds);
if (udp_set_diffserv(conn_impl->sock, ds) == 0)
conn_impl->send_ds = ds;
else
conn_impl->send_ds = -1; // disable for next time
}

JLOG_VERBOSE("Sending datagram, size=%d", size);

int ret = udp_sendto(conn_impl->sock, data, size, dst);
if (ret < 0) {
if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK)
JLOG_INFO("Send failed, buffer is full");
else if (sockerrno == SEMSGSIZE)
JLOG_WARN("Send failed, datagram is too large");
else
JLOG_WARN("Send failed, errno=%d", sockerrno);
}

mutex_unlock(&conn_impl->send_mutex);
return ret;
}

int conn_user_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size) {
conn_impl_t *conn_impl = agent->conn_impl;

return udp_get_addrs(conn_impl->sock, records, size);
}
28 changes: 28 additions & 0 deletions src/conn_user.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright (c) 2023 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#ifndef JUICE_CONN_USER_H
#define JUICE_CONN_USER_H

#include "addr.h"
#include "conn.h"
#include "timestamp.h"

#include <stdbool.h>
#include <stdint.h>

int conn_user_init(juice_agent_t *agent, conn_registry_t *registry, udp_socket_config_t *config);
void conn_user_cleanup(juice_agent_t *agent);
void conn_user_lock(juice_agent_t *agent);
void conn_user_unlock(juice_agent_t *agent);
int conn_user_interrupt(juice_agent_t *agent);
int conn_user_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
int ds);
int conn_user_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size);

#endif
7 changes: 7 additions & 0 deletions test/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ int test_stun(void);
int test_connectivity(void);
int test_thread(void);
int test_mux(void);
int test_user(void);
int test_notrickle(void);
int test_gathering(void);
int test_turn(void);
Expand Down Expand Up @@ -79,6 +80,12 @@ int main(int argc, char **argv) {
return -1;
}

printf("\nRunning user-mode connectivity test...\n");
if (test_user()) {
fprintf(stderr, "User-mode connectivity test failed\n");
return -1;
}

printf("\nRunning non-trickled connectivity test...\n");
if (test_notrickle()) {
fprintf(stderr, "Non-trickled connectivity test failed\n");
Expand Down
Loading

0 comments on commit e8dac23

Please sign in to comment.