Skip to content

Commit

Permalink
Fix control sends
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterJohnson committed Oct 28, 2023
1 parent 072020f commit 45d55c8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
26 changes: 15 additions & 11 deletions wpinet/src/main/native/cpp/WebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class WebSocket::WriteReq : public uv::WriteReq,
void Send(uv::Error err) {
auto ws = m_ws.lock();
if (!ws || err) {
WS_DEBUG("no WS or error, calling callback\n");
m_frames.ReleaseBufs();
m_callback(m_userBufs, err);
return;
Expand All @@ -71,8 +72,9 @@ class WebSocket::WriteReq : public uv::WriteReq,
// We have a control frame; switch to it. We will come back here via
// the control frame's m_cont when it's done.
WS_DEBUG("Continuing with a control write\n");
ws->m_curWriteReq = m_controlCont;
return m_controlCont->Send({});
auto controlCont = std::move(m_controlCont);
m_controlCont.reset();
return controlCont->Send({});
}
int result = Continue(ws->m_stream, shared_from_this());
WS_DEBUG("Continue() -> {}\n", result);
Expand Down Expand Up @@ -813,21 +815,23 @@ void WebSocket::SendControl(
// There's a write request in flight, but since this is a control frame, we
// want to send it as soon as we can, without waiting for all frames in that
// request (or any continuations) to be sent.
auto req = std::make_shared<WriteReq>(std::weak_ptr<WebSocket>{},
std::move(callback));
auto req = std::make_shared<WriteReq>(weak_from_this(), std::move(callback));
VerboseDebug(frame);
req->m_frames.AddFrame(frame, m_server);
size_t numBytes = req->m_frames.AddFrame(frame, m_server);
req->m_userBufs.append(frame.data.begin(), frame.data.end());
req->m_continueBufPos = req->m_frames.m_bufs.size();
req->m_continueFrameOffs.emplace_back(numBytes);
req->m_cont = curReq;
// There may be multiple control packets in flight; maintain in-order
// transmission. Linear search here is O(n^2), but should be pretty rare.
while (curReq->m_controlCont) {
if (!curReq->m_controlCont) {
curReq->m_controlCont = std::move(req);
} else {
curReq = curReq->m_controlCont;
while (curReq->m_cont != req->m_cont) {
curReq = curReq->m_cont;
}
curReq->m_cont = std::move(req);
}
// Insert ahead of any further continuation
req->m_cont = std::move(curReq->m_cont);
curReq->m_cont = nullptr;
curReq->m_controlCont = std::move(req);
}

void WebSocket::SendError(
Expand Down
18 changes: 15 additions & 3 deletions wpinet/src/test/native/cpp/WebSocketIntegrationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "wpinet/WebSocketServer.h" // NOLINT(build/include_order)

#include <fmt/format.h>
#include <wpi/SmallString.h>

#include "WebSocketTest.h"
Expand Down Expand Up @@ -150,18 +151,24 @@ TEST_F(WebSocketIntegrationTest, ClientSendText) {
TEST_F(WebSocketIntegrationTest, ServerSendPing) {
int gotPing = 0;
int gotPong = 0;
int gotData = 0;

serverPipe->Listen([&]() {
auto conn = serverPipe->Accept();
auto server = WebSocketServer::Create(*conn);
server->connected.connect([&](std::string_view, WebSocket& ws) {
ws.SendText({{"hello"}}, [&](auto, uv::Error) {});
ws.SendPing({uv::Buffer{"\x03\x04", 2}}, [&](auto, uv::Error) {});
ws.SendPing({uv::Buffer{"\x03\x04", 2}}, [&](auto, uv::Error) {});
ws.SendText({{"hello"}}, [&](auto, uv::Error) {});
ws.pong.connect([&](auto data) {
++gotPong;
std::vector<uint8_t> recvData{data.begin(), data.end()};
std::vector<uint8_t> expectData{0x03, 0x04};
ASSERT_EQ(recvData, expectData);
ws.Close();
if (gotPong == 2) {
ws.Close();
}
});
});
});
Expand All @@ -180,12 +187,17 @@ TEST_F(WebSocketIntegrationTest, ServerSendPing) {
std::vector<uint8_t> expectData{0x03, 0x04};
ASSERT_EQ(recvData, expectData);
});
ws->text.connect([&](std::string_view data, bool) {
++gotData;
ASSERT_EQ(data, "hello");
});
});

loop->Run();

ASSERT_EQ(gotPing, 1);
ASSERT_EQ(gotPong, 1);
ASSERT_EQ(gotPing, 2);
ASSERT_EQ(gotPong, 2);
ASSERT_EQ(gotData, 2);
}

} // namespace wpi

0 comments on commit 45d55c8

Please sign in to comment.