Skip to content

Commit

Permalink
Revert use of ConcurrentQueue and use FastQueue with mutex instead
Browse files Browse the repository at this point in the history
ntcoredev stress shows issues with ConcurrentQueue.
  • Loading branch information
PeterJohnson committed Oct 10, 2024
1 parent 34b9137 commit 0685066
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
18 changes: 12 additions & 6 deletions ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ static constexpr size_t kMaxSize = 2 * 1024 * 1024;

std::span<ClientMessage> ClientMessageConcurrentQueue::ReadQueue(
std::span<ClientMessage> out) {
size_t count = m_queue.try_dequeue_bulk(out.begin(), out.size());
std::span<ClientMessage> rv = out.subspan(0, count);
for (auto&& elem : rv) {
if (auto msg = std::get_if<ClientValueMsg>(&elem.contents)) {
m_size -= sizeof(ClientMessage) + msg->value.size();
std::scoped_lock lock{m_mutex};
size_t count = 0;
for (auto&& msg : out) {
if (!m_queue.try_dequeue(msg)) {
break;
}
if (auto* val = std::get_if<ClientValueMsg>(&msg.contents)) {
m_size -= sizeof(ClientMessage) + val->value.size();
m_sizeErrored = false;
}
++count;
}
return rv;
return out.subspan(0, count);
}

void ClientMessageConcurrentQueue::ClearQueue() {
std::scoped_lock lock{m_mutex};
ClientMessage msg;
while (m_queue.try_dequeue(msg)) {
}
Expand All @@ -33,6 +38,7 @@ void ClientMessageConcurrentQueue::ClearQueue() {

void ClientMessageConcurrentQueue::ClientSetValue(int pubuid,
const Value& value) {
std::scoped_lock lock{m_mutex};
m_size += sizeof(ClientMessage) + value.size();
if (m_size > kMaxSize) {
if (!m_sizeErrored) {
Expand Down
16 changes: 7 additions & 9 deletions ntcore/src/main/native/cpp/net/ClientMessageConcurrentQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
#include <span>
#include <string>

#include <wpi/concurrentqueue.h>
#include <wpi/FastQueue.h>
#include <wpi/mutex.h>

#include "Message.h"
#include "MessageHandler.h"
Expand All @@ -21,7 +22,7 @@ namespace nt::net {
class ClientMessageConcurrentQueue final : public ClientMessageHandler,
public ClientMessageQueue {
public:
static constexpr size_t kBlockSize = 256;
static constexpr size_t kBlockSize = 64;

explicit ClientMessageConcurrentQueue(wpi::Logger& logger)
: m_logger{logger} {}
Expand All @@ -43,14 +44,11 @@ class ClientMessageConcurrentQueue final : public ClientMessageHandler,
void ClientSetValue(int pubuid, const Value& value) final;

private:
struct ConcurrentQueueTraits : public wpi::ConcurrentQueueDefaultTraits {
static constexpr size_t BLOCK_SIZE = kBlockSize;
};
wpi::ConcurrentQueue<ClientMessage, ConcurrentQueueTraits> m_queue{
kBlockSize * 2};
wpi::mutex m_mutex;
wpi::FastQueue<ClientMessage, kBlockSize> m_queue{kBlockSize - 1};
wpi::Logger& m_logger;
std::atomic<size_t> m_size{0};
std::atomic<bool> m_sizeErrored{false};
size_t m_size{0};
bool m_sizeErrored{false};
};

} // namespace nt::net
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,32 @@ namespace nt::net {
inline void ClientMessageConcurrentQueue::ClientPublish(
int pubuid, std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options) {
std::scoped_lock lock{m_mutex};
m_queue.enqueue(ClientMessage{PublishMsg{
pubuid, std::string{name}, std::string{typeStr}, properties, options}});
}

inline void ClientMessageConcurrentQueue::ClientUnpublish(int pubuid) {
std::scoped_lock lock{m_mutex};
m_queue.enqueue(ClientMessage{UnpublishMsg{pubuid}});
}

inline void ClientMessageConcurrentQueue::ClientSetProperties(
std::string_view name, const wpi::json& update) {
std::scoped_lock lock{m_mutex};
m_queue.enqueue(ClientMessage{SetPropertiesMsg{std::string{name}, update}});
}

inline void ClientMessageConcurrentQueue::ClientSubscribe(
int subuid, std::span<const std::string> topicNames,
const PubSubOptionsImpl& options) {
std::scoped_lock lock{m_mutex};
m_queue.enqueue(ClientMessage{
SubscribeMsg{subuid, {topicNames.begin(), topicNames.end()}, options}});
}

inline void ClientMessageConcurrentQueue::ClientUnsubscribe(int subuid) {
std::scoped_lock lock{m_mutex};
m_queue.enqueue(ClientMessage{UnsubscribeMsg{subuid}});
}

Expand Down

0 comments on commit 0685066

Please sign in to comment.