Skip to content

Commit

Permalink
[ntcore] Change network interfaces and messages to use UIDs instead o…
Browse files Browse the repository at this point in the history
…f handles
  • Loading branch information
PeterJohnson committed Oct 11, 2024
1 parent 28cb7cf commit 257d3da
Show file tree
Hide file tree
Showing 26 changed files with 435 additions and 500 deletions.
26 changes: 16 additions & 10 deletions ntcore/src/main/native/cpp/Handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ class Handle {
static_assert(kTypeMax <= wpi::kHandleTypeHALBase);
enum { kIndexMax = 0xfffff };

explicit Handle(NT_Handle handle) : m_handle(handle) {}
operator NT_Handle() const { return m_handle; }
constexpr explicit Handle(NT_Handle handle) : m_handle(handle) {}
constexpr operator NT_Handle() const { return m_handle; }

NT_Handle handle() const { return m_handle; }
constexpr NT_Handle handle() const { return m_handle; }

Handle(int inst, int index, Type type) {
constexpr Handle(int inst, int index, Type type) {
if (inst < 0 || index < 0) {
m_handle = 0;
return;
Expand All @@ -47,16 +47,22 @@ class Handle {
(index & 0xfffff);
}

unsigned int GetIndex() const {
constexpr unsigned int GetIndex() const {
return static_cast<unsigned int>(m_handle) & 0xfffff;
}
Type GetType() const {
constexpr Type GetType() const {
return static_cast<Type>((static_cast<int>(m_handle) >> 24) & 0x7f);
}
int GetInst() const { return (static_cast<int>(m_handle) >> 20) & 0xf; }
bool IsType(Type type) const { return type == GetType(); }
int GetTypedIndex(Type type) const { return IsType(type) ? GetIndex() : -1; }
int GetTypedInst(Type type) const { return IsType(type) ? GetInst() : -1; }
constexpr int GetInst() const {
return (static_cast<int>(m_handle) >> 20) & 0xf;
}
constexpr bool IsType(Type type) const { return type == GetType(); }
constexpr int GetTypedIndex(Type type) const {
return IsType(type) ? GetIndex() : -1;
}
constexpr int GetTypedInst(Type type) const {
return IsType(type) ? GetInst() : -1;
}

private:
NT_Handle m_handle;
Expand Down
46 changes: 24 additions & 22 deletions ntcore/src/main/native/cpp/LocalStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ void LocalStorage::Impl::PropertiesUpdated(TopicData* topic,
NotifyTopic(topic, eventFlags | NT_EVENT_PROPERTIES);
// check local flag so we don't echo back received properties changes
if (m_network && sendNetwork) {
m_network->SetProperties(topic->handle, topic->name, update);
m_network->SetProperties(topic->name, update);
}
}

Expand All @@ -427,10 +427,10 @@ void LocalStorage::Impl::RefreshPubSubActive(TopicData* topic,
void LocalStorage::Impl::NetworkAnnounce(TopicData* topic,
std::string_view typeStr,
const wpi::json& properties,
NT_Publisher pubHandle) {
std::optional<int> pubuid) {
DEBUG4("LS NetworkAnnounce({}, {}, {}, {})", topic->name, typeStr,
properties.dump(), pubHandle);
if (pubHandle != 0) {
properties.dump(), pubuid.value_or(-1));
if (pubuid.has_value()) {
return; // ack of our publish; ignore
}

Expand Down Expand Up @@ -503,7 +503,7 @@ void LocalStorage::Impl::RemoveNetworkPublisher(TopicData* topic) {
// this may result in a duplicate publish warning on the server side,
// but send one anyway in this case just to be sure
if (nextPub->active && m_network) {
m_network->Publish(nextPub->handle, topic->handle, topic->name,
m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, nextPub->config);
}
}
Expand Down Expand Up @@ -561,7 +561,7 @@ LocalStorage::PublisherData* LocalStorage::Impl::AddLocalPublisher(
}

if (publisher->active && m_network) {
m_network->Publish(publisher->handle, topic->handle, topic->name,
m_network->Publish(Handle{publisher->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, config);
}
return publisher;
Expand All @@ -580,7 +580,7 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) {
}

if (publisher->active && m_network) {
m_network->Unpublish(publisher->handle, topic->handle);
m_network->Unpublish(Handle{publisher->handle}.GetIndex());
}

if (publisher->active && !topic->localPublishers.empty()) {
Expand All @@ -593,7 +593,7 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) {
topic->typeStr = nextPub->config.typeStr;
RefreshPubSubActive(topic, false);
if (nextPub->active && m_network) {
m_network->Publish(nextPub->handle, topic->handle, topic->name,
m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties,
nextPub->config);
}
Expand All @@ -619,7 +619,8 @@ LocalStorage::SubscriberData* LocalStorage::Impl::AddLocalSubscriber(
}
if (m_network && !subscriber->config.hidden) {
DEBUG4("-> NetworkSubscribe({})", topic->name);
m_network->Subscribe(subscriber->handle, {{topic->name}}, config);
m_network->Subscribe(Handle{subscriber->handle}.GetIndex(), {{topic->name}},
config);
}

// queue current value
Expand Down Expand Up @@ -647,7 +648,7 @@ LocalStorage::Impl::RemoveLocalSubscriber(NT_Subscriber subHandle) {
}
}
if (m_network && !subscriber->config.hidden) {
m_network->Unsubscribe(subscriber->handle);
m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex());
}
}
return subscriber;
Expand Down Expand Up @@ -684,8 +685,8 @@ LocalStorage::MultiSubscriberData* LocalStorage::Impl::AddMultiSubscriber(
}
if (m_network && !subscriber->options.hidden) {
DEBUG4("-> NetworkSubscribe");
m_network->Subscribe(subscriber->handle, subscriber->prefixes,
subscriber->options);
m_network->Subscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
}
return subscriber;
}
Expand All @@ -703,7 +704,7 @@ LocalStorage::Impl::RemoveMultiSubscriber(NT_MultiSubscriber subHandle) {
}
}
if (m_network && !subscriber->options.hidden) {
m_network->Unsubscribe(subscriber->handle);
m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex());
}
}
return subscriber;
Expand Down Expand Up @@ -977,7 +978,7 @@ bool LocalStorage::Impl::PublishLocalValue(PublisherData* publisher,
if (publisher->topic->IsCached()) {
publisher->topic->lastValueNetwork = value;
}
m_network->SetValue(publisher->handle, value);
m_network->SetValue(Handle{publisher->handle}.GetIndex(), value);
}
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL,
suppressDuplicates, publisher);
Expand Down Expand Up @@ -1076,10 +1077,10 @@ LocalStorage::~LocalStorage() = default;
NT_Topic LocalStorage::NetworkAnnounce(std::string_view name,
std::string_view typeStr,
const wpi::json& properties,
NT_Publisher pubHandle) {
std::optional<int> pubuid) {
std::scoped_lock lock{m_mutex};
auto topic = m_impl.GetOrCreateTopic(name);
m_impl.NetworkAnnounce(topic, typeStr, properties, pubHandle);
m_impl.NetworkAnnounce(topic, typeStr, properties, pubuid);
return topic->handle;
}

Expand Down Expand Up @@ -1124,25 +1125,26 @@ void LocalStorage::Impl::StartNetwork(net::NetworkInterface* network) {
PublisherData* anyPublisher = nullptr;
for (auto&& publisher : topic->localPublishers) {
if (publisher->active) {
network->Publish(publisher->handle, topic->handle, topic->name,
network->Publish(Handle{publisher->handle}.GetIndex(), topic->name,
topic->typeStr, topic->properties, publisher->config);
anyPublisher = publisher;
}
}
if (anyPublisher && topic->lastValue) {
network->SetValue(anyPublisher->handle, topic->lastValue);
network->SetValue(Handle{anyPublisher->handle}.GetIndex(),
topic->lastValue);
}
}
for (auto&& subscriber : m_subscribers) {
if (!subscriber->config.hidden) {
network->Subscribe(subscriber->handle, {{subscriber->topic->name}},
subscriber->config);
network->Subscribe(Handle{subscriber->handle}.GetIndex(),
{{subscriber->topic->name}}, subscriber->config);
}
}
for (auto&& subscriber : m_multiSubscribers) {
if (!subscriber->options.hidden) {
network->Subscribe(subscriber->handle, subscriber->prefixes,
subscriber->options);
network->Subscribe(Handle{subscriber->handle}.GetIndex(),
subscriber->prefixes, subscriber->options);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions ntcore/src/main/native/cpp/LocalStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class LocalStorage final : public net::ILocalStorage {
// network interface functions
NT_Topic NetworkAnnounce(std::string_view name, std::string_view typeStr,
const wpi::json& properties,
NT_Publisher pubHandle) final;
std::optional<int> pubuid) final;
void NetworkUnannounce(std::string_view name) final;
void NetworkPropertiesUpdate(std::string_view name, const wpi::json& update,
bool ack) final;
Expand Down Expand Up @@ -601,7 +601,8 @@ class LocalStorage final : public net::ILocalStorage {
void RefreshPubSubActive(TopicData* topic, bool warnOnSubMismatch);

void NetworkAnnounce(TopicData* topic, std::string_view typeStr,
const wpi::json& properties, NT_Publisher pubHandle);
const wpi::json& properties,
std::optional<int> pubuid);
void RemoveNetworkPublisher(TopicData* topic);
void NetworkPropertiesUpdate(TopicData* topic, const wpi::json& update,
bool ack);
Expand Down
2 changes: 1 addition & 1 deletion ntcore/src/main/native/cpp/NetworkClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ void NetworkClient::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp,
m_wire = std::make_shared<net::WebSocketConnection>(
ws, connInfo.protocol_version, m_logger);
m_clientImpl = std::make_unique<net::ClientImpl>(
m_loop.Now().count(), m_inst, *m_wire, m_logger, m_timeSyncUpdated,
m_loop.Now().count(), *m_wire, m_logger, m_timeSyncUpdated,
[this](uint32_t repeatMs) {
DEBUG4("Setting periodic timer to {}", repeatMs);
if (m_sendOutgoingTimer &&
Expand Down
2 changes: 1 addition & 1 deletion ntcore/src/main/native/cpp/NetworkServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() {
m_websocket->binary.connect([this](std::span<const uint8_t> data, bool) {
while (!data.empty()) {
// decode message
int64_t pubuid;
int pubuid;
Value value;
std::string error;
if (!net::WireDecodeBinary(&data, &pubuid, &value, &error, 0)) {
Expand Down
69 changes: 29 additions & 40 deletions ntcore/src/main/native/cpp/net/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ using namespace nt;
using namespace nt::net;

ClientImpl::ClientImpl(
uint64_t curTimeMs, int inst, WireConnection& wire, wpi::Logger& logger,
uint64_t curTimeMs, WireConnection& wire, wpi::Logger& logger,
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
timeSyncUpdated,
std::function<void(uint32_t repeatMs)> setPeriodic)
: m_inst{inst},
m_wire{wire},
: m_wire{wire},
m_logger{logger},
m_timeSyncUpdated{std::move(timeSyncUpdated)},
m_setPeriodic{std::move(setPeriodic)},
Expand All @@ -58,7 +57,7 @@ void ClientImpl::ProcessIncomingBinary(uint64_t curTimeMs,
}

// decode message
int64_t id;
int id;
Value value;
std::string error;
if (!WireDecodeBinary(&data, &id, &value, &error,
Expand Down Expand Up @@ -114,13 +113,13 @@ void ClientImpl::HandleLocal(std::vector<ClientMessage>&& msgs) {
for (auto&& elem : msgs) {
// common case is value
if (auto msg = std::get_if<ClientValueMsg>(&elem.contents)) {
SetValue(msg->pubHandle, msg->value);
SetValue(msg->pubuid, msg->value);
} else if (auto msg = std::get_if<PublishMsg>(&elem.contents)) {
Publish(msg->pubHandle, msg->topicHandle, msg->name, msg->typeStr,
msg->properties, msg->options);
m_outgoing.SendMessage(msg->pubHandle, std::move(elem));
Publish(msg->pubuid, msg->name, msg->typeStr, msg->properties,
msg->options);
m_outgoing.SendMessage(msg->pubuid, std::move(elem));
} else if (auto msg = std::get_if<UnpublishMsg>(&elem.contents)) {
Unpublish(msg->pubHandle, msg->topicHandle, std::move(elem));
Unpublish(msg->pubuid, std::move(elem));
} else {
m_outgoing.SendMessage(0, std::move(elem));
}
Expand Down Expand Up @@ -174,38 +173,33 @@ void ClientImpl::UpdatePeriodic() {
m_setPeriodic(m_periodMs);
}

void ClientImpl::Publish(NT_Publisher pubHandle, NT_Topic topicHandle,
std::string_view name, std::string_view typeStr,
const wpi::json& properties,
void ClientImpl::Publish(int32_t pubuid, std::string_view name,
std::string_view typeStr, const wpi::json& properties,
const PubSubOptionsImpl& options) {
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
m_publishers.resize(index + 1);
if (static_cast<uint32_t>(pubuid) >= m_publishers.size()) {
m_publishers.resize(pubuid + 1);
}
auto& publisher = m_publishers[index];
auto& publisher = m_publishers[pubuid];
if (!publisher) {
publisher = std::make_unique<PublisherData>();
}
publisher->handle = pubHandle;
publisher->options = options;
publisher->periodMs = std::lround(options.periodicMs / 10.0) * 10;
if (publisher->periodMs < kMinPeriodMs) {
publisher->periodMs = kMinPeriodMs;
}
m_outgoing.SetPeriod(pubHandle, publisher->periodMs);
m_outgoing.SetPeriod(pubuid, publisher->periodMs);

// update period
m_periodMs = UpdatePeriodCalc(m_periodMs, publisher->periodMs);
UpdatePeriodic();
}

void ClientImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle,
ClientMessage&& msg) {
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size()) {
void ClientImpl::Unpublish(int32_t pubuid, ClientMessage&& msg) {
if (static_cast<uint32_t>(pubuid) >= m_publishers.size()) {
return;
}
m_publishers[index].reset();
m_publishers[pubuid].reset();

// loop over all publishers to update period
m_periodMs = kMaxPeriodMs;
Expand All @@ -216,40 +210,35 @@ void ClientImpl::Unpublish(NT_Publisher pubHandle, NT_Topic topicHandle,
}
UpdatePeriodic();

m_outgoing.SendMessage(pubHandle, std::move(msg));
m_outgoing.SendMessage(pubuid, std::move(msg));

// remove from outgoing handle map
m_outgoing.EraseHandle(pubHandle);
m_outgoing.EraseId(pubuid);
}

void ClientImpl::SetValue(NT_Publisher pubHandle, const Value& value) {
DEBUG4("SetValue({}, time={}, server_time={})", pubHandle, value.time(),
void ClientImpl::SetValue(int32_t pubuid, const Value& value) {
DEBUG4("SetValue({}, time={}, server_time={})", pubuid, value.time(),
value.server_time());
unsigned int index = Handle{pubHandle}.GetIndex();
if (index >= m_publishers.size() || !m_publishers[index]) {
if (static_cast<uint32_t>(pubuid) >= m_publishers.size() ||
!m_publishers[pubuid]) {
return;
}
auto& publisher = *m_publishers[index];
auto& publisher = *m_publishers[pubuid];
m_outgoing.SendValue(
pubHandle, value,
pubuid, value,
publisher.options.sendAll ? ValueSendMode::kAll : ValueSendMode::kNormal);
}

void ClientImpl::ServerAnnounce(std::string_view name, int64_t id,
void ClientImpl::ServerAnnounce(std::string_view name, int id,
std::string_view typeStr,
const wpi::json& properties,
std::optional<int64_t> pubuid) {
std::optional<int> pubuid) {
DEBUG4("ServerAnnounce({}, {}, {})", name, id, typeStr);
assert(m_local);
NT_Publisher pubHandle{0};
if (pubuid) {
pubHandle = Handle(m_inst, pubuid.value(), Handle::kPublisher);
}
m_topicMap[id] =
m_local->NetworkAnnounce(name, typeStr, properties, pubHandle);
m_topicMap[id] = m_local->NetworkAnnounce(name, typeStr, properties, pubuid);
}

void ClientImpl::ServerUnannounce(std::string_view name, int64_t id) {
void ClientImpl::ServerUnannounce(std::string_view name, int id) {
DEBUG4("ServerUnannounce({}, {})", name, id);
assert(m_local);
m_local->NetworkUnannounce(name);
Expand Down
Loading

0 comments on commit 257d3da

Please sign in to comment.