From 4228d38ee0d26e46983cb5eb96a5489129c62500 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Sun, 26 Nov 2023 11:46:07 +1100 Subject: [PATCH] AP_Networking: support UDP server, TCP client and TCP server and implement mavlink packetisation and flow control return --- libraries/AP_Networking/AP_Networking.h | 29 +- .../AP_Networking/AP_Networking_address.cpp | 7 +- .../AP_Networking/AP_Networking_address.h | 5 +- .../AP_Networking/AP_Networking_port.cpp | 317 ++++++++++++++++-- 4 files changed, 324 insertions(+), 34 deletions(-) diff --git a/libraries/AP_Networking/AP_Networking.h b/libraries/AP_Networking/AP_Networking.h index c931fc26982461..7b33573c46cb00 100644 --- a/libraries/AP_Networking/AP_Networking.h +++ b/libraries/AP_Networking/AP_Networking.h @@ -184,6 +184,9 @@ class AP_Networking enum class NetworkPortType { NONE = 0, UDP_CLIENT = 1, + UDP_SERVER = 2, + TCP_CLIENT = 3, + TCP_SERVER = 4, }; // class for NET_Pn_* parameters @@ -199,6 +202,7 @@ class AP_Networking AP_Networking_IPV4 ip {"0.0.0.0"}; AP_Int32 port; SocketAPM *sock; + SocketAPM *listen_sock; bool is_initialized() override { return true; @@ -207,11 +211,22 @@ class AP_Networking return false; } - void udp_client_init(const uint32_t size_rx, const uint32_t size_tx); + void wait_startup(); + void udp_client_init(void); + void udp_server_init(void); + void tcp_server_init(void); + void tcp_client_init(void); + void udp_client_loop(void); + void udp_server_loop(void); + void tcp_client_loop(void); + void tcp_server_loop(void); + + bool send_receive(void); private: bool init_buffers(const uint32_t size_rx, const uint32_t size_tx); + void thread_create(AP_HAL::MemberProc); uint32_t txspace() override; void _begin(uint32_t b, uint16_t rxS, uint16_t txS) override; @@ -222,10 +237,22 @@ class AP_Networking void _flush() override {} bool _discard_input() override; + enum flow_control get_flow_control(void) override; + + uint32_t bw_in_bytes_per_second() const override { + return 1000000UL; + } + ByteBuffer *readbuffer; ByteBuffer *writebuffer; + char thread_name[10]; uint32_t last_size_tx; uint32_t last_size_rx; + bool packetise; + bool connected; + bool have_received; + bool close_on_recv_error; + HAL_Semaphore sem; }; diff --git a/libraries/AP_Networking/AP_Networking_address.cpp b/libraries/AP_Networking/AP_Networking_address.cpp index 5cadbbe6f7a144..28900e387b2673 100644 --- a/libraries/AP_Networking/AP_Networking_address.cpp +++ b/libraries/AP_Networking/AP_Networking_address.cpp @@ -6,6 +6,7 @@ #if AP_NETWORKING_ENABLED +#include #include "AP_Networking.h" const AP_Param::GroupInfo AP_Networking_IPV4::var_info[] = { @@ -68,9 +69,11 @@ void AP_Networking_IPV4::set_default_uint32(uint32_t v) } } -const char* AP_Networking_IPV4::get_str() const +const char* AP_Networking_IPV4::get_str() { - return AP_Networking::convert_ip_to_str(get_uint32()); + const auto ip = ntohl(get_uint32()); + inet_ntop(AF_INET, &ip, strbuf, sizeof(strbuf)); + return strbuf; } #endif // AP_NETWORKING_ENABLED diff --git a/libraries/AP_Networking/AP_Networking_address.h b/libraries/AP_Networking/AP_Networking_address.h index a4a3b6f8b29a8f..f9852948676ec8 100644 --- a/libraries/AP_Networking/AP_Networking_address.h +++ b/libraries/AP_Networking/AP_Networking_address.h @@ -18,12 +18,15 @@ class AP_Networking_IPV4 void set_uint32(uint32_t addr); // return address as a null-terminated string - const char* get_str() const; + const char* get_str(); // set default address from a uint32 void set_default_uint32(uint32_t addr); static const struct AP_Param::GroupInfo var_info[]; + +private: + char strbuf[16]; }; /* diff --git a/libraries/AP_Networking/AP_Networking_port.cpp b/libraries/AP_Networking/AP_Networking_port.cpp index e70224cf4b4b7b..23c43df13304fb 100644 --- a/libraries/AP_Networking/AP_Networking_port.cpp +++ b/libraries/AP_Networking/AP_Networking_port.cpp @@ -11,6 +11,8 @@ #include #include #include +#include +#include extern const AP_HAL::HAL& hal; @@ -22,11 +24,15 @@ extern const AP_HAL::HAL& hal; #define AP_NETWORKING_PORT_MIN_RXSIZE 2048 #endif +#ifndef AP_NETWORKING_PORT_STACK_SIZE +#define AP_NETWORKING_PORT_STACK_SIZE 1024 +#endif + const AP_Param::GroupInfo AP_Networking::Port::var_info[] = { // @Param: TYPE // @DisplayName: Port type - // @Description: Port type - // @Values: 0:Disabled, 1:UDP client + // @Description: Port type for network serial port. For the two client types a valid destination IP address must be set. For the two server types either 0.0.0.0 or a local address can be used. + // @Values: 0:Disabled, 1:UDP client, 2:TCP client, 3:TCP server // @RebootRequired: True // @User: Advanced AP_GROUPINFO_FLAGS("TYPE", 1, AP_Networking::Port, type, 0, AP_PARAM_FLAG_ENABLE), @@ -66,45 +72,121 @@ void AP_Networking::ports_init(void) case NetworkPortType::NONE: break; case NetworkPortType::UDP_CLIENT: - p.udp_client_init(AP_NETWORKING_PORT_MIN_RXSIZE, AP_NETWORKING_PORT_MIN_TXSIZE); + p.udp_client_init(); + break; + case NetworkPortType::UDP_SERVER: + p.udp_server_init(); + break; + case NetworkPortType::TCP_SERVER: + p.tcp_server_init(); + break; + case NetworkPortType::TCP_CLIENT: + p.tcp_client_init(); break; } - if (p.sock != nullptr) { + if (p.sock != nullptr || p.listen_sock != nullptr) { AP::serialmanager().register_port(&p); } } } /* - initialise a UDP client + wrapper for thread_create for port functions */ -void AP_Networking::Port::udp_client_init(const uint32_t size_rx, const uint32_t size_tx) +void AP_Networking::Port::thread_create(AP_HAL::MemberProc proc) { - WITH_SEMAPHORE(sem); - if (!init_buffers(size_rx, size_tx)) { + const uint8_t idx = state.idx - AP_SERIALMANAGER_NET_PORT_1; + hal.util->snprintf(thread_name, sizeof(thread_name), "NET_P%u", unsigned(idx)); + + if (!init_buffers(AP_NETWORKING_PORT_MIN_RXSIZE, AP_NETWORKING_PORT_MIN_TXSIZE)) { + AP_BoardConfig::allocation_error("Failed to allocate %s buffers", thread_name); return; } - if (sock != nullptr) { + + if (!hal.scheduler->thread_create(proc, thread_name, AP_NETWORKING_PORT_STACK_SIZE, AP_HAL::Scheduler::PRIORITY_UART, 0)) { + AP_BoardConfig::allocation_error("Failed to allocate %s client thread", thread_name); + } +} + +/* + initialise a UDP client + */ +void AP_Networking::Port::udp_client_init(void) +{ + sock = new SocketAPM(true); + if (sock == nullptr) { return; } + sock->set_blocking(false); + + // setup for packet boundaries if this is mavlink + packetise = (state.protocol == AP_SerialManager::SerialProtocol_MAVLink || + state.protocol == AP_SerialManager::SerialProtocol_MAVLink2); + + thread_create(FUNCTOR_BIND_MEMBER(&AP_Networking::Port::udp_client_loop, void)); +} + +/* + initialise a UDP server + */ +void AP_Networking::Port::udp_server_init(void) +{ sock = new SocketAPM(true); if (sock == nullptr) { return; } - if (!hal.scheduler->thread_create(FUNCTOR_BIND_MEMBER(&AP_Networking::Port::udp_client_loop, void), "NET", 2048, AP_HAL::Scheduler::PRIORITY_UART, 0)) { - AP_BoardConfig::allocation_error("Failed to allocate UDP client thread"); + sock->set_blocking(false); + + // setup for packet boundaries if this is mavlink + packetise = (state.protocol == AP_SerialManager::SerialProtocol_MAVLink || + state.protocol == AP_SerialManager::SerialProtocol_MAVLink2); + + thread_create(FUNCTOR_BIND_MEMBER(&AP_Networking::Port::udp_server_loop, void)); +} + +/* + initialise a TCP server + */ +void AP_Networking::Port::tcp_server_init(void) +{ + listen_sock = new SocketAPM(false); + if (listen_sock == nullptr) { + return; } + listen_sock->reuseaddress(); + + thread_create(FUNCTOR_BIND_MEMBER(&AP_Networking::Port::tcp_server_loop, void)); } /* - update a UDP client + initialise a TCP client */ -void AP_Networking::Port::udp_client_loop(void) +void AP_Networking::Port::tcp_client_init(void) +{ + sock = new SocketAPM(false); + if (sock != nullptr) { + sock->set_blocking(true); + thread_create(FUNCTOR_BIND_MEMBER(&AP_Networking::Port::tcp_client_loop, void)); + } +} + +/* + wait for networking stack to be up + */ +void AP_Networking::Port::wait_startup(void) { while (!hal.scheduler->is_system_initialized()) { hal.scheduler->delay(100); } hal.scheduler->delay(1000); +} + +/* + update a UDP client + */ +void AP_Networking::Port::udp_client_loop(void) +{ + wait_startup(); const char *dest = ip.get_str(); if (!sock->connect(dest, port.get())) { @@ -116,31 +198,188 @@ void AP_Networking::Port::udp_client_loop(void) GCS_SEND_TEXT(MAV_SEVERITY_INFO, "UDP[%u]: connected to %s:%u", (unsigned)state.idx, dest, unsigned(port.get())); + connected = true; + + bool active = false; while (true) { - hal.scheduler->delay_microseconds(500); - WITH_SEMAPHORE(sem); + if (!active) { + hal.scheduler->delay_microseconds(100); + } + active = send_receive(); + } +} - // handle outgoing packets - uint32_t available; - const auto *ptr = writebuffer->readptr(available); - if (ptr != nullptr) { - const auto ret = sock->send(ptr, available); - if (ret > 0) { - writebuffer->advance(ret); +/* + update a UDP server + */ +void AP_Networking::Port::udp_server_loop(void) +{ + wait_startup(); + + const char *addr = ip.get_str(); + if (!sock->bind(addr, port.get())) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "UDP[%u]: Failed to bind to %s:%u", (unsigned)state.idx, addr, unsigned(port.get())); + delete sock; + sock = nullptr; + return; + } + sock->reuseaddress(); + + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "UDP[%u]: bound to %s:%u", (unsigned)state.idx, addr, unsigned(port.get())); + + bool active = false; + while (true) { + if (!active) { + hal.scheduler->delay_microseconds(100); + } + active = send_receive(); + } +} + +/* + update a TCP server + */ +void AP_Networking::Port::tcp_server_loop(void) +{ + wait_startup(); + + const char *addr = ip.get_str(); + if (!listen_sock->bind(addr, port.get()) || !listen_sock->listen(1)) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "TCP[%u]: Failed to bind to %s:%u", (unsigned)state.idx, addr, unsigned(port.get())); + delete listen_sock; + listen_sock = nullptr; + return; + } + + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "TCP[%u]: bound to %s:%u", (unsigned)state.idx, addr, unsigned(port.get())); + + close_on_recv_error = true; + + bool active = false; + while (true) { + if (!active) { + hal.scheduler->delay_microseconds(100); + } + if (sock == nullptr) { + sock = listen_sock->accept(100); + if (sock != nullptr) { + sock->set_blocking(false); + char buf[16]; + uint16_t last_port; + const char *last_addr = listen_sock->last_recv_address(buf, sizeof(buf), last_port); + if (last_addr != nullptr) { + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "TCP[%u]: connection from %s:%u", (unsigned)state.idx, last_addr, unsigned(last_port)); + } + connected = true; + sock->reuseaddress(); } } + if (sock != nullptr) { + active = send_receive(); + } + } +} - // handle incoming packets - const auto space = readbuffer->space(); - if (space > 0) { - const uint32_t n = MIN(350U, space); - uint8_t buf[n]; - const auto ret = sock->recv(buf, n, 0); - if (ret > 0) { - readbuffer->write(buf, ret); +/* + update a TCP client + */ +void AP_Networking::Port::tcp_client_loop(void) +{ + wait_startup(); + + close_on_recv_error = true; + + bool active = false; + while (true) { + if (!active) { + hal.scheduler->delay_microseconds(100); + } + if (sock == nullptr) { + sock = new SocketAPM(false); + if (sock == nullptr) { + continue; + } + sock->set_blocking(true); + connected = false; + } + if (!connected) { + const char *dest = ip.get_str(); + connected = sock->connect(dest, port.get()); + if (connected) { + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "TCP[%u]: connected to %s:%u", unsigned(state.idx), dest, unsigned(port.get())); + sock->set_blocking(false); + } else { + delete sock; + sock = nullptr; + // don't try and connect too fast + hal.scheduler->delay(100); + } + } + if (sock != nullptr && connected) { + active = send_receive(); + } + } +} + +/* + run one send/receive loop + */ +bool AP_Networking::Port::send_receive(void) +{ + + bool active = false; + WITH_SEMAPHORE(sem); + + // handle incoming packets + const auto space = readbuffer->space(); + if (space > 0) { + const uint32_t n = MIN(300U, space); + uint8_t buf[n]; + const auto ret = sock->recv(buf, n, 0); + if (close_on_recv_error && ret == 0) { + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "TCP[%u]: closed connection", unsigned(state.idx)); + delete sock; + sock = nullptr; + return false; + } + if (ret > 0) { + readbuffer->write(buf, ret); + active = true; + have_received = true; + } + } + + if (connected) { + // handle outgoing packets + uint32_t available = writebuffer->available(); + available = MIN(300U, available); + if (packetise) { + available = mavlink_packetise(*writebuffer, available); + } + if (available > 0) { + uint8_t buf[available]; + auto n = writebuffer->peekbytes(buf, available); + if (n > 0) { + const auto ret = sock->send(buf, n); + if (ret > 0) { + writebuffer->advance(ret); + active = true; + } + } + } + } else { + if (type == NetworkPortType::UDP_SERVER && have_received) { + // connect the socket to the last receive address if we have one + char buf[16]; + uint16_t last_port; + const char *last_addr = sock->last_recv_address(buf, sizeof(buf), last_port); + if (last_addr != nullptr && port != 0) { + connected = sock->connect(last_addr, last_port); } } } + + return active; } /* @@ -210,4 +449,22 @@ bool AP_Networking::Port::init_buffers(const uint32_t size_rx, const uint32_t si return readbuffer != nullptr && writebuffer != nullptr; } +/* + return flow control state + */ +enum AP_HAL::UARTDriver::flow_control AP_Networking::Port::get_flow_control(void) +{ + const NetworkPortType ptype = (NetworkPortType)type; + switch (ptype) { + case NetworkPortType::TCP_CLIENT: + case NetworkPortType::TCP_SERVER: + return AP_HAL::UARTDriver::FLOW_CONTROL_ENABLE; + case NetworkPortType::UDP_CLIENT: + case NetworkPortType::UDP_SERVER: + case NetworkPortType::NONE: + break; + } + return AP_HAL::UARTDriver::FLOW_CONTROL_DISABLE; +} + #endif // AP_NETWORKING_ENABLED