From 103080b6de5ca6b51d3c302a0bc8f2bc53861b79 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Fri, 19 Jan 2024 18:13:39 -0800 Subject: [PATCH] [wpinet] WebSocket: Fix serializer --- .../src/main/native/cpp/WebSocketSerializer.h | 17 +++++---- .../native/cpp/WebSocketSerializerTest.cpp | 38 +++++++++++++++++++ 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/wpinet/src/main/native/cpp/WebSocketSerializer.h b/wpinet/src/main/native/cpp/WebSocketSerializer.h index 264b8f594db..2e02c7c8a61 100644 --- a/wpinet/src/main/native/cpp/WebSocketSerializer.h +++ b/wpinet/src/main/native/cpp/WebSocketSerializer.h @@ -157,8 +157,8 @@ std::span TrySendFrames( SmallVector frameOffs; int numBytes = 0; while (frameIt != frameEnd) { - frameOffs.emplace_back(numBytes); numBytes += sendFrames.AddFrame(*frameIt++, server); + frameOffs.emplace_back(numBytes); if ((server && (numBytes >= 65536 || frameOffs.size() > 32)) || (!server && numBytes >= 8192)) { // don't waste too much memory or effort on header generation or masking @@ -200,15 +200,15 @@ std::span TrySendFrames( // figure out what the last (partially) frame sent actually was auto offIt = frameOffs.begin(); auto offEnd = frameOffs.end(); - bool isFin = true; while (offIt != offEnd && *offIt < sentBytes) { ++offIt; - isFin = (frameStart->opcode & WebSocket::kFlagFin) != 0; ++frameStart; } + bool isFin = (frameStart->opcode & WebSocket::kFlagFin) != 0; if (offIt != offEnd && *offIt == sentBytes && isFin) { // we finished at a normal FIN frame boundary; no need for a Write() + ++frameStart; SmallVector bufs; for (auto it = frames.begin(); it != frameStart; ++it) { bufs.append(it->data.begin(), it->data.end()); @@ -239,12 +239,13 @@ std::span TrySendFrames( } // continue through the last buffer of the last partial frame - while (bufIt != bufEnd && offIt != offEnd && pos < *offIt) { - pos += bufIt->len; - writeBufs.emplace_back(*bufIt++); - } if (offIt != offEnd) { + while (bufIt != bufEnd && pos < *offIt) { + pos += bufIt->len; + writeBufs.emplace_back(*bufIt++); + } ++offIt; + ++frameStart; } // move allocated buffers into request @@ -258,7 +259,7 @@ std::span TrySendFrames( while (frameStart != frameEnd && !isFin) { if (offIt != offEnd) { // we already generated the wire buffers for this frame, use them - while (pos < *offIt && bufIt != bufEnd) { + while (bufIt != bufEnd && pos < *offIt) { pos += bufIt->len; continuePos += bufIt->len; req->m_frames.m_bufs.emplace_back(*bufIt++); diff --git a/wpinet/src/test/native/cpp/WebSocketSerializerTest.cpp b/wpinet/src/test/native/cpp/WebSocketSerializerTest.cpp index 6767a23c685..a949b4d1d6a 100644 --- a/wpinet/src/test/native/cpp/WebSocketSerializerTest.cpp +++ b/wpinet/src/test/native/cpp/WebSocketSerializerTest.cpp @@ -376,4 +376,42 @@ TEST_F(WebSocketTrySendTest, ServerPartialLastFrame) { ASSERT_EQ(callbackCalled, 0); } +TEST_F(WebSocketTrySendTest, Big) { + std::vector bufs; + for (int i = 0; i < 100000;) { + i += 1430; + bufs.emplace_back( + uv::Buffer::Allocate(i < 100000 ? 1430 : (100000 - (i - 1430)))); + } + WebSocket::Frame frame{WebSocket::kOpBinary | WebSocket::kFlagFin, bufs}; + EXPECT_CALL(stream, TryWrite(_)).WillOnce(Return(7681)); + + // Write called for remainder of buffers + std::vector remBufs; + remBufs.emplace_back(bufs[5]); + remBufs.back().base += 521; + remBufs.back().len -= 521; + for (size_t i = 6; i < bufs.size(); ++i) { + remBufs.emplace_back(bufs[i]); + } + EXPECT_CALL(stream, DoWrite(wpi::SpanEq(remBufs), _)); + + ASSERT_TRUE( + TrySendFrames( + true, stream, {{frame}}, + [&](std::function, uv::Error)>&& cb) { + ++makeReqCalled; + req = std::make_shared(std::move(cb)); + return req; + }, + [&](auto bufs, auto err) { ++callbackCalled; }) + .empty()); + for (auto& buf : bufs) { + buf.Deallocate(); + } + ASSERT_EQ(makeReqCalled, 1); + ASSERT_TRUE(req->m_frames.m_bufs.empty()); + ASSERT_EQ(callbackCalled, 0); +} + } // namespace wpi::detail