Skip to content

Commit

Permalink
[ntcore] Unify NetworkInterface and MessageHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterJohnson committed Oct 11, 2024
1 parent 8ca99c7 commit ecc8083
Show file tree
Hide file tree
Showing 21 changed files with 492 additions and 478 deletions.
76 changes: 40 additions & 36 deletions ntcore/src/main/native/cpp/LocalStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "Log.h"
#include "Types_internal.h"
#include "Value_internal.h"
#include "net/MessageHandler.h"
#include "networktables/NetworkTableValue.h"

using namespace nt;
Expand Down Expand Up @@ -403,7 +404,7 @@ void LocalStorage::Impl::PropertiesUpdated(TopicData* topic,
NotifyTopic(topic, eventFlags | NT_EVENT_PROPERTIES);
// check local flag so we don't echo back received properties changes
if (m_network && sendNetwork) {
m_network->SetProperties(topic->name, update);
m_network->ClientSetProperties(topic->name, update);
}
}

Expand Down Expand Up @@ -503,8 +504,9 @@ void LocalStorage::Impl::RemoveNetworkPublisher(TopicData* topic) {
// this may result in a duplicate publish warning on the server side,
// but send one anyway in this case just to be sure
if (nextPub->active && m_network) {
m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, nextPub->config);
m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(),
topic->name, topic->typeStr, topic->properties,
nextPub->config);
}
}
}
Expand Down Expand Up @@ -561,8 +563,8 @@ LocalStorage::PublisherData* LocalStorage::Impl::AddLocalPublisher(
}

if (publisher->active && m_network) {
m_network->Publish(Handle{publisher->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, config);
m_network->ClientPublish(Handle{publisher->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, config);
}
return publisher;
}
Expand All @@ -580,7 +582,7 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) {
}

if (publisher->active && m_network) {
m_network->Unpublish(Handle{publisher->handle}.GetIndex());
m_network->ClientUnpublish(Handle{publisher->handle}.GetIndex());
}

if (publisher->active && !topic->localPublishers.empty()) {
Expand All @@ -593,9 +595,9 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) {
topic->typeStr = nextPub->config.typeStr;
RefreshPubSubActive(topic, false);
if (nextPub->active && m_network) {
m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties,
nextPub->config);
m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(),
topic->name, topic->typeStr,
topic->properties, nextPub->config);
}
}
}
Expand All @@ -619,8 +621,8 @@ LocalStorage::SubscriberData* LocalStorage::Impl::AddLocalSubscriber(
}
if (m_network && !subscriber->config.hidden) {
DEBUG4("-> NetworkSubscribe({})", topic->name);
m_network->Subscribe(Handle{subscriber->handle}.GetIndex(), {{topic->name}},
config);
m_network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
{{topic->name}}, config);
}

// queue current value
Expand Down Expand Up @@ -648,7 +650,7 @@ LocalStorage::Impl::RemoveLocalSubscriber(NT_Subscriber subHandle) {
}
}
if (m_network && !subscriber->config.hidden) {
m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex());
m_network->ClientUnsubscribe(Handle{subscriber->handle}.GetIndex());
}
}
return subscriber;
Expand Down Expand Up @@ -685,8 +687,8 @@ LocalStorage::MultiSubscriberData* LocalStorage::Impl::AddMultiSubscriber(
}
if (m_network && !subscriber->options.hidden) {
DEBUG4("-> NetworkSubscribe");
m_network->Subscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
m_network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
}
return subscriber;
}
Expand All @@ -704,7 +706,7 @@ LocalStorage::Impl::RemoveMultiSubscriber(NT_MultiSubscriber subHandle) {
}
}
if (m_network && !subscriber->options.hidden) {
m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex());
m_network->ClientUnsubscribe(Handle{subscriber->handle}.GetIndex());
}
}
return subscriber;
Expand Down Expand Up @@ -978,7 +980,7 @@ bool LocalStorage::Impl::PublishLocalValue(PublisherData* publisher,
if (publisher->topic->IsCached()) {
publisher->topic->lastValueNetwork = value;
}
m_network->SetValue(Handle{publisher->handle}.GetIndex(), value);
m_network->ClientSetValue(Handle{publisher->handle}.GetIndex(), value);
}
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL,
suppressDuplicates, publisher);
Expand Down Expand Up @@ -1074,34 +1076,35 @@ LocalStorage::Impl::Impl(int inst, IListenerStorage& listenerStorage,

LocalStorage::~LocalStorage() = default;

NT_Topic LocalStorage::NetworkAnnounce(std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) {
int LocalStorage::ServerAnnounce(std::string_view name, int id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) {
std::scoped_lock lock{m_mutex};
auto topic = m_impl.GetOrCreateTopic(name);
m_impl.NetworkAnnounce(topic, typeStr, properties, pubuid);
return topic->handle;
return Handle{topic->handle}.GetIndex();
}

void LocalStorage::NetworkUnannounce(std::string_view name) {
void LocalStorage::ServerUnannounce(std::string_view name, int id) {
std::scoped_lock lock{m_mutex};
auto topic = m_impl.GetOrCreateTopic(name);
m_impl.RemoveNetworkPublisher(topic);
}

void LocalStorage::NetworkPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) {
void LocalStorage::ServerPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) {
std::scoped_lock lock{m_mutex};
auto it = m_impl.m_nameTopics.find(name);
if (it != m_impl.m_nameTopics.end()) {
m_impl.NetworkPropertiesUpdate(it->second, update, ack);
}
}

void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
void LocalStorage::ServerSetValue(int topicId, const Value& value) {
std::scoped_lock lock{m_mutex};
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
if (auto topic =
m_impl.m_topics.Get(Handle{m_impl.m_inst, topicId, Handle::kTopic})) {
if (m_impl.SetValue(topic, value, NT_EVENT_VALUE_REMOTE, false, nullptr)) {
if (topic->IsCached()) {
topic->lastValueNetwork = value;
Expand All @@ -1111,12 +1114,12 @@ void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
}
}

void LocalStorage::StartNetwork(net::NetworkInterface* network) {
void LocalStorage::StartNetwork(net::ClientMessageHandler* network) {
std::scoped_lock lock{m_mutex};
m_impl.StartNetwork(network);
}

void LocalStorage::Impl::StartNetwork(net::NetworkInterface* network) {
void LocalStorage::Impl::StartNetwork(net::ClientMessageHandler* network) {
DEBUG4("StartNetwork()");
m_network = network;
// publish all active publishers to the network and send last values
Expand All @@ -1125,26 +1128,27 @@ void LocalStorage::Impl::StartNetwork(net::NetworkInterface* network) {
PublisherData* anyPublisher = nullptr;
for (auto&& publisher : topic->localPublishers) {
if (publisher->active) {
network->Publish(Handle{publisher->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, publisher->config);
network->ClientPublish(Handle{publisher->handle}.GetIndex(),
topic->name, topic->typeStr, topic->properties,
publisher->config);
anyPublisher = publisher;
}
}
if (anyPublisher && topic->lastValue) {
network->SetValue(Handle{anyPublisher->handle}.GetIndex(),
topic->lastValue);
network->ClientSetValue(Handle{anyPublisher->handle}.GetIndex(),
topic->lastValue);
}
}
for (auto&& subscriber : m_subscribers) {
if (!subscriber->config.hidden) {
network->Subscribe(Handle{subscriber->handle}.GetIndex(),
{{subscriber->topic->name}}, subscriber->config);
network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
{{subscriber->topic->name}}, subscriber->config);
}
}
for (auto&& subscriber : m_multiSubscribers) {
if (!subscriber->options.hidden) {
network->Subscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
}
}
}
Expand Down
24 changes: 12 additions & 12 deletions ntcore/src/main/native/cpp/LocalStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

#include <stdint.h>

#include <functional>
#include <memory>
#include <span>
#include <string>
Expand All @@ -26,6 +25,7 @@
#include "Types_internal.h"
#include "ValueCircularBuffer.h"
#include "VectorSet.h"
#include "net/MessageHandler.h"
#include "net/NetworkInterface.h"
#include "ntcore_cpp.h"

Expand All @@ -46,15 +46,15 @@ class LocalStorage final : public net::ILocalStorage {
~LocalStorage() final;

// network interface functions
NT_Topic NetworkAnnounce(std::string_view name, std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) final;
void NetworkUnannounce(std::string_view name) final;
void NetworkPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
void NetworkSetValue(NT_Topic topicHandle, const Value& value) final;

void StartNetwork(net::NetworkInterface* network) final;
int ServerAnnounce(std::string_view name, int id, std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) final;
void ServerUnannounce(std::string_view name, int id) final;
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
void ServerSetValue(int topicId, const Value& value) final;

void StartNetwork(net::ClientMessageHandler* network) final;
void ClearNetwork() final;

// User functions. These are the actual implementations of the corresponding
Expand Down Expand Up @@ -555,7 +555,7 @@ class LocalStorage final : public net::ILocalStorage {
int m_inst;
IListenerStorage& m_listenerStorage;
wpi::Logger& m_logger;
net::NetworkInterface* m_network{nullptr};
net::ClientMessageHandler* m_network{nullptr};

// handle mappings
HandleMap<TopicData, 16> m_topics;
Expand Down Expand Up @@ -606,7 +606,7 @@ class LocalStorage final : public net::ILocalStorage {
void RemoveNetworkPublisher(TopicData* topic);
void NetworkPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack);
void StartNetwork(net::NetworkInterface* network);
void StartNetwork(net::ClientMessageHandler* network);

PublisherData* AddLocalPublisher(TopicData* topic,
const wpi::json& properties,
Expand Down
1 change: 1 addition & 0 deletions ntcore/src/main/native/cpp/NetworkClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "IConnectionList.h"
#include "Log.h"
#include "net/NetworkInterface.h"

using namespace nt;
namespace uv = wpi::uv;
Expand Down
43 changes: 25 additions & 18 deletions ntcore/src/main/native/cpp/net/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,8 @@ void ClientImpl::ProcessIncomingBinary(uint64_t curTimeMs,
continue;
}

// otherwise it's a value message, get the local topic handle for it
auto topicIt = m_topicMap.find(id);
if (topicIt == m_topicMap.end()) {
WARN("received unknown id {}", id);
continue;
}

// pass along to local handler
if (m_local) {
m_local->NetworkSetValue(topicIt->second, value);
}
// otherwise it's a value message
ServerSetValue(id, value);
}
}

Expand Down Expand Up @@ -229,27 +220,43 @@ void ClientImpl::SetValue(int32_t pubuid, const Value& value) {
publisher.options.sendAll ? ValueSendMode::kAll : ValueSendMode::kNormal);
}

void ClientImpl::ServerAnnounce(std::string_view name, int id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) {
int ClientImpl::ServerAnnounce(std::string_view name, int id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) {
DEBUG4("ServerAnnounce({}, {}, {})", name, id, typeStr);
assert(m_local);
m_topicMap[id] = m_local->NetworkAnnounce(name, typeStr, properties, pubuid);
m_topicMap[id] =
m_local->ServerAnnounce(name, 0, typeStr, properties, pubuid);
return id;
}

void ClientImpl::ServerUnannounce(std::string_view name, int id) {
DEBUG4("ServerUnannounce({}, {})", name, id);
assert(m_local);
m_local->NetworkUnannounce(name);
m_local->ServerUnannounce(name, m_topicMap[id]);
m_topicMap.erase(id);
}

void ClientImpl::ServerPropertiesUpdate(std::string_view name,
const wpi::json& update, bool ack) {
DEBUG4("ServerProperties({}, {}, {})", name, update.dump(), ack);
assert(m_local);
m_local->NetworkPropertiesUpdate(name, update, ack);
m_local->ServerPropertiesUpdate(name, update, ack);
}

void ClientImpl::ServerSetValue(int topicId, const Value& value) {
// get the local topic handle for it
auto topicIt = m_topicMap.find(topicId);
if (topicIt == m_topicMap.end()) {
WARN("received unknown id {}", topicId);
return;
}

// pass along to local handler
if (m_local) {
m_local->ServerSetValue(topicIt->second, value);
}
}

void ClientImpl::ProcessIncomingText(std::string_view data) {
Expand Down
11 changes: 6 additions & 5 deletions ntcore/src/main/native/cpp/net/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#include <wpi/DenseMap.h>

#include "NetworkInterface.h"
#include "MessageHandler.h"
#include "NetworkOutgoingQueue.h"
#include "NetworkPing.h"
#include "PubSubOptions.h"
Expand Down Expand Up @@ -49,7 +49,7 @@ class ClientImpl final : private ServerMessageHandler {

void SendOutgoing(uint64_t curTimeMs, bool flush);

void SetLocal(LocalInterface* local) { m_local = local; }
void SetLocal(ServerMessageHandler* local) { m_local = local; }
void SendInitial();

private:
Expand All @@ -63,12 +63,13 @@ class ClientImpl final : private ServerMessageHandler {
void UpdatePeriodic();

// ServerMessageHandler interface
void ServerAnnounce(std::string_view name, int id, std::string_view typeStr,
int ServerAnnounce(std::string_view name, int id, std::string_view typeStr,
const wpi::json& properties,
std::optional<int> pubuid) final;
void ServerUnannounce(std::string_view name, int id) final;
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
void ServerSetValue(int topicId, const Value& value) final;

void Publish(int pubuid, std::string_view name, std::string_view typeStr,
const wpi::json& properties, const PubSubOptionsImpl& options);
Expand All @@ -77,7 +78,7 @@ class ClientImpl final : private ServerMessageHandler {

WireConnection& m_wire;
wpi::Logger& m_logger;
LocalInterface* m_local{nullptr};
ServerMessageHandler* m_local{nullptr};
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
m_timeSyncUpdated;
std::function<void(uint32_t repeatMs)> m_setPeriodic;
Expand All @@ -86,7 +87,7 @@ class ClientImpl final : private ServerMessageHandler {
std::vector<std::unique_ptr<PublisherData>> m_publishers;

// indexed by server-provided topic id
wpi::DenseMap<int, NT_Topic> m_topicMap;
wpi::DenseMap<int, int> m_topicMap;

// ping
NetworkPing m_ping;
Expand Down
Loading

0 comments on commit ecc8083

Please sign in to comment.