diff --git a/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.cpp b/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.cpp index 51f57689d65..6b6e7ad5e84 100644 --- a/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.cpp +++ b/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.cpp @@ -12,18 +12,23 @@ static constexpr size_t kMaxSize = 2 * 1024 * 1024; std::span ClientMessageConcurrentQueue::ReadQueue( std::span out) { - size_t count = m_queue.try_dequeue_bulk(out.begin(), out.size()); - std::span rv = out.subspan(0, count); - for (auto&& elem : rv) { - if (auto msg = std::get_if(&elem.contents)) { - m_size -= sizeof(ClientMessage) + msg->value.size(); + std::scoped_lock lock{m_mutex}; + size_t count = 0; + for (auto&& msg : out) { + if (!m_queue.try_dequeue(msg)) { + break; + } + if (auto* val = std::get_if(&msg.contents)) { + m_size -= sizeof(ClientMessage) + val->value.size(); m_sizeErrored = false; } + ++count; } - return rv; + return out.subspan(0, count); } void ClientMessageConcurrentQueue::ClearQueue() { + std::scoped_lock lock{m_mutex}; ClientMessage msg; while (m_queue.try_dequeue(msg)) { } @@ -33,6 +38,7 @@ void ClientMessageConcurrentQueue::ClearQueue() { void ClientMessageConcurrentQueue::ClientSetValue(int pubuid, const Value& value) { + std::scoped_lock lock{m_mutex}; m_size += sizeof(ClientMessage) + value.size(); if (m_size > kMaxSize) { if (!m_sizeErrored) { diff --git a/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.h b/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.h index 4668d2ab9a4..e3125ee9501 100644 --- a/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.h +++ b/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.h @@ -7,7 +7,8 @@ #include #include -#include +#include +#include #include "Message.h" #include "MessageHandler.h" @@ -21,7 +22,7 @@ namespace nt::net { class ClientMessageConcurrentQueue final : public ClientMessageHandler, public ClientMessageQueue { public: - static constexpr size_t kBlockSize = 256; + static constexpr size_t kBlockSize = 64; explicit ClientMessageConcurrentQueue(wpi::Logger& logger) : m_logger{logger} {} @@ -43,14 +44,11 @@ class ClientMessageConcurrentQueue final : public ClientMessageHandler, void ClientSetValue(int pubuid, const Value& value) final; private: - struct ConcurrentQueueTraits : public wpi::ConcurrentQueueDefaultTraits { - static constexpr size_t BLOCK_SIZE = kBlockSize; - }; - wpi::ConcurrentQueue m_queue{ - kBlockSize * 2}; + wpi::mutex m_mutex; + wpi::FastQueue m_queue{kBlockSize - 1}; wpi::Logger& m_logger; - std::atomic m_size{0}; - std::atomic m_sizeErrored{false}; + size_t m_size{0}; + bool m_sizeErrored{false}; }; } // namespace nt::net diff --git a/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.inc b/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.inc index 7a673924bdb..257b22d9735 100644 --- a/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.inc +++ b/ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.inc @@ -14,27 +14,32 @@ namespace nt::net { inline void ClientMessageConcurrentQueue::ClientPublish( int pubuid, std::string_view name, std::string_view typeStr, const wpi::json& properties, const PubSubOptionsImpl& options) { + std::scoped_lock lock{m_mutex}; m_queue.enqueue(ClientMessage{PublishMsg{ pubuid, std::string{name}, std::string{typeStr}, properties, options}}); } inline void ClientMessageConcurrentQueue::ClientUnpublish(int pubuid) { + std::scoped_lock lock{m_mutex}; m_queue.enqueue(ClientMessage{UnpublishMsg{pubuid}}); } inline void ClientMessageConcurrentQueue::ClientSetProperties( std::string_view name, const wpi::json& update) { + std::scoped_lock lock{m_mutex}; m_queue.enqueue(ClientMessage{SetPropertiesMsg{std::string{name}, update}}); } inline void ClientMessageConcurrentQueue::ClientSubscribe( int subuid, std::span topicNames, const PubSubOptionsImpl& options) { + std::scoped_lock lock{m_mutex}; m_queue.enqueue(ClientMessage{ SubscribeMsg{subuid, {topicNames.begin(), topicNames.end()}, options}}); } inline void ClientMessageConcurrentQueue::ClientUnsubscribe(int subuid) { + std::scoped_lock lock{m_mutex}; m_queue.enqueue(ClientMessage{UnsubscribeMsg{subuid}}); }